ferrule_sql/sync.rs
1//! Synchronous wrapper that turns an async backend driver into the
2//! public blocking [`Connection`] API.
3//!
4//! `ferrule-sql`'s drivers (`tokio-postgres`, `mysql_async`, `tiberius`)
5//! are async. Rather than expose that async surface to embedders — many
6//! of which run no runtime of their own — every connection handle owns
7//! one **private current-thread `tokio` runtime** and drives each driver
8//! future to completion with `block_on`. The runtime is created once at
9//! connect time and lives inside [`SyncConnection`] for the connection's
10//! lifetime, so the same runtime that spawned a driver's background I/O
11//! task (e.g. tokio-postgres' connection task) also polls it on every
12//! subsequent call. No `async fn` / `Future` crosses the public boundary.
13
14use crate::connection::{
15 AsyncConnection, BulkInsert, Connection, ExecutionSummary, ForeignKey, QueryResult, SchemaInfo,
16 StatementResult,
17};
18use crate::error::SqlError;
19use crate::guard::SizeGuards;
20use crate::stream::RowCursor;
21
22/// A blocking [`Connection`] backed by an async driver and a private
23/// current-thread runtime.
24///
25/// **Blocking model.** Every method calls `self.rt.block_on(...)` on the
26/// owned runtime, so it blocks the calling thread until the driver
27/// future resolves. **Memory model.** [`query`](Connection::query)
28/// buffers the result but is bounded by
29/// [`size_guards`](Connection::size_guards) — an oversized cell/row or a
30/// result past the total cap fails fast; [`query_cursor`](Connection::query_cursor)
31/// streams at bounded memory. **Reentrancy.** The runtime is
32/// current-thread; do not call from inside another `block_on` on the
33/// same thread (hop to a blocking thread first).
34pub struct SyncConnection {
35 /// The wrapped async connection. Declared **before** `rt` so that
36 /// Rust's declaration-order field drop tears this connection down —
37 /// together with any background I/O task it spawned on the runtime —
38 /// while the runtime is still alive, and only then drops `rt`. A
39 /// driver whose own `Drop` touches the runtime therefore stays sound;
40 /// today none do, so the ordering is defensive but deliberate.
41 inner: Box<dyn AsyncConnection>,
42 /// Per-cell / per-row / per-result byte ceilings applied to every
43 /// read (both the eager `query` and the streaming `query_cursor`).
44 /// Defaults to [`SizeGuards::default`]; override with
45 /// [`set_size_guards`](Connection::set_size_guards).
46 guards: SizeGuards,
47 /// The private current-thread `tokio` runtime that drives every
48 /// driver future via `block_on`. Declared **after** `inner` so it is
49 /// dropped last, outliving the connection it powers.
50 rt: tokio::runtime::Runtime,
51}
52
53impl SyncConnection {
54 /// Wrap an async connection plus the runtime that must drive it.
55 ///
56 /// The runtime passed here MUST be the same one used to establish
57 /// `inner` (and to spawn any driver background task), so that those
58 /// tasks keep being polled on later `block_on` calls.
59 #[must_use]
60 pub(crate) fn new(rt: tokio::runtime::Runtime, inner: Box<dyn AsyncConnection>) -> Self {
61 Self {
62 rt,
63 inner,
64 guards: SizeGuards::default(),
65 }
66 }
67}
68
69impl Connection for SyncConnection {
70 fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
71 let inner = &mut self.inner;
72 self.rt.block_on(inner.execute(sql))
73 }
74
75 /// Eager read, routed through the **native cursor** and collected
76 /// into a fully-materialized [`QueryResult`]. Building the eager
77 /// result on top of the streaming producer keeps a single decode
78 /// path shared with [`query_cursor`](Self::query_cursor); for the
79 /// network backends it also means the rows are pulled from the
80 /// server's cursor rather than pre-buffered by the driver.
81 fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
82 let inner = &mut self.inner;
83 let guards = self.guards;
84 self.rt.block_on(async move {
85 use futures_util::stream::StreamExt;
86 let (columns, mut stream) = inner.query_stream(sql).await?;
87 let mut rows = Vec::new();
88 let mut total: usize = 0;
89 let mut ordinal: u64 = 0;
90 while let Some(item) = stream.next().await {
91 let row = item?;
92 // Per-cell / per-row caps: fail fast before retaining the
93 // row. Total-buffer cap: bound the eager result so the
94 // CLI table path cannot collect an unbounded `Vec<Row>`.
95 guards.check_row(ordinal, &row, &columns)?;
96 if guards.caps_total() {
97 let row_bytes: usize = row.iter().map(crate::value::Value::byte_size).sum();
98 total = total.saturating_add(row_bytes);
99 if total > guards.max_total_buffered_bytes {
100 return Err(SqlError::BufferTooLarge {
101 rows_buffered: ordinal,
102 cap: guards.max_total_buffered_bytes,
103 });
104 }
105 }
106 ordinal += 1;
107 rows.push(row);
108 }
109 Ok(QueryResult { columns, rows })
110 })
111 }
112
113 fn query_cursor(&mut self, sql: &str) -> Result<RowCursor<'_>, SqlError> {
114 // Split-borrow the disjoint fields: the stream borrows `inner`,
115 // the cursor drives it through `rt`. No self-referential storage
116 // and no `unsafe` — `RowCursor` holds both borrows for its
117 // lifetime, which is why it exclusively borrows the connection.
118 let rt = &self.rt;
119 let inner = &mut self.inner;
120 let guards = self.guards;
121 let (columns, stream) = rt.block_on(inner.query_stream(sql))?;
122 Ok(RowCursor::new(columns, rt, stream, guards))
123 }
124
125 fn size_guards(&self) -> SizeGuards {
126 self.guards
127 }
128
129 fn set_size_guards(&mut self, guards: SizeGuards) {
130 self.guards = guards;
131 }
132
133 fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
134 let inner = &mut self.inner;
135 self.rt.block_on(inner.execute_multi(sql))
136 }
137
138 fn ping(&mut self) -> Result<(), SqlError> {
139 let inner = &mut self.inner;
140 self.rt.block_on(inner.ping())
141 }
142
143 fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
144 let inner = &mut self.inner;
145 self.rt.block_on(inner.list_tables(schema))
146 }
147
148 fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
149 let inner = &mut self.inner;
150 self.rt.block_on(inner.list_schemas())
151 }
152
153 fn describe_table(
154 &mut self,
155 schema: Option<&str>,
156 table: &str,
157 ) -> Result<QueryResult, SqlError> {
158 let inner = &mut self.inner;
159 self.rt.block_on(inner.describe_table(schema, table))
160 }
161
162 fn primary_key(&mut self, schema: Option<&str>, table: &str) -> Result<Vec<String>, SqlError> {
163 let inner = &mut self.inner;
164 self.rt.block_on(inner.primary_key(schema, table))
165 }
166
167 fn list_foreign_keys(&mut self, schema: Option<&str>) -> Result<Vec<ForeignKey>, SqlError> {
168 let inner = &mut self.inner;
169 self.rt.block_on(inner.list_foreign_keys(schema))
170 }
171
172 fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
173 let inner = &mut self.inner;
174 self.rt.block_on(inner.bulk_insert_rows(target))
175 }
176}