Skip to main content

corpora_engine/
engine.rs

1//! Single-threaded, deterministic FIFO bus. Each event is dispatched to every interested
2//! subscriber in registration order; their emissions are collected then appended, so the
3//! output is reproducible — the property CI and `report --check` rely on.
4
5use std::collections::VecDeque;
6use std::sync::Arc;
7
8use corpora_core::{Config, Context, Event, RunSummary, Severity, Subscriber};
9
10pub struct Engine {
11    subs: Vec<Box<dyn Subscriber>>,
12    cfg: Arc<Config>,
13}
14
15impl Engine {
16    pub fn new(cfg: Config) -> Self {
17        Engine {
18            subs: Vec::new(),
19            cfg: Arc::new(cfg),
20        }
21    }
22
23    pub fn register(&mut self, s: impl Subscriber + 'static) -> &mut Self {
24        self.subs.push(Box::new(s));
25        self
26    }
27
28    pub fn run(&mut self, seed: Event) -> RunSummary {
29        let cfg = self.cfg.clone();
30        let mut queue: VecDeque<Event> = VecDeque::from([seed]);
31        let mut errors = 0usize;
32        let mut warnings = 0usize;
33        let mut settled = false;
34
35        loop {
36            let ev = match queue.pop_front() {
37                Some(ev) => ev,
38                // The discover→parse cascade has fully drained: every Parsed has been seen
39                // by the GraphBuilder. Signal the build exactly once. (`Settled` is the
40                // engine's quiescence barrier, not the source's job — otherwise it races
41                // ahead of the Parsed events a Discovered later produces. Watch mode will
42                // re-arm this per change-batch in Phase 5.)
43                None if !settled => {
44                    settled = true;
45                    Event::Settled
46                }
47                None => break,
48            };
49
50            if let Event::Diagnostic(d) = &ev {
51                match d.severity {
52                    Severity::Error => errors += 1,
53                    Severity::Warning => warnings += 1,
54                }
55            }
56
57            let mut out: VecDeque<Event> = VecDeque::new();
58            for s in self.subs.iter_mut() {
59                if s.interest().wants(&ev) {
60                    let mut cx = Context::new(&cfg, &mut out);
61                    s.handle(&ev, &mut cx);
62                }
63            }
64            queue.extend(out);
65        }
66
67        RunSummary { errors, warnings }
68    }
69}
70
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph_builder::GraphBuilder;
    use crate::source::{Source, SourceDriver, VecSource};
    use corpora_core::subscriber::ek;
    use corpora_core::*;

    /// Builds a minimal current / normative / narrative decision record for `id` at `path`.
    fn rec(id: &str, path: &str) -> Record {
        let record_id = Id(id.into());
        let doc_path = DocPath(path.into());
        Record::minimal(
            Some(record_id),
            doc_path,
            Kind::Decision,
            Lifecycle::Current,
            Authority::Normative,
            Facet::Narrative,
        )
    }

    /// The seed event both tests start a run with: run 1 in check mode.
    fn check_run() -> Event {
        Event::RunStarted {
            run: RunId(1),
            mode: Mode::Check,
        }
    }

    #[test]
    fn pipeline_runs_and_tallies_graph_diagnostics() {
        // One id under two different paths: the GraphBuilder's build is expected to emit
        // a single DUP error, which the engine must count.
        let records: Vec<Arc<Record>> = ["a.md", "b.md"]
            .iter()
            .map(|&path| Arc::new(rec("D1", path)))
            .collect();

        let mut engine = Engine::new(Config::new("."));
        engine.register(SourceDriver::new(Box::new(VecSource::new(records))));
        engine.register(GraphBuilder::new());

        let summary = engine.run(check_run());
        assert_eq!(summary.errors, 1);
    }

    /// A source that only announces paths via `Discovered`; it never emits `Parsed`, so
    /// parsing has to happen in a later dispatch.
    struct DiscoverSource(Vec<&'static str>);

    impl Source for DiscoverSource {
        fn scan(&mut self, cx: &mut Context<'_>) {
            for &path in self.0.iter() {
                cx.emit(Event::Discovered(DocPath(path.into())));
            }
        }
    }

    /// Substitute for the real parser: answers each `Discovered` with a `Parsed` record
    /// (always id `D1`), which the engine dispatches one step later.
    struct DeferredParser;

    impl Subscriber for DeferredParser {
        fn name(&self) -> &'static str {
            "deferred-parser"
        }

        fn interest(&self) -> Interest {
            Interest::only(ek::DISCOVERED)
        }

        fn handle(&mut self, ev: &Event, cx: &mut Context<'_>) {
            // Anything other than Discovered is ignored.
            let path = match ev {
                Event::Discovered(p) => p.clone(),
                _ => return,
            };
            let record = Record::minimal(
                Some(Id("D1".into())),
                path,
                Kind::Decision,
                Lifecycle::Current,
                Authority::Normative,
                Facet::Narrative,
            );
            cx.emit(Event::Parsed(Arc::new(record)));
        }
    }

    #[test]
    fn settled_waits_for_deferred_parses() {
        // Regression test. Two paths are discovered and parsed one dispatch later, both
        // under the same id. The DUP error can only appear if both records reached the
        // graph, which requires the engine to inject Settled after the discover→parse
        // cascade has drained rather than as soon as the source returned.
        let source = DiscoverSource(vec!["a.md", "b.md"]);

        let mut engine = Engine::new(Config::new("."));
        engine.register(SourceDriver::new(Box::new(source)));
        engine.register(DeferredParser);
        engine.register(GraphBuilder::new());

        let summary = engine.run(check_run());
        assert_eq!(
            summary.errors, 1,
            "both deferred-parsed records must reach the graph"
        );
    }
}
153}