Skip to main content

corpora_engine/
source.rs

//! Plugin seam #0 — where records come from. [`SourceDriver`] adapts a [`Source`] onto
//! the bus: scan on `RunStarted`, re-discover on `Changed`.
3
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use corpora_core::subscriber::ek;
9use corpora_core::{Context, DocPath, Event, Interest, Record, Subscriber};
10
11pub trait Source {
12    fn scan(&mut self, cx: &mut Context<'_>);
13}
14
15pub struct SourceDriver {
16    inner: Box<dyn Source>,
17}
18
19impl SourceDriver {
20    pub fn new(inner: Box<dyn Source>) -> Self {
21        SourceDriver { inner }
22    }
23}
24
25impl Subscriber for SourceDriver {
26    fn name(&self) -> &'static str {
27        "source"
28    }
29
30    fn interest(&self) -> Interest {
31        Interest::only(ek::RUN_STARTED | ek::CHANGED)
32    }
33
34    fn handle(&mut self, ev: &Event, cx: &mut Context<'_>) {
35        match ev {
36            // Scan emits Discovered/Parsed; the engine injects `Settled` once the resulting
37            // cascade drains, so the graph builds only after every record has arrived.
38            Event::RunStarted { .. } => self.inner.scan(cx),
39            Event::Changed(p) => cx.emit(Event::Discovered(p.clone())),
40            _ => {}
41        }
42    }
43}
44
45/// Walks `root` recursively for `*.md` files, emitting `Discovered` in sorted order.
46pub struct FsWalk {
47    root: PathBuf,
48}
49
50impl FsWalk {
51    pub fn new(root: impl Into<PathBuf>) -> Self {
52        FsWalk { root: root.into() }
53    }
54}
55
56impl Source for FsWalk {
57    fn scan(&mut self, cx: &mut Context<'_>) {
58        let mut paths = Vec::new();
59        walk(&self.root, &mut paths);
60        paths.sort(); // deterministic discovery order
61        for p in paths {
62            cx.emit(Event::Discovered(DocPath(p)));
63        }
64    }
65}
66
/// Recursively collects every `*.md` file under `dir` into `out`, as path strings.
///
/// The walk is best-effort by design. Unreadable directories and unreadable entries are
/// skipped silently, as are paths that are not valid UTF-8 (they cannot become a `String`).
///
/// Symlinked directories are still followed, but each real directory is entered at most
/// once. This means a symlink cycle (e.g. `docs/loop -> ..`) cannot make the walk re-emit
/// the same files under ever-longer alias paths.
fn walk(dir: &Path, out: &mut Vec<String>) {
    let mut visited = std::collections::HashSet::new();
    walk_dir(dir, out, &mut visited);
}

/// Recursive step of [`walk`]. `visited` holds the canonical form of every directory
/// already entered.
fn walk_dir(
    dir: &Path,
    out: &mut Vec<String>,
    visited: &mut std::collections::HashSet<PathBuf>,
) {
    // Key directories by their canonical (symlink-resolved) path, so that two spellings of
    // the same directory compare equal. If canonicalization fails, fall back to the path as
    // given rather than skipping a directory that may still be readable.
    let key = fs::canonicalize(dir).unwrap_or_else(|_| dir.to_path_buf());
    if !visited.insert(key) {
        return; // Already walked via another path: a cycle or a duplicate link.
    }
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    // `read_dir` order is platform-dependent. Sorting the children makes the traversal
    // deterministic, and with it the choice of which alias of a multiply-linked
    // directory gets walked.
    let mut children: Vec<PathBuf> = entries.flatten().map(|e| e.path()).collect();
    children.sort();
    for path in children {
        // `is_dir` follows symlinks, so linked directories are descended into;
        // `visited` above is what stops cycles.
        if path.is_dir() {
            walk_dir(&path, out, visited);
        } else if path.extension().and_then(|x| x.to_str()) == Some("md") {
            if let Some(s) = path.to_str() {
                out.push(s.to_string());
            }
        }
    }
}
82
83/// In-memory source of pre-built records — emits `Parsed` directly, bypassing the parser.
84/// For tests and embedding the engine without a filesystem.
85pub struct VecSource {
86    records: Vec<Arc<Record>>,
87}
88
89impl VecSource {
90    pub fn new(records: Vec<Arc<Record>>) -> Self {
91        VecSource { records }
92    }
93}
94
95impl Source for VecSource {
96    fn scan(&mut self, cx: &mut Context<'_>) {
97        for r in &self.records {
98            cx.emit(Event::Parsed(r.clone()));
99        }
100    }
101}