// ferrule_sql/connection.rs

1use crate::error::SqlError;
2use crate::guard::SizeGuards;
3use crate::stream::{BoxRowStream, RowCursor};
4use crate::value::{ColumnInfo, Row};
5use async_trait::async_trait;
6use secrecy::SecretString;
7
8/// Backend-agnostic connection options.
9///
10/// `password` carries a credential the *caller* already resolved
11/// (env var, OS keyring, interactive prompt, or any host-supplied
12/// source). `ferrule-sql` performs no credential resolution itself —
13/// it has no dependency on `rpassword`, an OS keyring, or `dirs`. An
14/// embedded library consumer supplies the secret here rather than
15/// baking it into the connection URL.
16///
17/// When `password` is `Some`, it takes precedence over any password
18/// component embedded in the connection URL. When it is `None`, each
19/// backend falls back to the URL's password component, so callers
20/// that prefer to embed the password in the URL keep working.
21///
22/// The secret is wrapped in [`SecretString`] so it is redacted in
23/// `Debug` output and zeroized on drop.
24#[derive(Debug, Clone, Default)]
25pub struct ConnectOptions {
26    /// Disable TLS certificate verification. Emits a warning on stderr.
27    pub insecure: bool,
28    /// Caller-resolved credential. Takes precedence over the URL's
29    /// password component; `None` falls back to the URL.
30    pub password: Option<SecretString>,
31}
32
33impl ConnectOptions {
34    /// Resolve the effective password for this connection: the
35    /// caller-supplied [`ConnectOptions::password`] if present,
36    /// otherwise the password component of `url`.
37    ///
38    /// This is the single precedence rule every backend honors so the
39    /// "resolved secret wins, URL is the fallback" contract lives in
40    /// one place. The returned [`SecretString`] keeps the credential
41    /// redacted and zeroize-on-drop right up to the point each driver
42    /// consumes it via `expose_secret()`.
43    #[must_use]
44    pub fn effective_password(&self, url: &crate::url::DatabaseUrl) -> Option<SecretString> {
45        self.password.clone().or_else(|| url.password())
46    }
47}
48
49/// Result of a query — columns plus rows.
50#[derive(Debug, Clone)]
51pub struct QueryResult {
52    pub columns: Vec<ColumnInfo>,
53    pub rows: Vec<Row>,
54}
55
/// Summary of a non-query execution (DML / DDL).
///
/// Both fields are optional, so either may be left as `None`.
#[derive(Debug, Clone)]
pub struct ExecutionSummary {
    /// Number of rows the statement affected, when available.
    pub rows_affected: Option<u64>,
    /// Command tag for the statement, when available.
    pub command_tag: Option<String>,
}
62
63/// A single statement within a multi-statement batch.
64#[derive(Debug, Clone)]
65pub enum StatementResult {
66    Query(QueryResult),
67    Summary(ExecutionSummary),
68}
69
70/// Payload for [`Connection::bulk_insert_rows`].
71///
72/// `table` is unquoted — each backend is responsible for quoting it
73/// for its own dialect. `columns` is the destination column order;
74/// each row in `rows` must have the same length and match positionally.
75///
76/// `copy_format` is consulted only by the Postgres backend and selects
77/// between `COPY … WITH (FORMAT TEXT)` and `COPY … WITH (FORMAT BINARY)`.
78/// All other backends ignore the field — their bulk paths use a
79/// protocol-native wire format (TDS, MySQL `LOAD DATA`, ODPI-C array
80/// DML).
81#[derive(Debug)]
82pub struct BulkInsert<'a> {
83    pub table: &'a str,
84    pub columns: &'a [ColumnInfo],
85    pub rows: &'a [Row],
86    pub copy_format: crate::copy::CopyFormat,
87}
88
/// A foreign-key edge in a schema, returned by
/// [`Connection::list_foreign_keys`]. The column lists are aligned:
/// `child_columns[i]` references `parent_columns[i]`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKey {
    /// Table that holds the referencing columns.
    pub child_table: String,
    /// Referencing columns, aligned with `parent_columns`.
    pub child_columns: Vec<String>,
    /// Table that is referenced.
    pub parent_table: String,
    /// Referenced columns, aligned with `child_columns`.
    pub parent_columns: Vec<String>,
    /// Informational only: e.g. `"CASCADE"`, `"SET NULL"`, `"NO ACTION"`.
    /// `None` when the backend doesn't expose this (or it isn't set).
    pub on_delete: Option<String>,
}
102
/// One schema / database / namespace returned by
/// [`Connection::list_schemas`].
///
/// "Schema" is the cross-backend umbrella term: a PostgreSQL / MSSQL
/// schema, a MySQL database, an Oracle user/owner, or a SQLite attached
/// database. `name` is the identifier as the server reports it
/// (unquoted). `is_default` marks the schema the connection currently
/// resolves unqualified objects against (PostgreSQL `current_schema()` /
/// `public`, MySQL `DATABASE()`, MSSQL `SCHEMA_NAME()` / `dbo`, Oracle
/// `USER`, SQLite `main`) so a UI can pre-select it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaInfo {
    /// The schema / database / owner name, unquoted, as the server reports it.
    pub name: String,
    /// `true` for the schema the connection resolves unqualified objects
    /// against. At most one entry is flagged per connection; a backend
    /// that cannot determine the current schema flags none.
    pub is_default: bool,
}
122
123/// Decode the `is_default` flag of a [`SchemaInfo`] from the second
124/// column of a backend introspection query.
125///
126/// Backends that go through the generic [`AsyncConnection::query`] path
127/// (MySQL, MSSQL, Oracle) return the `name = current_schema` comparison
128/// as a [`Value`](crate::value::Value): a `BIT`/boolean surfaces as
129/// `Value::Bool`, a `CASE ... THEN 1` surfaces as `Value::Int64`. A
130/// `NULL` comparison (e.g. MySQL `DATABASE()` is `NULL` when no database
131/// is selected) decodes to `false` rather than erroring.
132pub(crate) fn is_default_from_value(value: Option<&crate::value::Value>) -> bool {
133    match value {
134        Some(crate::value::Value::Bool(b)) => *b,
135        Some(crate::value::Value::Int64(n)) => *n != 0,
136        _ => false,
137    }
138}
139
140/// Internal async backend trait — implemented by every driver.
141///
142/// This is **not** part of `ferrule-sql`'s public surface: it is the
143/// async machinery the synchronous [`Connection`] trait is built on top
144/// of. Each backend (`postgres`, `mysql`, …) implements
145/// `AsyncConnection`; the public sync API is reached exclusively through
146/// [`Connection`], whose wrapper [`crate::sync::SyncConnection`] drives
147/// these futures to completion on a private current-thread runtime.
148///
149/// Embedders never name this trait. It is `pub(crate)` so the
150/// per-backend modules implement it in-crate while it stays absent from
151/// the public API — no `async fn` / `Future` is reachable through it
152/// from outside the crate.
153#[async_trait]
154pub(crate) trait AsyncConnection: Send {
155    /// Execute a statement that may not return rows (INSERT, UPDATE, CREATE, etc.).
156    async fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError>;
157
158    /// Execute a SELECT-like query and return rows.
159    async fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError>;
160
161    /// Open a native server-side cursor for `sql` and return the column
162    /// metadata plus a back-pressured stream of decoded rows.
163    ///
164    /// Unlike [`query`](Self::query) this does **not** buffer the result:
165    /// the returned [`BoxRowStream`] pulls rows from the driver's native
166    /// cursor (`tokio-postgres` `query_raw`, `mysql_async` `query_iter`,
167    /// `tiberius` `QueryStream`, or a `spawn_blocking` row-stepping
168    /// producer feeding a bounded channel for `rusqlite` / `oracle`) only
169    /// as the consumer advances it, so peak memory stays bounded. The
170    /// stream borrows `self` for the async backends, which is why the
171    /// public cursor API holds the connection for the cursor's lifetime.
172    async fn query_stream(
173        &mut self,
174        sql: &str,
175    ) -> Result<(Vec<ColumnInfo>, BoxRowStream<'_>), SqlError>;
176
177    /// Execute one or more statements.
178    ///
179    /// The default implementation tries `query()` first, then falls back to
180    /// `execute()` — i.e. single-statement only. Backends that natively
181    /// support multi-resultsets (Postgres, MSSQL) should override this.
182    async fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
183        match self.query(sql).await {
184            Ok(result) => Ok(vec![StatementResult::Query(result)]),
185            Err(SqlError::QueryFailed(_)) => {
186                let summary = self.execute(sql).await?;
187                Ok(vec![StatementResult::Summary(summary)])
188            }
189            Err(e) => Err(e),
190        }
191    }
192
193    /// Test connectivity (ping / `SELECT 1`).
194    async fn ping(&mut self) -> Result<(), SqlError>;
195
196    /// List tables in the given schema (or default schema if `None`).
197    async fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError>;
198
199    /// List the schemas / databases / owners visible on this
200    /// connection, ordered by name. The entry whose `is_default` is
201    /// set is the one unqualified objects resolve against.
202    async fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError>;
203
204    /// Describe the columns of a single table.
205    async fn describe_table(
206        &mut self,
207        schema: Option<&str>,
208        table: &str,
209    ) -> Result<QueryResult, SqlError>;
210
211    /// Return the column names of `table`'s primary key, in key
212    /// position order. Returns an empty `Vec` when the table has no
213    /// declared PK. The `schema` argument follows the same default-
214    /// schema convention as [`describe_table`](Self::describe_table).
215    ///
216    /// Implementations must not infer a PK from unique indexes — only
217    /// declared primary-key constraints. Callers that want to override
218    /// the conflict key supply it explicitly.
219    async fn primary_key(
220        &mut self,
221        schema: Option<&str>,
222        table: &str,
223    ) -> Result<Vec<String>, SqlError>;
224
225    /// Return every foreign-key edge in `schema` (or the default
226    /// schema if `None`). Used by schema-level copy to topologically
227    /// order tables — load parents before children.
228    ///
229    /// Backends without referential integrity (SQLite when
230    /// `foreign_keys=off`, MySQL on MyISAM, etc.) still return any
231    /// declared FKs; runtime enforcement is a separate concern.
232    async fn list_foreign_keys(
233        &mut self,
234        schema: Option<&str>,
235    ) -> Result<Vec<ForeignKey>, SqlError>;
236
237    /// Insert `target.rows` into `target.table` using the backend's
238    /// native bulk loader (Postgres `COPY FROM STDIN`, MSSQL
239    /// `BulkLoadRequest`, MySQL `LOAD DATA LOCAL INFILE`, Oracle
240    /// `oracle::Batch`). Returns the number of rows accepted.
241    ///
242    /// Backends that have no native bulk loader (SQLite, and the
243    /// proxy / tunnel wrappers in their current shape) must return
244    /// [`SqlError::BulkUnavailable`] so the caller can route the
245    /// batch through the generic INSERT path. Treat this method as
246    /// required — forgetting to implement it on a new backend or
247    /// wrapper is a bug we want to catch at compile time, not at
248    /// runtime in the "just slow" form.
249    async fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError>;
250}
251
252/// A synchronous, blocking database connection — `ferrule-sql`'s public
253/// connection API.
254///
255/// Every method **blocks the calling thread** until the database
256/// responds; there is no `async fn` / `Future` anywhere on this trait.
257/// Embedders that have no async runtime of their own (the motivating
258/// case: a single-threaded synchronous host) call these directly.
259///
260/// **Runtime model.** The async drivers (`tokio-postgres`,
261/// `mysql_async`, `tiberius`) are still used underneath, driven by a
262/// private current-thread `tokio` runtime that the concrete handle
263/// ([`crate::sync::SyncConnection`]) owns. That runtime is an
264/// implementation detail — it never surfaces in the public API. SQLite
265/// and Oracle are natively synchronous and call straight through.
266///
267/// **Memory model.** [`query`](Self::query) buffers the full result set
268/// in memory (the `Vec<Row>` inside [`QueryResult`]) but is bounded by
269/// the connection's [`SizeGuards`]: an oversized
270/// cell/row or a result past the total cap fails fast with a structured
271/// error instead of OOMing. For an unbounded result use
272/// [`query_cursor`](Self::query_cursor), which streams from a native
273/// database cursor at `O(batch)` memory and never buffers the whole
274/// result.
275///
276/// **Reentrancy.** Because the private runtime is current-thread, do not
277/// call these methods from inside another `block_on` on the same thread
278/// — that nests runtimes and panics. Async embedders must hop to a
279/// blocking thread (`tokio::task::spawn_blocking` or a dedicated OS
280/// thread) before using a `Connection`.
281pub trait Connection: Send {
282    /// Execute a statement that may not return rows (INSERT, UPDATE, CREATE, etc.).
283    /// Blocks until the statement completes.
284    fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError>;
285
286    /// Execute a SELECT-like query and return all rows. Blocks until the
287    /// full result set is buffered in memory.
288    fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError>;
289
290    /// Open a streaming read cursor for `sql`, returning a [`RowCursor`]
291    /// that pulls rows from a native database cursor at bounded memory.
292    ///
293    /// Use this — not [`query`](Self::query) — to ingest a large result
294    /// under a fixed memory budget: the cursor never materializes the
295    /// whole result, and it applies the connection's per-cell / per-row
296    /// [`SizeGuards`] as each row is decoded. The
297    /// returned cursor borrows the connection for its
298    /// lifetime (the async drivers' row stream is tied to the connection
299    /// handle), so the connection cannot be used for another statement
300    /// until the cursor is dropped. Blocks on each batch; see
301    /// [`RowCursor`] for the bounded-memory and reentrancy contract.
302    fn query_cursor(&mut self, sql: &str) -> Result<RowCursor<'_>, SqlError>;
303
304    /// The size guards currently applied to this connection's reads
305    /// (per-cell, per-row, per-result byte caps). The default
306    /// implementation reports [`SizeGuards::default`]; the concrete
307    /// [`SyncConnection`](crate::sync::SyncConnection) tracks the real
308    /// configured value.
309    fn size_guards(&self) -> SizeGuards {
310        SizeGuards::default()
311    }
312
313    /// Install new size guards for subsequent reads. Lets a host raise or
314    /// lower the per-cell / per-row / per-result caps (e.g. the CLI
315    /// wiring its `[limits]` config). The default implementation is a
316    /// no-op for wrappers that hold no guard state; the concrete
317    /// [`SyncConnection`](crate::sync::SyncConnection) stores it.
318    fn set_size_guards(&mut self, guards: SizeGuards) {
319        let _ = guards;
320    }
321
322    /// Execute one or more statements, one result per statement. Backends
323    /// that natively support multi-resultsets (Postgres, MSSQL) split the
324    /// batch; others fall back to single-statement behavior. Blocks until
325    /// the batch completes.
326    fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError>;
327
328    /// Test connectivity (ping / `SELECT 1`). Blocks on a round-trip.
329    fn ping(&mut self) -> Result<(), SqlError>;
330
331    /// List tables in the given schema (or default schema if `None`).
332    fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError>;
333
334    /// List the schemas / databases / owners visible on this
335    /// connection, ordered by name. The entry whose `is_default` is
336    /// set is the schema unqualified objects resolve against (PG
337    /// `current_schema()`, MySQL `DATABASE()`, MSSQL `SCHEMA_NAME()`,
338    /// Oracle `USER`, SQLite `main`). Blocks on a round-trip.
339    fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError>;
340
341    /// Describe the columns of a single table.
342    fn describe_table(
343        &mut self,
344        schema: Option<&str>,
345        table: &str,
346    ) -> Result<QueryResult, SqlError>;
347
348    /// Return the column names of `table`'s primary key, in key position
349    /// order. Returns an empty `Vec` when the table has no declared PK.
350    /// Must not infer a PK from unique indexes — only declared
351    /// primary-key constraints.
352    fn primary_key(&mut self, schema: Option<&str>, table: &str) -> Result<Vec<String>, SqlError>;
353
354    /// Return every foreign-key edge in `schema` (or the default schema
355    /// if `None`). Used by schema-level copy to order tables — parents
356    /// before children.
357    fn list_foreign_keys(&mut self, schema: Option<&str>) -> Result<Vec<ForeignKey>, SqlError>;
358
359    /// Insert `target.rows` into `target.table` using the backend's
360    /// native bulk loader. Returns the number of rows accepted, or
361    /// [`SqlError::BulkUnavailable`] when the backend has no native loader
362    /// so the caller can fall back to the generic INSERT path. Blocks
363    /// until the whole batch has streamed to the server.
364    fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError>;
365}
366
367/// Forwarding impl so a boxed connection is itself a [`Connection`].
368///
369/// Lets call sites pass `&mut Box<dyn Connection>` where a `&mut dyn
370/// Connection` is expected (the cross-backend copy path and the test
371/// helpers both hold owned `Box<dyn Connection>` handles). Each method
372/// simply delegates to the boxed value.
373impl Connection for Box<dyn Connection> {
374    fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
375        (**self).execute(sql)
376    }
377    fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
378        (**self).query(sql)
379    }
380    fn query_cursor(&mut self, sql: &str) -> Result<RowCursor<'_>, SqlError> {
381        (**self).query_cursor(sql)
382    }
383    fn size_guards(&self) -> SizeGuards {
384        (**self).size_guards()
385    }
386    fn set_size_guards(&mut self, guards: SizeGuards) {
387        (**self).set_size_guards(guards)
388    }
389    fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
390        (**self).execute_multi(sql)
391    }
392    fn ping(&mut self) -> Result<(), SqlError> {
393        (**self).ping()
394    }
395    fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
396        (**self).list_tables(schema)
397    }
398    fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
399        (**self).list_schemas()
400    }
401    fn describe_table(
402        &mut self,
403        schema: Option<&str>,
404        table: &str,
405    ) -> Result<QueryResult, SqlError> {
406        (**self).describe_table(schema, table)
407    }
408    fn primary_key(&mut self, schema: Option<&str>, table: &str) -> Result<Vec<String>, SqlError> {
409        (**self).primary_key(schema, table)
410    }
411    fn list_foreign_keys(&mut self, schema: Option<&str>) -> Result<Vec<ForeignKey>, SqlError> {
412        (**self).list_foreign_keys(schema)
413    }
414    fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
415        (**self).bulk_insert_rows(target)
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use crate::url::DatabaseUrl;
    use secrecy::ExposeSecret;

    /// A caller-resolved secret in `ConnectOptions::password` wins over
    /// the password embedded in the URL — this is the precedence rule
    /// that lets an embedder hand ferrule-sql a credential it resolved
    /// (keyring, prompt) instead of baking it into the URL.
    #[test]
    fn effective_password_prefers_opts_over_url() {
        let url = DatabaseUrl::parse("postgres://user:url_pw@localhost/db").unwrap();
        let opts = ConnectOptions {
            password: Some(SecretString::new("resolved_pw".into())),
            ..Default::default()
        };
        let got = opts.effective_password(&url).expect("a password");
        assert_eq!(got.expose_secret(), "resolved_pw");
    }

    /// With no caller-supplied secret, ferrule-sql falls back to the
    /// URL's password component, so the URL-embedded path keeps working.
    #[test]
    fn effective_password_falls_back_to_url() {
        let url = DatabaseUrl::parse("postgres://user:url_pw@localhost/db").unwrap();
        let opts = ConnectOptions::default();
        let got = opts.effective_password(&url).expect("a password");
        assert_eq!(got.expose_secret(), "url_pw");
    }

    /// No secret anywhere yields `None` — backends then connect without
    /// a password (e.g. trust/peer auth or a passwordless local socket).
    #[test]
    fn effective_password_none_when_absent_everywhere() {
        let url = DatabaseUrl::parse("postgres://user@localhost/db").unwrap();
        let opts = ConnectOptions::default();
        assert!(opts.effective_password(&url).is_none());
    }
}