Skip to main content

rivet/source/
mod.rs

1pub(crate) mod batch_controller;
2pub mod mssql;
3pub mod mysql;
4pub(crate) mod pg_numeric_wire;
5pub mod postgres;
6pub(crate) mod query;
7pub(crate) mod tls;
8
9use arrow::datatypes::SchemaRef;
10use arrow::record_batch::RecordBatch;
11
12use crate::config::SourceConfig;
13use crate::error::Result;
14use crate::plan::IncrementalCursorPlan;
15use crate::tuning::SourceTuning;
16use crate::types::{ColumnOverrides, CursorState, TypeMapping};
17
18/// Summary of a source table relevant to chunked-mode planning. Source-neutral
19/// shape so plan-build can ask either Postgres or MySQL for the same answer.
20///
21/// Populated by `crate::source::postgres::introspect_pg_table_for_chunking` and
22/// `crate::source::mysql::introspect_mysql_table_for_chunking`. Both helpers
23/// rely on catalog stats (`pg_class` / `information_schema.TABLES`) so the
24/// numbers are only as fresh as the last `ANALYZE` / autoanalyse.
25///
26/// # Why this is a data-shape seam, not a trait
27///
28/// The two per-engine introspection functions have identical signatures
29/// (`fn(url, tls, qualified_table) -> Result<TableIntrospection>`) and return
30/// this shared struct. The parallel shape sometimes invites a refactor along
31/// the lines of `trait Introspector { fn introspect_table(...) }` with one
32/// impl per engine — that refactor adds ceremony without reducing duplication,
33/// because the *bodies* share nothing useful: PG queries `pg_class` /
34/// `pg_index` / `pg_attribute` / `pg_type` (PG-specific type names like
35/// `int2`/`int4`/`int8`) via the `postgres` client; MySQL queries
36/// `information_schema.TABLES` / `STATISTICS` with the InnoDB
37/// `AVG_ROW_LENGTH` overflow correction via the `mysql` client. No shared
38/// implementation logic exists to extract into trait-default methods. A
39/// trait would only rename where the engine match happens
40/// (`match config.source.source_type { … }` at the call site → factory
41/// returning `Box<dyn Introspector>`); the match doesn't disappear.
42///
43/// The seam therefore lives at the **data shape**: this struct is the
44/// shared contract, the two free functions are the adapters, the per-call
45/// dispatch is an `enum`-driven `match`. See ADR-0015 for the full
46/// rationale and the architecture-review walks that led here.
#[derive(Debug, Clone, Default)]
pub(crate) struct TableIntrospection {
    /// The lone integer-family primary-key column, when one exists and is
    /// safe to range-chunk. `None` for tables with no PK, a composite PK, or a
    /// PK whose type is outside the integer family (text, uuid, decimal, …).
    pub single_int_pk: Option<String>,
    /// Columns usable as a keyset (seek) pagination key: single-column,
    /// NOT NULL, **unique** index columns — the PK first (any type), followed
    /// by other UNIQUE indexes (OPT-4). Each is index-backed and unique by
    /// construction, so `ORDER BY key LIMIT n` is a bounded index range scan
    /// (never a filesort) and `WHERE key > last` never skips rows with a
    /// duplicate key. Empty when the table offers no such key.
    pub keyset_keys: Vec<String>,
    /// Best-effort row count (PG `reltuples`, MySQL `TABLE_ROWS`). A value of
    /// `0` means either an empty table or unavailable stats.
    pub row_estimate: i64,
    /// Heap bytes per row, used to turn `chunk_size_memory_mb` into a row
    /// count. `None` for empty / unanalysed tables.
    pub avg_row_bytes: Option<i64>,
}

impl TableIntrospection {
    /// Returns the key chosen automatically for keyset pagination: the first
    /// entry of `keyset_keys` (PK preferred), or `None` when the list is empty.
    pub fn auto_keyset_key(&self) -> Option<&str> {
        let preferred = self.keyset_keys.first()?;
        Some(preferred.as_str())
    }

