corpora-engine 0.1.0

Bus orchestrator and IO edges (parser, fs walk, writer, git oracle) for corpora.
Documentation
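//! Single-threaded, deterministic FIFO bus. Each event is dispatched to every interested
//! subscriber in registration order; their emissions are buffered and then appended to the back
//! of the queue, so the output is reproducible — the property CI and `report --check` rely on.
//! When the queue first drains, the engine injects a single `Event::Settled` as a quiescence
//! barrier; the run ends once the queue drains again.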

use std::collections::VecDeque;
use std::sync::Arc;

use corpora_core::{Config, Context, Event, RunSummary, Severity, Subscriber};

/// Deterministic, single-threaded event bus.
///
/// Holds an `Arc`-shared [`Config`] and the registered subscribers, which are dispatched to
/// in the order they were registered.
pub struct Engine {
    cfg: Arc<Config>,
    subs: Vec<Box<dyn Subscriber>>,
}

impl Engine {
    /// Creates an engine over `cfg` with no subscribers registered yet.
    pub fn new(cfg: Config) -> Self {
        let cfg = Arc::new(cfg);
        Self {
            cfg,
            subs: Vec::new(),
        }
    }

    /// Appends a subscriber to the dispatch list.
    ///
    /// Registration order is dispatch order. Returns `&mut Self` so calls can be chained.
    pub fn register(&mut self, s: impl Subscriber + 'static) -> &mut Self {
        let boxed: Box<dyn Subscriber> = Box::new(s);
        self.subs.push(boxed);
        self
    }

    /// Drives the bus from `seed` until quiescence and returns the diagnostic tally.
    ///
    /// Events are processed FIFO. For each dequeued event:
    /// - If it is an `Event::Diagnostic`, it is counted by severity before dispatch.
    /// - Every subscriber whose interest accepts it is then handled, in registration order.
    /// - Everything those subscribers emit is appended to the back of the queue only after all
    ///   of them have run.
    ///
    /// The first time the queue runs dry, a single `Event::Settled` is dispatched. The run ends
    /// when the queue runs dry again.
    pub fn run(&mut self, seed: Event) -> RunSummary {
        let mut pending: VecDeque<Event> = VecDeque::from([seed]);
        // One emission buffer for the whole run; it is emptied into `pending` after every
        // dispatch, so each event still starts with an empty buffer.
        let mut emitted: VecDeque<Event> = VecDeque::new();
        let mut settled = false;
        let (mut errors, mut warnings) = (0usize, 0usize);

        loop {
            let ev = if let Some(next) = pending.pop_front() {
                next
            } else if settled {
                break;
            } else {
                // The discover→parse cascade has fully drained: every Parsed has been seen by
                // the GraphBuilder, so signal the build exactly once. `Settled` is the engine's
                // quiescence barrier rather than the source's job; emitted by the source, it
                // would race ahead of the Parsed events that a later Discovered produces.
                // (Watch mode will re-arm this per change-batch in Phase 5.)
                settled = true;
                Event::Settled
            };

            if let Event::Diagnostic(diag) = &ev {
                match diag.severity {
                    Severity::Error => errors += 1,
                    Severity::Warning => warnings += 1,
                }
            }

            // `self.subs` (mutable) and `self.cfg` (shared) are disjoint field borrows.
            for sub in &mut self.subs {
                if !sub.interest().wants(&ev) {
                    continue;
                }
                let mut cx = Context::new(&self.cfg, &mut emitted);
                sub.handle(&ev, &mut cx);
            }
            // Moves every emission, in order, to the back of the queue and leaves `emitted` empty.
            pending.append(&mut emitted);
        }

        RunSummary { errors, warnings }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph_builder::GraphBuilder;
    use crate::source::{Source, SourceDriver, VecSource};
    use corpora_core::subscriber::ek;
    use corpora_core::*;

    /// Builds a current, normative, narrative `Decision` record carrying `id`, located at `path`.
    fn decision(id: &str, path: DocPath) -> Record {
        Record::minimal(
            Some(Id(id.into())),
            path,
            Kind::Decision,
            Lifecycle::Current,
            Authority::Normative,
            Facet::Narrative,
        )
    }

    /// Runs `engine` from a `RunStarted` seed (run 1, check mode) and returns its summary.
    fn run_check(engine: &mut Engine) -> RunSummary {
        engine.run(Event::RunStarted {
            run: RunId(1),
            mode: Mode::Check,
        })
    }

    #[test]
    fn pipeline_runs_and_tallies_graph_diagnostics() {
        // One id under two different paths → GraphBuilder's build reports a single DUP error.
        let recs = vec![
            Arc::new(decision("D1", DocPath("a.md".into()))),
            Arc::new(decision("D1", DocPath("b.md".into()))),
        ];
        let mut engine = Engine::new(Config::new("."));
        engine
            .register(SourceDriver::new(Box::new(VecSource::new(recs))))
            .register(GraphBuilder::new());
        assert_eq!(run_check(&mut engine).errors, 1);
    }

    /// A source that only *discovers* paths; parsing happens in a later dispatch.
    struct DiscoverSource(Vec<&'static str>);

    impl Source for DiscoverSource {
        fn scan(&mut self, cx: &mut Context<'_>) {
            for &path in self.0.iter() {
                cx.emit(Event::Discovered(DocPath(path.into())));
            }
        }
    }

    /// Stand-in for the real parser: turns each Discovered into a Parsed, one dispatch later.
    /// Every record it produces carries the same id, `D1`.
    struct DeferredParser;

    impl Subscriber for DeferredParser {
        fn name(&self) -> &'static str {
            "deferred-parser"
        }

        fn interest(&self) -> Interest {
            Interest::only(ek::DISCOVERED)
        }

        fn handle(&mut self, ev: &Event, cx: &mut Context<'_>) {
            if let Event::Discovered(path) = ev {
                let parsed = Arc::new(decision("D1", path.clone()));
                cx.emit(Event::Parsed(parsed));
            }
        }
    }

    #[test]
    fn settled_waits_for_deferred_parses() {
        // Regression test. Two files are discovered, parsed one dispatch later, and share an id.
        // The DUP fires only if both records reach the graph, i.e. only if the engine injects
        // Settled *after* the discover→parse cascade drains rather than when the source returns.
        let mut engine = Engine::new(Config::new("."));
        engine
            .register(SourceDriver::new(Box::new(DiscoverSource(vec!["a.md", "b.md"]))))
            .register(DeferredParser)
            .register(GraphBuilder::new());
        let summary = run_check(&mut engine);
        assert_eq!(summary.errors, 1, "both deferred-parsed records must reach the graph");
    }
}