rivet/source/mod.rs
1pub mod mysql;
2pub(crate) mod pg_numeric_wire;
3pub mod postgres;
4pub(crate) mod query;
5pub(crate) mod tls;
6
7use arrow::datatypes::SchemaRef;
8use arrow::record_batch::RecordBatch;
9
10use crate::config::SourceConfig;
11use crate::error::Result;
12use crate::plan::IncrementalCursorPlan;
13use crate::tuning::SourceTuning;
14use crate::types::{ColumnOverrides, CursorState, TypeMapping};
15
/// Source-neutral facts about a table that chunked-mode planning needs.
///
/// Plan-build asks Postgres and MySQL the same question and gets this same
/// shape back. It is filled in by
/// `crate::source::postgres::introspect_pg_table_for_chunking` and
/// `crate::source::mysql::introspect_mysql_table_for_chunking`. Both read
/// catalog statistics (`pg_class` / `information_schema.TABLES`), so the
/// numbers are only as current as the most recent `ANALYZE` / autoanalyse.
#[derive(Clone, Debug, Default)]
pub(crate) struct TableIntrospection {
    /// The table's primary-key column when it is a single integer-family
    /// column that can be range-chunked. `None` covers three cases:
    /// - there is no PK;
    /// - the PK is composite;
    /// - the PK type is not integer-family (text, uuid, decimal, …).
    pub single_int_pk: Option<String>,
    /// Approximate row count, taken from PG `reltuples` or MySQL `TABLE_ROWS`.
    /// A value of `0` means either an empty table or missing statistics.
    ///
    /// NOTE(review): on PostgreSQL 14+ `reltuples` is `-1` for a table that has
    /// never been vacuumed/analyzed. Confirm the PG helper clamps that to `0`.
    pub row_estimate: i64,
    /// Average heap bytes per row, or `None` for empty / unanalysed tables.
    /// Plan-build uses it to turn `chunk_size_memory_mb` into a row count.
    pub avg_row_bytes: Option<i64>,
}
35}
36
37/// Receives schema and batches from a source, one at a time.
38pub trait BatchSink {
39 fn on_schema(&mut self, schema: SchemaRef) -> Result<()>;
40 fn on_batch(&mut self, batch: &RecordBatch) -> Result<()>;
41}
42
43/// Read-only inputs for a single export call.
44///
45/// Packs the parameters that used to live as 5 positional args on
46/// `Source::export` into a named struct. `sink` is **not** part of this struct
47/// — it is `&mut` and conceptually the output channel, separate from the
48/// read-only request configuration.
49pub struct ExportRequest<'a> {
50 /// Already-materialized SQL (after `resolve_query`). The driver still wraps
51 /// it with the dialect-specific incremental predicate via
52 /// [`crate::source::query::build_incremental_query`] when `incremental` is set.
53 pub query: &'a str,
54 pub incremental: Option<&'a IncrementalCursorPlan>,
55 pub cursor: Option<&'a CursorState>,
56 pub tuning: &'a SourceTuning,
57 /// Per-column type declarations from `rivet.yaml` (`exports[].columns:`).
58 /// Drivers apply them during schema building so e.g. a `NUMERIC` column
59 /// without declared precision can still be exported as `Decimal128(18,2)`
60 /// when the user has stated the type explicitly.
61 pub column_overrides: &'a ColumnOverrides,
62}
63
64pub trait Source: Send {
65 /// Execute `request.query` and stream batches into `sink`.
66 fn export(&mut self, request: &ExportRequest<'_>, sink: &mut dyn BatchSink) -> Result<()>;
67
68 fn query_scalar(&mut self, sql: &str) -> Result<Option<String>>;
69
70 /// Return `TypeMapping` for every column in `query` without fetching rows.
71 ///
72 /// Used by `rivet check --type-report` to show the full type provenance
73 /// (source native type → RivetType → Arrow type → fidelity) before export.
74 /// Implementations execute `SELECT * FROM (...) AS _q LIMIT 0` so only
75 /// server-side type metadata is transferred.
76 fn type_mappings(
77 &mut self,
78 query: &str,
79 column_overrides: &ColumnOverrides,
80 ) -> Result<Vec<TypeMapping>>;
81}
82
83pub fn create_source(config: &SourceConfig) -> Result<Box<dyn Source>> {
84 use crate::config::SourceType;
85 let url = config.resolve_url()?;
86 warn_if_tls_disabled(config);
87 match config.source_type {
88 SourceType::Postgres => Ok(Box::new(postgres::PostgresSource::connect_with_tls(
89 &url,
90 config.tls.as_ref(),
91 )?)),
92 SourceType::Mysql => Ok(Box::new(mysql::MysqlSource::connect_with_tls(
93 &url,
94 config.tls.as_ref(),
95 )?)),
96 }
97}
98
99/// One-time nudge to enable TLS when the current config connects in plaintext.
100/// Emitted at `warn` level so operators see it even at the default log level.
101/// `create_source` is called multiple times per run (plan/preflight/exec/chunk
102/// workers), so we gate the warning behind a `Once` to fire exactly once per
103/// process rather than 3-4 times in stderr.
104pub(crate) fn warn_if_tls_disabled(config: &SourceConfig) {
105 let enforced = config.tls.as_ref().is_some_and(|t| t.mode.is_enforced());
106 if !enforced {
107 static WARNED: std::sync::Once = std::sync::Once::new();
108 WARNED.call_once(|| {
109 log::warn!(
110 "source: TLS is not enforced — credentials and result rows cross the network in plaintext. \
111 Add `source.tls.mode: verify-full` (with `ca_file:` if your CA is private) to enable transport security."
112 );
113 });
114 }
115}