pub mod base;
pub use base::Document;
#[cfg(all(feature = "source", feature = "sink"))]
use anyhow::anyhow;
#[cfg(feature = "source")]
use anyhow::Result;
#[cfg(all(feature = "source", feature = "sink"))]
use crate::config::SourceConfig;
#[cfg(feature = "source")]
pub mod files;
#[cfg(feature = "source")]
pub mod http;
#[cfg(feature = "source")]
pub mod json_corpus;
#[cfg(feature = "source")]
pub mod s3;
#[cfg(feature = "source")]
pub use files::FilesSource;
#[cfg(feature = "source")]
pub use http::HttpSource;
#[cfg(feature = "source")]
pub use json_corpus::JsonCorpusSource;
#[cfg(feature = "source")]
pub use s3::S3Source;
#[cfg(all(feature = "source", feature = "sink"))]
pub mod clickhouse_table;
#[cfg(all(feature = "source", feature = "sink"))]
pub mod mariadb_table;
#[cfg(all(feature = "source", feature = "sink"))]
pub mod pg_table;
#[cfg(all(feature = "source", feature = "sink"))]
pub mod sqlite_table;
#[cfg(all(feature = "source", feature = "sink"))]
pub use clickhouse_table::ClickhouseTableSource;
#[cfg(all(feature = "source", feature = "sink"))]
pub use mariadb_table::MariadbTableSource;
#[cfg(all(feature = "source", feature = "sink"))]
pub use pg_table::PgTableSource;
#[cfg(all(feature = "source", feature = "sink"))]
pub use sqlite_table::SqliteTableSource;
#[cfg(all(feature = "source", feature = "sink"))]
/// A single type that can hold any one of the concrete source types
/// re-exported by this module.
///
/// Each variant wraps exactly one source value. The enum is only compiled
/// when both the `source` and `sink` features are enabled.
pub enum AnySource {
/// Wraps a [`FilesSource`].
Files(FilesSource),
/// Wraps a [`JsonCorpusSource`].
JsonCorpus(JsonCorpusSource),
/// Wraps a [`PgTableSource`].
PgTable(PgTableSource),
/// Wraps a [`MariadbTableSource`].
MariadbTable(MariadbTableSource),
/// Wraps a [`SqliteTableSource`].
SqliteTable(SqliteTableSource),
/// Wraps an [`HttpSource`].
Http(HttpSource),
/// Wraps an [`S3Source`].
S3(S3Source),
/// Wraps a [`ClickhouseTableSource`].
ClickhouseTable(ClickhouseTableSource),
}
#[cfg(all(feature = "source", feature = "sink"))]
impl AnySource {
    /// Collects the documents of whichever source this value wraps.
    ///
    /// The call is forwarded to the wrapped source's own `iter_documents`
    /// and its result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped source's `iter_documents` yields.
    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
        match self {
            // These two sources expose a synchronous `iter_documents`,
            // so there is nothing to await.
            Self::Files(source) => source.iter_documents(),
            Self::JsonCorpus(source) => source.iter_documents(),

            // The remaining sources return a future that must be awaited.
            Self::Http(source) => source.iter_documents().await,
            Self::S3(source) => source.iter_documents().await,
            Self::PgTable(source) => source.iter_documents().await,
            Self::MariadbTable(source) => source.iter_documents().await,
            Self::SqliteTable(source) => source.iter_documents().await,
            Self::ClickhouseTable(source) => source.iter_documents().await,
        }
    }
}
/// Builds the [`AnySource`] variant that corresponds to the given
/// [`SourceConfig`] variant.
///
/// The variant's inner configuration is cloned and handed to the matching
/// source type's `new` constructor.
///
/// # Errors
///
/// Returns an error for [`SourceConfig::Inline`], which this function does
/// not construct a source for.
#[cfg(all(feature = "source", feature = "sink"))]
pub fn load_source(cfg: &SourceConfig) -> Result<AnySource> {
    let source = match cfg {
        // Reject the one variant that has no `AnySource` counterpart.
        SourceConfig::Inline(_) => {
            return Err(anyhow!(
                "inline source is not used via load_source — Pipeline::new handles it directly"
            ));
        }
        SourceConfig::Files(conf) => AnySource::Files(FilesSource::new(conf.clone())),
        SourceConfig::JsonCorpus(conf) => {
            AnySource::JsonCorpus(JsonCorpusSource::new(conf.clone()))
        }
        SourceConfig::Http(conf) => AnySource::Http(HttpSource::new(conf.clone())),
        SourceConfig::S3(conf) => AnySource::S3(S3Source::new(conf.clone())),
        SourceConfig::PgTable(conf) => AnySource::PgTable(PgTableSource::new(conf.clone())),
        SourceConfig::MariadbTable(conf) => {
            AnySource::MariadbTable(MariadbTableSource::new(conf.clone()))
        }
        SourceConfig::SqliteTable(conf) => {
            AnySource::SqliteTable(SqliteTableSource::new(conf.clone()))
        }
        SourceConfig::ClickhouseTable(conf) => {
            AnySource::ClickhouseTable(ClickhouseTableSource::new(conf.clone()))
        }
    };

    Ok(source)
}