Skip to main content

chunkshop/sources/
mod.rs

//! Sources — input document iterators, one per backing store.
//!
//! The `Document` struct (in `base`) is always available — chunkers consume
//! `&Document` regardless of which features are enabled. The fetcher impls are
//! feature-gated:
//! - file / HTTP / S3 / json_corpus fetchers require the `source` feature.
//! - DB-table sources (PG / MariaDB / SQLite / ClickHouse) additionally reuse
//!   the backend connection layer, so they require `all(source, sink)`.
//! - the agent-memory session-staging source requires the `memory` feature.
9
10// `Document` is always available.
11pub mod base;
12pub use base::Document;
13
14#[cfg(all(feature = "source", feature = "sink"))]
15use anyhow::anyhow;
16#[cfg(feature = "source")]
17use anyhow::Result;
18
19#[cfg(all(feature = "source", feature = "sink"))]
20use crate::config::SourceConfig;
21
22#[cfg(feature = "source")]
23pub mod files;
24#[cfg(feature = "source")]
25pub mod http;
26#[cfg(feature = "source")]
27pub mod json_corpus;
28#[cfg(feature = "source")]
29pub mod s3;
30
31#[cfg(feature = "source")]
32pub use files::FilesSource;
33#[cfg(feature = "source")]
34pub use http::HttpSource;
35#[cfg(feature = "source")]
36pub use json_corpus::JsonCorpusSource;
37#[cfg(feature = "source")]
38pub use s3::S3Source;
39
40// DB-table sources reuse `crate::backends::*`, which is `sink`-gated.
41#[cfg(all(feature = "source", feature = "sink"))]
42pub mod clickhouse_table;
43#[cfg(all(feature = "source", feature = "sink"))]
44pub mod mariadb_table;
45#[cfg(all(feature = "source", feature = "sink"))]
46pub mod pg_table;
47#[cfg(all(feature = "source", feature = "sink"))]
48pub mod sqlite_table;
49// RM-A Task 5: agent-memory staging-table source. Lives behind the `memory`
50// feature (which itself requires `source` + `sink`).
51#[cfg(feature = "memory")]
52pub mod session_staging;
53
54#[cfg(all(feature = "source", feature = "sink"))]
55pub use clickhouse_table::ClickhouseTableSource;
56#[cfg(all(feature = "source", feature = "sink"))]
57pub use mariadb_table::MariadbTableSource;
58#[cfg(all(feature = "source", feature = "sink"))]
59pub use pg_table::PgTableSource;
60#[cfg(feature = "memory")]
61pub use session_staging::SessionStagingSource;
62#[cfg(all(feature = "source", feature = "sink"))]
63pub use sqlite_table::SqliteTableSource;
64
/// Sum type over every concrete source, used for runtime polymorphism.
///
/// Variants:
/// - `Files`, `JsonCorpus`, `Http`, `S3` — non-database fetchers;
/// - `PgTable`, `MariadbTable`, `SqliteTable`, `ClickhouseTable` — DB-table
///   sources that reuse the `sink`-gated backend connection layer;
/// - `SessionStaging` — the agent-memory staging-table source (RM-A), which only
///   exists when the `memory` feature is enabled.
///
/// History: R1 shipped the original 5 sources; R2/R3/R4 added `MariadbTable`,
/// `SqliteTable` and `ClickhouseTable` respectively; RM-A added `SessionStaging`.
///
/// Because `AnySource` enumerates DB-table variants, it is only built when both
/// `source` and `sink` are enabled — the same gate that `run_cell` / `Pipeline`
/// live behind.
#[cfg(all(feature = "source", feature = "sink"))]
pub enum AnySource {
    Files(FilesSource),
    JsonCorpus(JsonCorpusSource),
    PgTable(PgTableSource),
    MariadbTable(MariadbTableSource),
    SqliteTable(SqliteTableSource),
    Http(HttpSource),
    S3(S3Source),
    ClickhouseTable(ClickhouseTableSource),
    #[cfg(feature = "memory")]
    SessionStaging(SessionStagingSource),
}
84
#[cfg(all(feature = "source", feature = "sink"))]
impl AnySource {
    /// Fetch every document from the wrapped source.
    ///
    /// `Files` and `JsonCorpus` expose a synchronous `iter_documents`; every
    /// other variant's `iter_documents` is awaited.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying source returns.
    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
        match self {
            // NOTE(review): these two are called synchronously inside an async fn.
            // If they do blocking filesystem I/O they will stall the executor thread
            // for the duration of the read — consider `spawn_blocking` if that
            // matters. TODO confirm their I/O behavior.
            AnySource::Files(s) => s.iter_documents(),
            AnySource::JsonCorpus(s) => s.iter_documents(),
            AnySource::PgTable(s) => s.iter_documents().await,
            AnySource::MariadbTable(s) => s.iter_documents().await,
            AnySource::SqliteTable(s) => s.iter_documents().await,
            AnySource::Http(s) => s.iter_documents().await,
            AnySource::S3(s) => s.iter_documents().await,
            AnySource::ClickhouseTable(s) => s.iter_documents().await,
            #[cfg(feature = "memory")]
            AnySource::SessionStaging(s) => s.iter_documents().await,
        }
    }

    /// RM-A O3 crash-safety hook: the runner calls this AFTER the per-doc
    /// write loop succeeds.
    ///
    /// For `SessionStaging` (the only source with post-iteration state today)
    /// it advances the `consumed` watermark for the sessions just emitted.
    /// Every other source has no post-iteration state, so this is a no-op.
    ///
    /// # Errors
    ///
    /// Propagates the error from `SessionStagingSource::commit_processed`.
    pub async fn commit_processed(&self) -> Result<()> {
        match self {
            #[cfg(feature = "memory")]
            AnySource::SessionStaging(s) => s.commit_processed().await,
            // Listed explicitly instead of a `_` catch-all: adding a new variant
            // must be a compile error here, forcing a decision about whether it
            // carries post-iteration state that needs committing.
            AnySource::Files(_)
            | AnySource::JsonCorpus(_)
            | AnySource::PgTable(_)
            | AnySource::MariadbTable(_)
            | AnySource::SqliteTable(_)
            | AnySource::Http(_)
            | AnySource::S3(_)
            | AnySource::ClickhouseTable(_) => Ok(()),
        }
    }
}
115
/// Build the runtime [`AnySource`] that matches a parsed [`SourceConfig`].
///
/// Each arm clones the variant's config into the corresponding source's
/// `new` constructor.
///
/// # Errors
///
/// - `SourceConfig::SessionStaging` when the crate was built without the
///   `memory` feature.
/// - `SourceConfig::Inline`, which is not constructed here —
///   `Pipeline::new` handles inline input itself.
#[cfg(all(feature = "source", feature = "sink"))]
pub fn load_source(cfg: &SourceConfig) -> Result<AnySource> {
    let source = match cfg {
        SourceConfig::Files(files_cfg) => AnySource::Files(FilesSource::new(files_cfg.clone())),
        SourceConfig::JsonCorpus(corpus_cfg) => {
            AnySource::JsonCorpus(JsonCorpusSource::new(corpus_cfg.clone()))
        }
        SourceConfig::PgTable(pg_cfg) => AnySource::PgTable(PgTableSource::new(pg_cfg.clone())),
        SourceConfig::MariadbTable(maria_cfg) => {
            AnySource::MariadbTable(MariadbTableSource::new(maria_cfg.clone()))
        }
        SourceConfig::SqliteTable(sqlite_cfg) => {
            AnySource::SqliteTable(SqliteTableSource::new(sqlite_cfg.clone()))
        }
        SourceConfig::Http(http_cfg) => AnySource::Http(HttpSource::new(http_cfg.clone())),
        SourceConfig::S3(s3_cfg) => AnySource::S3(S3Source::new(s3_cfg.clone())),
        SourceConfig::ClickhouseTable(ch_cfg) => {
            AnySource::ClickhouseTable(ClickhouseTableSource::new(ch_cfg.clone()))
        }
        #[cfg(feature = "memory")]
        SourceConfig::SessionStaging(staging_cfg) => {
            AnySource::SessionStaging(SessionStagingSource::new(staging_cfg.clone()))
        }
        #[cfg(not(feature = "memory"))]
        SourceConfig::SessionStaging(_) => {
            return Err(anyhow!(
                "session_staging source requires the `memory` feature (RM-A; chunkshop#9)"
            ));
        }
        SourceConfig::Inline(_) => {
            return Err(anyhow!(
                "inline source is not used via load_source — Pipeline::new handles it directly"
            ));
        }
    };
    Ok(source)
}