Skip to main content

rivet_cli/source/
mod.rs

1pub mod mysql;
2pub mod postgres;
3
4use arrow::datatypes::SchemaRef;
5use arrow::record_batch::RecordBatch;
6
7use crate::config::SourceConfig;
8use crate::error::Result;
9use crate::tuning::SourceTuning;
10use crate::types::CursorState;
11
12/// Receives schema and batches from a source, one at a time.
13pub trait BatchSink {
14    fn on_schema(&mut self, schema: SchemaRef) -> Result<()>;
15    fn on_batch(&mut self, batch: &RecordBatch) -> Result<()>;
16}
17
18pub trait Source: Send {
19    fn export(
20        &mut self,
21        query: &str,
22        cursor_column: Option<&str>,
23        cursor: Option<&CursorState>,
24        tuning: &SourceTuning,
25        sink: &mut dyn BatchSink,
26    ) -> Result<()>;
27
28    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>>;
29}
30
31pub fn create_source(config: &SourceConfig) -> Result<Box<dyn Source>> {
32    use crate::config::SourceType;
33    let url = config.resolve_url()?;
34    match config.source_type {
35        SourceType::Postgres => Ok(Box::new(postgres::PostgresSource::connect(&url)?)),
36        SourceType::Mysql => Ok(Box::new(mysql::MysqlSource::connect(&url)?)),
37    }
38}