// rivet/source/mod.rs

1pub mod mysql;
2pub(crate) mod pg_numeric_wire;
3pub mod postgres;
4pub(crate) mod query;
5pub(crate) mod tls;
6
7use arrow::datatypes::SchemaRef;
8use arrow::record_batch::RecordBatch;
9
10use crate::config::SourceConfig;
11use crate::error::Result;
12use crate::plan::IncrementalCursorPlan;
13use crate::tuning::SourceTuning;
14use crate::types::{ColumnOverrides, CursorState, TypeMapping};
15
/// Summary of a source table relevant to chunked-mode planning. Source-neutral
/// shape so plan-build can ask either Postgres or MySQL for the same answer.
///
/// Populated by `crate::source::postgres::introspect_pg_table_for_chunking` and
/// `crate::source::mysql::introspect_mysql_table_for_chunking`. Both helpers
/// rely on catalog stats (`pg_class` / `information_schema.TABLES`) so the
/// numbers are only as fresh as the last `ANALYZE` / autoanalyse.
///
/// # Why this is a data-shape seam, not a trait
///
/// The two per-engine introspection functions have identical signatures
/// (`fn(url, tls, qualified_table) -> Result<TableIntrospection>`) and return
/// this shared struct. The parallel shape sometimes invites a refactor along
/// the lines of `trait Introspector { fn introspect_table(...) }` with one
/// impl per engine — that refactor adds ceremony without reducing duplication,
/// because the *bodies* share nothing useful: PG queries `pg_class` /
/// `pg_index` / `pg_attribute` / `pg_type` (PG-specific type names like
/// `int2`/`int4`/`int8`) via the `postgres` client; MySQL queries
/// `information_schema.TABLES` / `STATISTICS` with the InnoDB
/// `AVG_ROW_LENGTH` overflow correction via the `mysql` client. No shared
/// implementation logic exists to extract into trait-default methods. A
/// trait would only rename where the engine match happens
/// (`match config.source.source_type { … }` at the call site → factory
/// returning `Box<dyn Introspector>`); the match doesn't disappear.
///
/// The seam therefore lives at the **data shape**: this struct is the
/// shared contract, the two free functions are the adapters, the per-call
/// dispatch is an `enum`-driven `match`. See ADR-0015 for the full
/// rationale and the architecture-review walks that led here.
#[derive(Debug, Clone, Default)]
pub(crate) struct TableIntrospection {
    /// Name of the single integer-family PK column, if present and safe to
    /// range-chunk. `None` when the table has no PK, has a composite PK, or
    /// the PK type is not an integer family (text, uuid, decimal, …).
    pub single_int_pk: Option<String>,
    /// Single-column, NOT NULL, **unique** index columns usable as a keyset
    /// (seek) pagination key — PK first (any type), then other UNIQUE indexes
    /// (OPT-4). Index-backed and unique by construction, so `ORDER BY key
    /// LIMIT n` is a bounded index range scan (never a filesort) and
    /// `WHERE key > last` never skips rows with a duplicate key. Empty when the
    /// table has no such key.
    pub keyset_keys: Vec<String>,
    /// Best-effort row count: PG `reltuples`, MySQL `TABLE_ROWS`. `0` means
    /// the table is empty or stats are unavailable.
    pub row_estimate: i64,
    /// Heap-size-per-row in bytes. `None` for empty / unanalysed tables.
    /// Used to convert `chunk_size_memory_mb` into a row count.
    pub avg_row_bytes: Option<i64>,
}

impl TableIntrospection {
    /// The auto-selected keyset key: the first usable single-column unique
    /// NOT NULL key (PK preferred). `None` when the table has none.
    ///
    /// This is simply the first entry of `keyset_keys`, so the preference
    /// order is whatever order the introspection helper populated.
    pub fn auto_keyset_key(&self) -> Option<&str> {
        self.keyset_keys.first().map(String::as_str)
    }

    /// Whether `col` is a usable keyset key (single-column, unique, NOT NULL,
    /// index-backed). Used to validate an explicit `chunk_by_key`.
    ///
    /// The comparison is an exact, case-sensitive string match against
    /// `keyset_keys`.
    pub fn is_usable_keyset_key(&self, col: &str) -> bool {
        self.keyset_keys.iter().any(|k| k == col)
    }
}

