Skip to main content

rivet/source/
mod.rs

1pub mod mysql;
2pub(crate) mod pg_numeric_wire;
3pub mod postgres;
4pub(crate) mod query;
5pub(crate) mod tls;
6
7use arrow::datatypes::SchemaRef;
8use arrow::record_batch::RecordBatch;
9
10use crate::config::SourceConfig;
11use crate::error::Result;
12use crate::plan::IncrementalCursorPlan;
13use crate::tuning::SourceTuning;
14use crate::types::{ColumnOverrides, CursorState, TypeMapping};
15
/// Source-neutral facts about a table that chunked-mode planning needs.
///
/// Plan-build asks Postgres and MySQL the same question and gets back this
/// same shape. It is filled in by
/// `crate::source::postgres::introspect_pg_table_for_chunking` and
/// `crate::source::mysql::introspect_mysql_table_for_chunking`. Both read
/// catalog statistics (`pg_class` / `information_schema.TABLES`), so every
/// number here is only as current as the most recent `ANALYZE` (manual or
/// automatic).
#[derive(Clone, Debug, Default)]
pub(crate) struct TableIntrospection {
    /// Name of the primary-key column when the table has exactly one PK
    /// column of an integer type, i.e. a key that can safely be split into
    /// ranges.
    ///
    /// `None` for tables without a PK, with a composite PK, or whose PK is
    /// not integer-typed (text, uuid, decimal, …).
    pub single_int_pk: Option<String>,
    /// Approximate row count, taken from PG `reltuples` or MySQL `TABLE_ROWS`.
    ///
    /// A value of `0` covers both "the table is empty" and "no statistics".
    pub row_estimate: i64,
    /// Average heap bytes per row. `None` when the table is empty or has not
    /// been analysed.
    ///
    /// Used to translate `chunk_size_memory_mb` into a row count.
    pub avg_row_bytes: Option<i64>,
}
36
37/// Receives schema and batches from a source, one at a time.
38pub trait BatchSink {
39    fn on_schema(&mut self, schema: SchemaRef) -> Result<()>;
40    fn on_batch(&mut self, batch: &RecordBatch) -> Result<()>;
41}
42
43/// Read-only inputs for a single export call.
44///
45/// Packs the parameters that used to live as 5 positional args on
46/// `Source::export` into a named struct. `sink` is **not** part of this struct
47/// — it is `&mut` and conceptually the output channel, separate from the
48/// read-only request configuration.
49pub struct ExportRequest<'a> {
50    /// Already-materialized SQL (after `resolve_query`). The driver still wraps
51    /// it with the dialect-specific incremental predicate via
52    /// [`crate::source::query::build_incremental_query`] when `incremental` is set.
53    pub query: &'a str,
54    pub incremental: Option<&'a IncrementalCursorPlan>,
55    pub cursor: Option<&'a CursorState>,
56    pub tuning: &'a SourceTuning,
57    /// Per-column type declarations from `rivet.yaml` (`exports[].columns:`).
58    /// Drivers apply them during schema building so e.g. a `NUMERIC` column
59    /// without declared precision can still be exported as `Decimal128(18,2)`
60    /// when the user has stated the type explicitly.
61    pub column_overrides: &'a ColumnOverrides,
62}
63
64pub trait Source: Send {
65    /// Execute `request.query` and stream batches into `sink`.
66    fn export(&mut self, request: &ExportRequest<'_>, sink: &mut dyn BatchSink) -> Result<()>;
67
68    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>>;
69
70    /// Return `TypeMapping` for every column in `query` without fetching rows.
71    ///
72    /// Used by `rivet check --type-report` to show the full type provenance
73    /// (source native type → RivetType → Arrow type → fidelity) before export.
74    /// Implementations execute `SELECT * FROM (...) AS _q LIMIT 0` so only
75    /// server-side type metadata is transferred.
76    fn type_mappings(
77        &mut self,
78        query: &str,
79        column_overrides: &ColumnOverrides,
80    ) -> Result<Vec<TypeMapping>>;
81}
82
83pub fn create_source(config: &SourceConfig) -> Result<Box<dyn Source>> {
84    use crate::config::SourceType;
85    let url = config.resolve_url()?;
86    warn_if_tls_disabled(config);
87    match config.source_type {
88        SourceType::Postgres => Ok(Box::new(postgres::PostgresSource::connect_with_tls(
89            &url,
90            config.tls.as_ref(),
91        )?)),
92        SourceType::Mysql => Ok(Box::new(mysql::MysqlSource::connect_with_tls(
93            &url,
94            config.tls.as_ref(),
95        )?)),
96    }
97}
98
99/// One-time nudge to enable TLS when the current config connects in plaintext.
100/// Emitted at `warn` level so operators see it even at the default log level.
101/// `create_source` is called multiple times per run (plan/preflight/exec/chunk
102/// workers), so we gate the warning behind a `Once` to fire exactly once per
103/// process rather than 3-4 times in stderr.
104pub(crate) fn warn_if_tls_disabled(config: &SourceConfig) {
105    let enforced = config.tls.as_ref().is_some_and(|t| t.mode.is_enforced());
106    if !enforced {
107        static WARNED: std::sync::Once = std::sync::Once::new();
108        WARNED.call_once(|| {
109            log::warn!(
110                "source: TLS is not enforced — credentials and result rows cross the network in plaintext. \
111                 Add `source.tls.mode: verify-full` (with `ca_file:` if your CA is private) to enable transport security."
112            );
113        });
114    }
115}