ferrule_sql/connection.rs
1use crate::error::SqlError;
2use crate::guard::SizeGuards;
3use crate::stream::{BoxRowStream, RowCursor};
4use crate::value::{ColumnInfo, Row};
5use async_trait::async_trait;
6use secrecy::SecretString;
7
8/// Backend-agnostic connection options.
9///
10/// `password` carries a credential the *caller* already resolved
11/// (env var, OS keyring, interactive prompt, or any host-supplied
12/// source). `ferrule-sql` performs no credential resolution itself —
13/// it has no dependency on `rpassword`, an OS keyring, or `dirs`. An
14/// embedded library consumer supplies the secret here rather than
15/// baking it into the connection URL.
16///
17/// When `password` is `Some`, it takes precedence over any password
18/// component embedded in the connection URL. When it is `None`, each
19/// backend falls back to the URL's password component, so callers
20/// that prefer to embed the password in the URL keep working.
21///
22/// The secret is wrapped in [`SecretString`] so it is redacted in
23/// `Debug` output and zeroized on drop.
24#[derive(Debug, Clone, Default)]
25pub struct ConnectOptions {
26 /// Disable TLS certificate verification. Emits a warning on stderr.
27 pub insecure: bool,
28 /// Caller-resolved credential. Takes precedence over the URL's
29 /// password component; `None` falls back to the URL.
30 pub password: Option<SecretString>,
31}
32
33impl ConnectOptions {
34 /// Resolve the effective password for this connection: the
35 /// caller-supplied [`ConnectOptions::password`] if present,
36 /// otherwise the password component of `url`.
37 ///
38 /// This is the single precedence rule every backend honors so the
39 /// "resolved secret wins, URL is the fallback" contract lives in
40 /// one place. The returned [`SecretString`] keeps the credential
41 /// redacted and zeroize-on-drop right up to the point each driver
42 /// consumes it via `expose_secret()`.
43 #[must_use]
44 pub fn effective_password(&self, url: &crate::url::DatabaseUrl) -> Option<SecretString> {
45 self.password.clone().or_else(|| url.password())
46 }
47}
48
49/// Result of a query — columns plus rows.
50#[derive(Debug, Clone)]
51pub struct QueryResult {
52 pub columns: Vec<ColumnInfo>,
53 pub rows: Vec<Row>,
54}
55
/// Outcome of a statement that produces no result set (DML / DDL).
#[derive(Clone, Debug)]
pub struct ExecutionSummary {
    /// How many rows the statement touched, when the backend reports it.
    pub rows_affected: Option<u64>,
    /// Backend-reported command tag for the statement, when available.
    pub command_tag: Option<String>,
}
62
63/// A single statement within a multi-statement batch.
64#[derive(Debug, Clone)]
65pub enum StatementResult {
66 Query(QueryResult),
67 Summary(ExecutionSummary),
68}
69
70/// Payload for [`Connection::bulk_insert_rows`].
71///
72/// `table` is unquoted — each backend is responsible for quoting it
73/// for its own dialect. `columns` is the destination column order;
74/// each row in `rows` must have the same length and match positionally.
75///
76/// `copy_format` is consulted only by the Postgres backend and selects
77/// between `COPY … WITH (FORMAT TEXT)` and `COPY … WITH (FORMAT BINARY)`.
78/// All other backends ignore the field — their bulk paths use a
79/// protocol-native wire format (TDS, MySQL `LOAD DATA`, ODPI-C array
80/// DML).
81#[derive(Debug)]
82pub struct BulkInsert<'a> {
83 pub table: &'a str,
84 pub columns: &'a [ColumnInfo],
85 pub rows: &'a [Row],
86 pub copy_format: crate::copy::CopyFormat,
87}
88
/// One foreign-key edge of a schema, as produced by
/// [`Connection::list_foreign_keys`].
///
/// The two column lists run in parallel: the column at
/// `child_columns[i]` points at the column at `parent_columns[i]`.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ForeignKey {
    /// Table that owns the constraint (the referencing side).
    pub child_table: String,
    /// Referencing columns, aligned with `parent_columns`.
    pub child_columns: Vec<String>,
    /// Table being referenced.
    pub parent_table: String,
    /// Referenced columns, aligned with `child_columns`.
    pub parent_columns: Vec<String>,
    /// The `ON DELETE` action, for information only (for example
    /// `"CASCADE"`, `"SET NULL"`, `"NO ACTION"`). `None` when the backend
    /// does not expose it or no action is set.
    pub on_delete: Option<String>,
}
102
/// A single schema / database / namespace produced by
/// [`Connection::list_schemas`].
///
/// "Schema" is used as the umbrella term across backends. It covers:
/// - a PostgreSQL or MSSQL schema;
/// - a MySQL database;
/// - an Oracle user/owner;
/// - a SQLite attached database.
///
/// `is_default` flags the schema that unqualified object names currently
/// resolve against, so a UI can pre-select it. The source of that flag
/// per backend is:
/// - PostgreSQL: `current_schema()` / `public`
/// - MySQL: `DATABASE()`
/// - MSSQL: `SCHEMA_NAME()` / `dbo`
/// - Oracle: `USER`
/// - SQLite: `main`
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct SchemaInfo {
    /// Name exactly as the server reports it, without quoting.
    pub name: String,
    /// Set on the schema that unqualified objects resolve against. A
    /// connection flags at most one entry, and a backend unable to
    /// determine its current schema flags none.
    pub is_default: bool,
}
122
123/// Decode the `is_default` flag of a [`SchemaInfo`] from the second
124/// column of a backend introspection query.
125///
126/// Backends that go through the generic [`AsyncConnection::query`] path
127/// (MySQL, MSSQL, Oracle) return the `name = current_schema` comparison
128/// as a [`Value`](crate::value::Value): a `BIT`/boolean surfaces as
129/// `Value::Bool`, a `CASE ... THEN 1` surfaces as `Value::Int64`. A
130/// `NULL` comparison (e.g. MySQL `DATABASE()` is `NULL` when no database
131/// is selected) decodes to `false` rather than erroring.
132pub(crate) fn is_default_from_value(value: Option<&crate::value::Value>) -> bool {
133 match value {
134 Some(crate::value::Value::Bool(b)) => *b,
135 Some(crate::value::Value::Int64(n)) => *n != 0,
136 _ => false,
137 }
138}
139
140/// Internal async backend trait — implemented by every driver.
141///
142/// This is **not** part of `ferrule-sql`'s public surface: it is the
143/// async machinery the synchronous [`Connection`] trait is built on top
144/// of. Each backend (`postgres`, `mysql`, …) implements
145/// `AsyncConnection`; the public sync API is reached exclusively through
146/// [`Connection`], whose wrapper [`crate::sync::SyncConnection`] drives
147/// these futures to completion on a private current-thread runtime.
148///
149/// Embedders never name this trait. It is `pub(crate)` so the
150/// per-backend modules implement it in-crate while it stays absent from
151/// the public API — no `async fn` / `Future` is reachable through it
152/// from outside the crate.
153#[async_trait]
154pub(crate) trait AsyncConnection: Send {
155 /// Execute a statement that may not return rows (INSERT, UPDATE, CREATE, etc.).
156 async fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError>;
157
158 /// Execute a SELECT-like query and return rows.
159 async fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError>;
160
161 /// Open a native server-side cursor for `sql` and return the column
162 /// metadata plus a back-pressured stream of decoded rows.
163 ///
164 /// Unlike [`query`](Self::query) this does **not** buffer the result:
165 /// the returned [`BoxRowStream`] pulls rows from the driver's native
166 /// cursor (`tokio-postgres` `query_raw`, `mysql_async` `query_iter`,
167 /// `tiberius` `QueryStream`, or a `spawn_blocking` row-stepping
168 /// producer feeding a bounded channel for `rusqlite` / `oracle`) only
169 /// as the consumer advances it, so peak memory stays bounded. The
170 /// stream borrows `self` for the async backends, which is why the
171 /// public cursor API holds the connection for the cursor's lifetime.
172 async fn query_stream(
173 &mut self,
174 sql: &str,
175 ) -> Result<(Vec<ColumnInfo>, BoxRowStream<'_>), SqlError>;
176
177 /// Execute one or more statements.
178 ///
179 /// The default implementation tries `query()` first, then falls back to
180 /// `execute()` — i.e. single-statement only. Backends that natively
181 /// support multi-resultsets (Postgres, MSSQL) should override this.
182 async fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
183 match self.query(sql).await {
184 Ok(result) => Ok(vec![StatementResult::Query(result)]),
185 Err(SqlError::QueryFailed(_)) => {
186 let summary = self.execute(sql).await?;
187 Ok(vec![StatementResult::Summary(summary)])
188 }
189 Err(e) => Err(e),
190 }
191 }
192
193 /// Test connectivity (ping / `SELECT 1`).
194 async fn ping(&mut self) -> Result<(), SqlError>;
195
196 /// List tables in the given schema (or default schema if `None`).
197 async fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError>;
198
199 /// List the schemas / databases / owners visible on this
200 /// connection, ordered by name. The entry whose `is_default` is
201 /// set is the one unqualified objects resolve against.
202 async fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError>;
203
204 /// Describe the columns of a single table.
205 async fn describe_table(
206 &mut self,
207 schema: Option<&str>,
208 table: &str,
209 ) -> Result<QueryResult, SqlError>;
210
211 /// Return the column names of `table`'s primary key, in key
212 /// position order. Returns an empty `Vec` when the table has no
213 /// declared PK. The `schema` argument follows the same default-
214 /// schema convention as [`describe_table`](Self::describe_table).
215 ///
216 /// Implementations must not infer a PK from unique indexes — only
217 /// declared primary-key constraints. Callers that want to override
218 /// the conflict key supply it explicitly.
219 async fn primary_key(
220 &mut self,
221 schema: Option<&str>,
222 table: &str,
223 ) -> Result<Vec<String>, SqlError>;
224
225 /// Return every foreign-key edge in `schema` (or the default
226 /// schema if `None`). Used by schema-level copy to topologically
227 /// order tables — load parents before children.
228 ///
229 /// Backends without referential integrity (SQLite when
230 /// `foreign_keys=off`, MySQL on MyISAM, etc.) still return any
231 /// declared FKs; runtime enforcement is a separate concern.
232 async fn list_foreign_keys(
233 &mut self,
234 schema: Option<&str>,
235 ) -> Result<Vec<ForeignKey>, SqlError>;
236
237 /// Insert `target.rows` into `target.table` using the backend's
238 /// native bulk loader (Postgres `COPY FROM STDIN`, MSSQL
239 /// `BulkLoadRequest`, MySQL `LOAD DATA LOCAL INFILE`, Oracle
240 /// `oracle::Batch`). Returns the number of rows accepted.
241 ///
242 /// Backends that have no native bulk loader (SQLite, and the
243 /// proxy / tunnel wrappers in their current shape) must return
244 /// [`SqlError::BulkUnavailable`] so the caller can route the
245 /// batch through the generic INSERT path. Treat this method as
246 /// required — forgetting to implement it on a new backend or
247 /// wrapper is a bug we want to catch at compile time, not at
248 /// runtime in the "just slow" form.
249 async fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError>;
250}
251
252/// A synchronous, blocking database connection — `ferrule-sql`'s public
253/// connection API.
254///
255/// Every method **blocks the calling thread** until the database
256/// responds; there is no `async fn` / `Future` anywhere on this trait.
257/// Embedders that have no async runtime of their own (the motivating
258/// case: a single-threaded synchronous host) call these directly.
259///
260/// **Runtime model.** The async drivers (`tokio-postgres`,
261/// `mysql_async`, `tiberius`) are still used underneath, driven by a
262/// private current-thread `tokio` runtime that the concrete handle
263/// ([`crate::sync::SyncConnection`]) owns. That runtime is an
264/// implementation detail — it never surfaces in the public API. SQLite
265/// and Oracle are natively synchronous and call straight through.
266///
267/// **Memory model.** [`query`](Self::query) buffers the full result set
268/// in memory (the `Vec<Row>` inside [`QueryResult`]) but is bounded by
269/// the connection's [`SizeGuards`]: an oversized
270/// cell/row or a result past the total cap fails fast with a structured
271/// error instead of OOMing. For an unbounded result use
272/// [`query_cursor`](Self::query_cursor), which streams from a native
273/// database cursor at `O(batch)` memory and never buffers the whole
274/// result.
275///
276/// **Reentrancy.** Because the private runtime is current-thread, do not
277/// call these methods from inside another `block_on` on the same thread
278/// — that nests runtimes and panics. Async embedders must hop to a
279/// blocking thread (`tokio::task::spawn_blocking` or a dedicated OS
280/// thread) before using a `Connection`.
281pub trait Connection: Send {
282 /// Execute a statement that may not return rows (INSERT, UPDATE, CREATE, etc.).
283 /// Blocks until the statement completes.
284 fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError>;
285
286 /// Execute a SELECT-like query and return all rows. Blocks until the
287 /// full result set is buffered in memory.
288 fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError>;
289
290 /// Open a streaming read cursor for `sql`, returning a [`RowCursor`]
291 /// that pulls rows from a native database cursor at bounded memory.
292 ///
293 /// Use this — not [`query`](Self::query) — to ingest a large result
294 /// under a fixed memory budget: the cursor never materializes the
295 /// whole result, and it applies the connection's per-cell / per-row
296 /// [`SizeGuards`] as each row is decoded. The
297 /// returned cursor borrows the connection for its
298 /// lifetime (the async drivers' row stream is tied to the connection
299 /// handle), so the connection cannot be used for another statement
300 /// until the cursor is dropped. Blocks on each batch; see
301 /// [`RowCursor`] for the bounded-memory and reentrancy contract.
302 fn query_cursor(&mut self, sql: &str) -> Result<RowCursor<'_>, SqlError>;
303
304 /// The size guards currently applied to this connection's reads
305 /// (per-cell, per-row, per-result byte caps). The default
306 /// implementation reports [`SizeGuards::default`]; the concrete
307 /// [`SyncConnection`](crate::sync::SyncConnection) tracks the real
308 /// configured value.
309 fn size_guards(&self) -> SizeGuards {
310 SizeGuards::default()
311 }
312
313 /// Install new size guards for subsequent reads. Lets a host raise or
314 /// lower the per-cell / per-row / per-result caps (e.g. the CLI
315 /// wiring its `[limits]` config). The default implementation is a
316 /// no-op for wrappers that hold no guard state; the concrete
317 /// [`SyncConnection`](crate::sync::SyncConnection) stores it.
318 fn set_size_guards(&mut self, guards: SizeGuards) {
319 let _ = guards;
320 }
321
322 /// Execute one or more statements, one result per statement. Backends
323 /// that natively support multi-resultsets (Postgres, MSSQL) split the
324 /// batch; others fall back to single-statement behavior. Blocks until
325 /// the batch completes.
326 fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError>;
327
328 /// Test connectivity (ping / `SELECT 1`). Blocks on a round-trip.
329 fn ping(&mut self) -> Result<(), SqlError>;
330
331 /// List tables in the given schema (or default schema if `None`).
332 fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError>;
333
334 /// List the schemas / databases / owners visible on this
335 /// connection, ordered by name. The entry whose `is_default` is
336 /// set is the schema unqualified objects resolve against (PG
337 /// `current_schema()`, MySQL `DATABASE()`, MSSQL `SCHEMA_NAME()`,
338 /// Oracle `USER`, SQLite `main`). Blocks on a round-trip.
339 fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError>;
340
341 /// Describe the columns of a single table.
342 fn describe_table(
343 &mut self,
344 schema: Option<&str>,
345 table: &str,
346 ) -> Result<QueryResult, SqlError>;
347
348 /// Return the column names of `table`'s primary key, in key position
349 /// order. Returns an empty `Vec` when the table has no declared PK.
350 /// Must not infer a PK from unique indexes — only declared
351 /// primary-key constraints.
352 fn primary_key(&mut self, schema: Option<&str>, table: &str) -> Result<Vec<String>, SqlError>;
353
354 /// Return every foreign-key edge in `schema` (or the default schema
355 /// if `None`). Used by schema-level copy to order tables — parents
356 /// before children.
357 fn list_foreign_keys(&mut self, schema: Option<&str>) -> Result<Vec<ForeignKey>, SqlError>;
358
359 /// Insert `target.rows` into `target.table` using the backend's
360 /// native bulk loader. Returns the number of rows accepted, or
361 /// [`SqlError::BulkUnavailable`] when the backend has no native loader
362 /// so the caller can fall back to the generic INSERT path. Blocks
363 /// until the whole batch has streamed to the server.
364 fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError>;
365}
366
367/// Forwarding impl so a boxed connection is itself a [`Connection`].
368///
369/// Lets call sites pass `&mut Box<dyn Connection>` where a `&mut dyn
370/// Connection` is expected (the cross-backend copy path and the test
371/// helpers both hold owned `Box<dyn Connection>` handles). Each method
372/// simply delegates to the boxed value.
373impl Connection for Box<dyn Connection> {
374 fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
375 (**self).execute(sql)
376 }
377 fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
378 (**self).query(sql)
379 }
380 fn query_cursor(&mut self, sql: &str) -> Result<RowCursor<'_>, SqlError> {
381 (**self).query_cursor(sql)
382 }
383 fn size_guards(&self) -> SizeGuards {
384 (**self).size_guards()
385 }
386 fn set_size_guards(&mut self, guards: SizeGuards) {
387 (**self).set_size_guards(guards)
388 }
389 fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
390 (**self).execute_multi(sql)
391 }
392 fn ping(&mut self) -> Result<(), SqlError> {
393 (**self).ping()
394 }
395 fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
396 (**self).list_tables(schema)
397 }
398 fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
399 (**self).list_schemas()
400 }
401 fn describe_table(
402 &mut self,
403 schema: Option<&str>,
404 table: &str,
405 ) -> Result<QueryResult, SqlError> {
406 (**self).describe_table(schema, table)
407 }
408 fn primary_key(&mut self, schema: Option<&str>, table: &str) -> Result<Vec<String>, SqlError> {
409 (**self).primary_key(schema, table)
410 }
411 fn list_foreign_keys(&mut self, schema: Option<&str>) -> Result<Vec<ForeignKey>, SqlError> {
412 (**self).list_foreign_keys(schema)
413 }
414 fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
415 (**self).bulk_insert_rows(target)
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use crate::url::DatabaseUrl;
    use crate::value::Value;
    use secrecy::ExposeSecret;

    /// A caller-resolved secret in `ConnectOptions::password` wins over
    /// the password embedded in the URL. This precedence rule lets an
    /// embedder hand ferrule-sql a credential it resolved (keyring,
    /// prompt) instead of baking it into the URL.
    #[test]
    fn effective_password_prefers_opts_over_url() {
        let url = DatabaseUrl::parse("postgres://user:url_pw@localhost/db").unwrap();
        let opts = ConnectOptions {
            password: Some(SecretString::new("resolved_pw".into())),
            ..Default::default()
        };
        let got = opts.effective_password(&url).expect("a password");
        assert_eq!(got.expose_secret(), "resolved_pw");
    }

    /// With no caller-supplied secret, ferrule-sql falls back to the
    /// URL's password component, so the URL-embedded path keeps working.
    #[test]
    fn effective_password_falls_back_to_url() {
        let url = DatabaseUrl::parse("postgres://user:url_pw@localhost/db").unwrap();
        let opts = ConnectOptions::default();
        let got = opts.effective_password(&url).expect("a password");
        assert_eq!(got.expose_secret(), "url_pw");
    }

    /// A caller-supplied secret is used even when the URL has no
    /// password component at all.
    #[test]
    fn effective_password_uses_opts_when_url_has_none() {
        let url = DatabaseUrl::parse("postgres://user@localhost/db").unwrap();
        let opts = ConnectOptions {
            password: Some(SecretString::new("resolved_pw".into())),
            ..Default::default()
        };
        let got = opts.effective_password(&url).expect("a password");
        assert_eq!(got.expose_secret(), "resolved_pw");
    }

    /// No secret anywhere yields `None`. Backends then connect without
    /// a password (e.g. trust/peer auth or a passwordless local socket).
    #[test]
    fn effective_password_none_when_absent_everywhere() {
        let url = DatabaseUrl::parse("postgres://user@localhost/db").unwrap();
        let opts = ConnectOptions::default();
        assert!(opts.effective_password(&url).is_none());
    }

    /// The default options keep TLS verification on and carry no secret.
    #[test]
    fn connect_options_default_is_secure_and_empty() {
        let opts = ConnectOptions::default();
        assert!(!opts.insecure);
        assert!(opts.password.is_none());
    }

    /// Boolean and integer encodings of the "is current schema"
    /// comparison decode as documented: non-zero integers count as true.
    #[test]
    fn is_default_from_value_decodes_bool_and_int() {
        assert!(is_default_from_value(Some(&Value::Bool(true))));
        assert!(!is_default_from_value(Some(&Value::Bool(false))));
        assert!(is_default_from_value(Some(&Value::Int64(1))));
        assert!(is_default_from_value(Some(&Value::Int64(-1))));
        assert!(!is_default_from_value(Some(&Value::Int64(0))));
    }

    /// A missing second column decodes to `false` rather than erroring.
    #[test]
    fn is_default_from_value_missing_is_false() {
        assert!(!is_default_from_value(None));
    }
}