    /// Reports whether `col` appears in `keyset_keys`, i.e. is a usable
    /// keyset key (single-column, unique, NOT NULL, index-backed). The
    /// comparison is an exact, case-sensitive string match. Used to validate
    /// an explicit `chunk_by_key`.
    pub fn is_usable_keyset_key(&self, col: &str) -> bool {
        self.keyset_keys
            .iter()
            .map(String::as_str)
            .any(|candidate| candidate == col)
    }
}
81
82/// Receives schema and batches from a source, one at a time.
83pub trait BatchSink {
84    fn on_schema(&mut self, schema: SchemaRef) -> Result<()>;
85    fn on_batch(&mut self, batch: &RecordBatch) -> Result<()>;
86}
87
88/// Read-only inputs for a single export call.
89///
90/// Packs the parameters that used to live as 5 positional args on
91/// `Source::export` into a named struct. `sink` is **not** part of this struct
92/// — it is `&mut` and conceptually the output channel, separate from the
93/// read-only request configuration.
94pub struct ExportRequest<'a> {
95    /// Already-materialized SQL (after `resolve_query`). The driver still wraps
96    /// it with the dialect-specific incremental predicate via
97    /// [`crate::source::query::build_incremental_query`] when `incremental` is set.
98    pub query: &'a str,
99    /// The *unwrapped* base query to resolve catalog-dependent type hints from
100    /// (PostgreSQL `NUMERIC` precision/scale, which the wire protocol omits — the
101    /// driver parses the `FROM` clause and asks `pg_catalog`). Chunked, dense and
102    /// keyset runners wrap `query` in a `SELECT … FROM (<base>) …` subquery that
103    /// hides the source table from the catalog parser, so they pass the original
104    /// base query here. `None` ⇒ resolve from `query` (full/incremental, where it
105    /// is already the unwrapped form). Drivers that read precision from the wire
106    /// (MySQL) ignore this field.
107    pub catalog_hint_query: Option<&'a str>,
108    pub incremental: Option<&'a IncrementalCursorPlan>,
109    pub cursor: Option<&'a CursorState>,
110    pub tuning: &'a SourceTuning,
111    /// Per-column type declarations from `rivet.yaml` (`exports[].columns:`).
112    /// Drivers apply them during schema building so e.g. a `NUMERIC` column
113    /// without declared precision can still be exported as `Decimal128(18,2)`
114    /// when the user has stated the type explicitly.
115    pub column_overrides: &'a ColumnOverrides,
116    /// Keyset (seek) pagination page size (OPT-4). When `Some(n)` *and*
117    /// `incremental` carries the key plan, the driver builds one keyset page
118    /// (`WHERE key > cursor ORDER BY key LIMIT n`) instead of the unbounded
119    /// incremental/snapshot query. The keyset runner drives the outer loop.
120    pub page_limit: Option<usize>,
121}
122
123impl<'a> ExportRequest<'a> {
124    /// A request whose `query` is already the **unwrapped base** form, so
125    /// catalog type hints resolve directly from it. Use for snapshot,
126    /// incremental and keyset runners: the driver applies any incremental /
127    /// keyset predicate internally, so the source table stays visible to the
128    /// catalog parser and `catalog_hint_query` is `None`.
129    pub fn unwrapped(
130        query: &'a str,
131        tuning: &'a SourceTuning,
132        column_overrides: &'a ColumnOverrides,
133    ) -> Self {
134        Self {
135            query,
136            catalog_hint_query: None,
137            incremental: None,
138            cursor: None,
139            tuning,
140            column_overrides,
141            page_limit: None,
142        }
143    }
144
145    /// A request whose `query` is a `SELECT … FROM (<base>) …` **wrapper** that
146    /// hides the source table (chunked / dense / time-window). `base` — the
147    /// unwrapped query catalog hints resolve from — is a required argument, so a
148    /// wrapping runner cannot silently fall back to the table-hiding wrapper and
149    /// lose PG `NUMERIC` precision (the bug the catalog-hint fix / ADR-0020
150    /// closed). Drivers that read precision from the wire (MySQL) ignore it.
151    pub fn wrapped(
152        query: &'a str,
153        base: &'a str,
154        tuning: &'a SourceTuning,
155        column_overrides: &'a ColumnOverrides,
156    ) -> Self {
157        Self {
158            query,
159            catalog_hint_query: Some(base),
160            incremental: None,
161            cursor: None,
162            tuning,
163            column_overrides,
164            page_limit: None,
165        }
166    }
167
168    /// Attach the incremental cursor plan (the driver builds the `WHERE cursor >
169    /// ? ORDER BY` predicate). Pass-through `Option` so mode-polymorphic callers
170    /// can forward `strategy.incremental_plan()` directly.
171    pub fn with_incremental(mut self, plan: Option<&'a IncrementalCursorPlan>) -> Self {
172        self.incremental = plan;
173        self
174    }
175
176    /// Attach the last committed cursor value the next run resumes after.
177    pub fn with_cursor(mut self, cursor: Option<&'a CursorState>) -> Self {
178        self.cursor = cursor;
179        self
180    }
181
182    /// Set the keyset (seek) page size — one bounded `… WHERE key > cursor ORDER
183    /// BY key LIMIT n` page instead of the unbounded query.
184    pub fn with_page_limit(mut self, page_limit: usize) -> Self {
185        self.page_limit = Some(page_limit);
186        self
187    }
188}
189
190pub trait Source: Send {
191    /// Execute `request.query` and stream batches into `sink`.
192    fn export(&mut self, request: &ExportRequest<'_>, sink: &mut dyn BatchSink) -> Result<()>;
193
194    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>>;
195
196    /// Return `TypeMapping` for every column in `query` without fetching rows.
197    ///
198    /// Used by `rivet check --type-report` to show the full type provenance
199    /// (source native type → RivetType → Arrow type → fidelity) before export.
200    /// Implementations execute `SELECT * FROM (...) AS _q LIMIT 0` so only
201    /// server-side type metadata is transferred.
202    fn type_mappings(
203        &mut self,
204        query: &str,
205        column_overrides: &ColumnOverrides,
206    ) -> Result<Vec<TypeMapping>>;
207
208    /// Sample a monotonic source-pressure counter for the OPT-2 concurrency
209    /// governor (`pipeline::chunked::exec`).
210    ///
211    /// Higher = more pressure. The governor compares successive samples
212    /// (`cur > prev` ⇒ under pressure) — the same convention the adaptive
213    /// batch-size loop already uses. Returns `None` when the engine can't
214    /// cheaply sample a pressure proxy, in which case the governor holds
215    /// parallelism flat. Default: `None`.
216    fn sample_pressure(&mut self) -> Option<u64> {
217        None
218    }
219}
220
221pub fn create_source(config: &SourceConfig) -> Result<Box<dyn Source>> {
222    use crate::config::SourceType;
223    let url = config.resolve_url()?;
224    warn_if_tls_disabled(config);
225    match config.source_type {
226        SourceType::Postgres => Ok(Box::new(postgres::PostgresSource::connect_with_tls(
227            &url,
228            config.tls.as_ref(),
229        )?)),
230        SourceType::Mysql => Ok(Box::new(mysql::MysqlSource::connect_with_tls(
231            &url,
232            config.tls.as_ref(),
233        )?)),
234        SourceType::Mssql => Ok(Box::new(mssql::MssqlSource::connect_with_tls(
235            &url,
236            config.tls.as_ref(),
237        )?)),
238    }
239}
240
241/// One-time nudge to enable TLS when the current config connects in plaintext.
242/// Emitted at `warn` level so operators see it even at the default log level.
243/// `create_source` is called multiple times per run (plan/preflight/exec/chunk
244/// workers), so we gate the warning behind a `Once` to fire exactly once per
245/// process rather than 3-4 times in stderr.
246pub(crate) fn warn_if_tls_disabled(config: &SourceConfig) {
247    let enforced = config.tls.as_ref().is_some_and(|t| t.mode.is_enforced());
248    if !enforced {
249        static WARNED: std::sync::Once = std::sync::Once::new();
250        WARNED.call_once(|| {
251            log::warn!(
252                "source: TLS is not enforced — credentials and result rows cross the network in plaintext. \
253                 Add `source.tls.mode: verify-full` (with `ca_file:` if your CA is private) to enable transport security."
254            );
255        });
256    }
257}