Skip to main content

ferrule_sql/
sync.rs

1//! Synchronous wrapper that turns an async backend driver into the
2//! public blocking [`Connection`] API.
3//!
4//! `ferrule-sql`'s drivers (`tokio-postgres`, `mysql_async`, `tiberius`)
5//! are async. Rather than expose that async surface to embedders — many
6//! of which run no runtime of their own — every connection handle owns
7//! one **private current-thread `tokio` runtime** and drives each driver
8//! future to completion with `block_on`. The runtime is created once at
9//! connect time and lives inside [`SyncConnection`] for the connection's
10//! lifetime, so the same runtime that spawned a driver's background I/O
11//! task (e.g. tokio-postgres' connection task) also polls it on every
12//! subsequent call. No `async fn` / `Future` crosses the public boundary.
13
14use crate::connection::{
15    AsyncConnection, BulkInsert, Connection, ExecutionSummary, ForeignKey, QueryResult, SchemaInfo,
16    StatementResult,
17};
18use crate::error::SqlError;
19use crate::guard::SizeGuards;
20use crate::stream::RowCursor;
21
22/// A blocking [`Connection`] backed by an async driver and a private
23/// current-thread runtime.
24///
25/// **Blocking model.** Every method calls `self.rt.block_on(...)` on the
26/// owned runtime, so it blocks the calling thread until the driver
27/// future resolves. **Memory model.** [`query`](Connection::query)
28/// buffers the result but is bounded by
29/// [`size_guards`](Connection::size_guards) — an oversized cell/row or a
30/// result past the total cap fails fast; [`query_cursor`](Connection::query_cursor)
31/// streams at bounded memory. **Reentrancy.** The runtime is
32/// current-thread; do not call from inside another `block_on` on the
33/// same thread (hop to a blocking thread first).
34pub struct SyncConnection {
35    /// The wrapped async connection. Declared **before** `rt` so that
36    /// Rust's declaration-order field drop tears this connection down —
37    /// together with any background I/O task it spawned on the runtime —
38    /// while the runtime is still alive, and only then drops `rt`. A
39    /// driver whose own `Drop` touches the runtime therefore stays sound;
40    /// today none do, so the ordering is defensive but deliberate.
41    inner: Box<dyn AsyncConnection>,
42    /// Per-cell / per-row / per-result byte ceilings applied to every
43    /// read (both the eager `query` and the streaming `query_cursor`).
44    /// Defaults to [`SizeGuards::default`]; override with
45    /// [`set_size_guards`](Connection::set_size_guards).
46    guards: SizeGuards,
47    /// The private current-thread `tokio` runtime that drives every
48    /// driver future via `block_on`. Declared **after** `inner` so it is
49    /// dropped last, outliving the connection it powers.
50    rt: tokio::runtime::Runtime,
51}
52
53impl SyncConnection {
54    /// Wrap an async connection plus the runtime that must drive it.
55    ///
56    /// The runtime passed here MUST be the same one used to establish
57    /// `inner` (and to spawn any driver background task), so that those
58    /// tasks keep being polled on later `block_on` calls.
59    #[must_use]
60    pub(crate) fn new(rt: tokio::runtime::Runtime, inner: Box<dyn AsyncConnection>) -> Self {
61        Self {
62            rt,
63            inner,
64            guards: SizeGuards::default(),
65        }
66    }
67}
68
69impl Connection for SyncConnection {
70    fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
71        let inner = &mut self.inner;
72        self.rt.block_on(inner.execute(sql))
73    }
74
75    /// Eager read, routed through the **native cursor** and collected
76    /// into a fully-materialized [`QueryResult`]. Building the eager
77    /// result on top of the streaming producer keeps a single decode
78    /// path shared with [`query_cursor`](Self::query_cursor); for the
79    /// network backends it also means the rows are pulled from the
80    /// server's cursor rather than pre-buffered by the driver.
81    fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
82        let inner = &mut self.inner;
83        let guards = self.guards;
84        self.rt.block_on(async move {
85            use futures_util::stream::StreamExt;
86            let (columns, mut stream) = inner.query_stream(sql).await?;
87            let mut rows = Vec::new();
88            let mut total: usize = 0;
89            let mut ordinal: u64 = 0;
90            while let Some(item) = stream.next().await {
91                let row = item?;
92                // Per-cell / per-row caps: fail fast before retaining the
93                // row. Total-buffer cap: bound the eager result so the
94                // CLI table path cannot collect an unbounded `Vec<Row>`.
95                guards.check_row(ordinal, &row, &columns)?;
96                if guards.caps_total() {
97                    let row_bytes: usize = row.iter().map(crate::value::Value::byte_size).sum();
98                    total = total.saturating_add(row_bytes);
99                    if total > guards.max_total_buffered_bytes {
100                        return Err(SqlError::BufferTooLarge {
101                            rows_buffered: ordinal,
102                            cap: guards.max_total_buffered_bytes,
103                        });
104                    }
105                }
106                ordinal += 1;
107                rows.push(row);
108            }
109            Ok(QueryResult { columns, rows })
110        })
111    }
112
113    fn query_cursor(&mut self, sql: &str) -> Result<RowCursor<'_>, SqlError> {
114        // Split-borrow the disjoint fields: the stream borrows `inner`,
115        // the cursor drives it through `rt`. No self-referential storage
116        // and no `unsafe` — `RowCursor` holds both borrows for its
117        // lifetime, which is why it exclusively borrows the connection.
118        let rt = &self.rt;
119        let inner = &mut self.inner;
120        let guards = self.guards;
121        let (columns, stream) = rt.block_on(inner.query_stream(sql))?;
122        Ok(RowCursor::new(columns, rt, stream, guards))
123    }
124
125    fn size_guards(&self) -> SizeGuards {
126        self.guards
127    }
128
129    fn set_size_guards(&mut self, guards: SizeGuards) {
130        self.guards = guards;
131    }
132
133    fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
134        let inner = &mut self.inner;
135        self.rt.block_on(inner.execute_multi(sql))
136    }
137
138    fn ping(&mut self) -> Result<(), SqlError> {
139        let inner = &mut self.inner;
140        self.rt.block_on(inner.ping())
141    }
142
143    fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
144        let inner = &mut self.inner;
145        self.rt.block_on(inner.list_tables(schema))
146    }
147
148    fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
149        let inner = &mut self.inner;
150        self.rt.block_on(inner.list_schemas())
151    }
152
153    fn describe_table(
154        &mut self,
155        schema: Option<&str>,
156        table: &str,
157    ) -> Result<QueryResult, SqlError> {
158        let inner = &mut self.inner;
159        self.rt.block_on(inner.describe_table(schema, table))
160    }
161
162    fn primary_key(&mut self, schema: Option<&str>, table: &str) -> Result<Vec<String>, SqlError> {
163        let inner = &mut self.inner;
164        self.rt.block_on(inner.primary_key(schema, table))
165    }
166
167    fn list_foreign_keys(&mut self, schema: Option<&str>) -> Result<Vec<ForeignKey>, SqlError> {
168        let inner = &mut self.inner;
169        self.rt.block_on(inner.list_foreign_keys(schema))
170    }
171
172    fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
173        let inner = &mut self.inner;
174        self.rt.block_on(inner.bulk_insert_rows(target))
175    }
176}