80/// Receives schema and batches from a source, one at a time.
81pub trait BatchSink {
82    fn on_schema(&mut self, schema: SchemaRef) -> Result<()>;
83    fn on_batch(&mut self, batch: &RecordBatch) -> Result<()>;
84}
85
86/// Read-only inputs for a single export call.
87///
88/// Packs the parameters that used to live as 5 positional args on
89/// `Source::export` into a named struct. `sink` is **not** part of this struct
90/// — it is `&mut` and conceptually the output channel, separate from the
91/// read-only request configuration.
92pub struct ExportRequest<'a> {
93    /// Already-materialized SQL (after `resolve_query`). The driver still wraps
94    /// it with the dialect-specific incremental predicate via
95    /// [`crate::source::query::build_incremental_query`] when `incremental` is set.
96    pub query: &'a str,
97    pub incremental: Option<&'a IncrementalCursorPlan>,
98    pub cursor: Option<&'a CursorState>,
99    pub tuning: &'a SourceTuning,
100    /// Per-column type declarations from `rivet.yaml` (`exports[].columns:`).
101    /// Drivers apply them during schema building so e.g. a `NUMERIC` column
102    /// without declared precision can still be exported as `Decimal128(18,2)`
103    /// when the user has stated the type explicitly.
104    pub column_overrides: &'a ColumnOverrides,
105    /// Keyset (seek) pagination page size (OPT-4). When `Some(n)` *and*
106    /// `incremental` carries the key plan, the driver builds one keyset page
107    /// (`WHERE key > cursor ORDER BY key LIMIT n`) instead of the unbounded
108    /// incremental/snapshot query. The keyset runner drives the outer loop.
109    pub page_limit: Option<usize>,
110}
111
112pub trait Source: Send {
113    /// Execute `request.query` and stream batches into `sink`.
114    fn export(&mut self, request: &ExportRequest<'_>, sink: &mut dyn BatchSink) -> Result<()>;
115
116    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>>;
117
118    /// Return `TypeMapping` for every column in `query` without fetching rows.
119    ///
120    /// Used by `rivet check --type-report` to show the full type provenance
121    /// (source native type → RivetType → Arrow type → fidelity) before export.
122    /// Implementations execute `SELECT * FROM (...) AS _q LIMIT 0` so only
123    /// server-side type metadata is transferred.
124    fn type_mappings(
125        &mut self,
126        query: &str,
127        column_overrides: &ColumnOverrides,
128    ) -> Result<Vec<TypeMapping>>;
129
130    /// Sample a monotonic source-pressure counter for the OPT-2 concurrency
131    /// governor (`pipeline::chunked::exec`).
132    ///
133    /// Higher = more pressure. The governor compares successive samples
134    /// (`cur > prev` ⇒ under pressure) — the same convention the adaptive
135    /// batch-size loop already uses. Returns `None` when the engine can't
136    /// cheaply sample a pressure proxy, in which case the governor holds
137    /// parallelism flat. Default: `None`.
138    fn sample_pressure(&mut self) -> Option<u64> {
139        None
140    }
141}
142
143pub fn create_source(config: &SourceConfig) -> Result<Box<dyn Source>> {
144    use crate::config::SourceType;
145    let url = config.resolve_url()?;
146    warn_if_tls_disabled(config);
147    match config.source_type {
148        SourceType::Postgres => Ok(Box::new(postgres::PostgresSource::connect_with_tls(
149            &url,
150            config.tls.as_ref(),
151        )?)),
152        SourceType::Mysql => Ok(Box::new(mysql::MysqlSource::connect_with_tls(
153            &url,
154            config.tls.as_ref(),
155        )?)),
156    }
157}
158
159/// One-time nudge to enable TLS when the current config connects in plaintext.
160/// Emitted at `warn` level so operators see it even at the default log level.
161/// `create_source` is called multiple times per run (plan/preflight/exec/chunk
162/// workers), so we gate the warning behind a `Once` to fire exactly once per
163/// process rather than 3-4 times in stderr.
164pub(crate) fn warn_if_tls_disabled(config: &SourceConfig) {
165    let enforced = config.tls.as_ref().is_some_and(|t| t.mode.is_enforced());
166    if !enforced {
167        static WARNED: std::sync::Once = std::sync::Once::new();
168        WARNED.call_once(|| {
169            log::warn!(
170                "source: TLS is not enforced — credentials and result rows cross the network in plaintext. \
171                 Add `source.tls.mode: verify-full` (with `ca_file:` if your CA is private) to enable transport security."
172            );
173        });
174    }
175}