1pub mod base;
12pub use base::Document;
13
14#[cfg(all(feature = "source", feature = "sink"))]
15use anyhow::anyhow;
16#[cfg(feature = "source")]
17use anyhow::Result;
18
19#[cfg(all(feature = "source", feature = "sink"))]
20use crate::config::SourceConfig;
21
22#[cfg(feature = "source")]
23pub mod files;
24#[cfg(feature = "source")]
25pub mod http;
26#[cfg(feature = "source")]
27pub mod json_corpus;
28#[cfg(feature = "source")]
29pub mod s3;
30
31#[cfg(feature = "source")]
32pub use files::FilesSource;
33#[cfg(feature = "source")]
34pub use http::HttpSource;
35#[cfg(feature = "source")]
36pub use json_corpus::JsonCorpusSource;
37#[cfg(feature = "source")]
38pub use s3::S3Source;
39
40#[cfg(all(feature = "source", feature = "sink"))]
42pub mod clickhouse_table;
43#[cfg(all(feature = "source", feature = "sink"))]
44pub mod mariadb_table;
45#[cfg(all(feature = "source", feature = "sink"))]
46pub mod pg_table;
47#[cfg(all(feature = "source", feature = "sink"))]
48pub mod sqlite_table;
49#[cfg(feature = "memory")]
52pub mod session_staging;
53
54#[cfg(all(feature = "source", feature = "sink"))]
55pub use clickhouse_table::ClickhouseTableSource;
56#[cfg(all(feature = "source", feature = "sink"))]
57pub use mariadb_table::MariadbTableSource;
58#[cfg(all(feature = "source", feature = "sink"))]
59pub use pg_table::PgTableSource;
60#[cfg(feature = "memory")]
61pub use session_staging::SessionStagingSource;
62#[cfg(all(feature = "source", feature = "sink"))]
63pub use sqlite_table::SqliteTableSource;
64
/// Closed set of document sources that the loader can hand back.
///
/// Each variant wraps exactly one concrete source type re-exported by this
/// module, so callers can hold any of them behind a single type and dispatch
/// through the methods on [`AnySource`].
#[cfg(all(feature = "source", feature = "sink"))]
pub enum AnySource {
    // Sources declared under the `source` feature alone.
    /// Wraps a [`FilesSource`].
    Files(FilesSource),
    /// Wraps a [`JsonCorpusSource`].
    JsonCorpus(JsonCorpusSource),
    /// Wraps an [`HttpSource`].
    Http(HttpSource),
    /// Wraps an [`S3Source`].
    S3(S3Source),

    // Table-backed sources, declared only when `source` and `sink` are both on.
    /// Wraps a [`PgTableSource`].
    PgTable(PgTableSource),
    /// Wraps a [`MariadbTableSource`].
    MariadbTable(MariadbTableSource),
    /// Wraps a [`SqliteTableSource`].
    SqliteTable(SqliteTableSource),
    /// Wraps a [`ClickhouseTableSource`].
    ClickhouseTable(ClickhouseTableSource),

    /// Wraps a [`SessionStagingSource`]; only present with the `memory` feature.
    #[cfg(feature = "memory")]
    SessionStaging(SessionStagingSource),
}
84
#[cfg(all(feature = "source", feature = "sink"))]
impl AnySource {
    /// Returns the documents produced by the wrapped source.
    ///
    /// This is a pure dispatcher: it forwards to the `iter_documents` method
    /// of whichever concrete source this value holds. The `Files` and
    /// `JsonCorpus` variants are invoked without `.await`; every other
    /// variant is awaited.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped source's `iter_documents` returns.
    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
        match self {
            // Called directly, no `.await`.
            Self::Files(source) => source.iter_documents(),
            Self::JsonCorpus(source) => source.iter_documents(),
            // Awaited.
            Self::PgTable(source) => source.iter_documents().await,
            Self::MariadbTable(source) => source.iter_documents().await,
            Self::SqliteTable(source) => source.iter_documents().await,
            Self::Http(source) => source.iter_documents().await,
            Self::S3(source) => source.iter_documents().await,
            Self::ClickhouseTable(source) => source.iter_documents().await,
            #[cfg(feature = "memory")]
            Self::SessionStaging(source) => source.iter_documents().await,
        }
    }

    /// Forwards a commit to the wrapped source when it has one.
    ///
    /// Only the `SessionStaging` variant (available with the `memory`
    /// feature) is forwarded to its own `commit_processed`; for every other
    /// variant this is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `SessionStagingSource::commit_processed`.
    pub async fn commit_processed(&self) -> Result<()> {
        #[cfg(feature = "memory")]
        {
            if let Self::SessionStaging(staging) = self {
                return staging.commit_processed().await;
            }
        }

        // Nothing to commit for any other kind of source.
        Ok(())
    }
}
115
/// Builds the [`AnySource`] that corresponds to a source configuration.
///
/// Each buildable `SourceConfig` variant is mapped to the matching source
/// type, which is constructed from a clone of the variant's inner config.
///
/// # Errors
///
/// Returns an error when:
/// - `cfg` is `SourceConfig::SessionStaging` and the crate was built without
///   the `memory` feature;
/// - `cfg` is `SourceConfig::Inline`, which is never built through this
///   function.
#[cfg(all(feature = "source", feature = "sink"))]
pub fn load_source(cfg: &SourceConfig) -> Result<AnySource> {
    let source = match cfg {
        SourceConfig::Files(conf) => AnySource::Files(FilesSource::new(conf.clone())),
        SourceConfig::JsonCorpus(conf) => {
            AnySource::JsonCorpus(JsonCorpusSource::new(conf.clone()))
        }
        SourceConfig::PgTable(conf) => AnySource::PgTable(PgTableSource::new(conf.clone())),
        SourceConfig::MariadbTable(conf) => {
            AnySource::MariadbTable(MariadbTableSource::new(conf.clone()))
        }
        SourceConfig::SqliteTable(conf) => {
            AnySource::SqliteTable(SqliteTableSource::new(conf.clone()))
        }
        SourceConfig::Http(conf) => AnySource::Http(HttpSource::new(conf.clone())),
        SourceConfig::S3(conf) => AnySource::S3(S3Source::new(conf.clone())),
        SourceConfig::ClickhouseTable(conf) => {
            AnySource::ClickhouseTable(ClickhouseTableSource::new(conf.clone()))
        }
        #[cfg(feature = "memory")]
        SourceConfig::SessionStaging(conf) => {
            AnySource::SessionStaging(SessionStagingSource::new(conf.clone()))
        }
        // Without `memory` the staging source type is not compiled in, so the
        // config variant can only be rejected.
        #[cfg(not(feature = "memory"))]
        SourceConfig::SessionStaging(_) => {
            return Err(anyhow!(
                "session_staging source requires the `memory` feature (RM-A; chunkshop#9)"
            ));
        }
        SourceConfig::Inline(_) => {
            return Err(anyhow!(
                "inline source is not used via load_source — Pipeline::new handles it directly"
            ));
        }
    };

    Ok(source)
}