ferrule_sql/stream.rs
1//! Bounded-memory streaming reads — the native-cursor counterpart to
2//! the eager [`Connection::query`](crate::Connection::query).
3//!
4//! `query` materializes the whole result into a `Vec<Row>`, which is
5//! fine for the CLI table view but unusable for an embedder ingesting a
6//! multi-million-row table under a fixed memory budget. [`RowCursor`]
7//! pulls rows from a **native driver cursor** one bounded batch at a
8//! time, so peak resident memory is `O(batch + cap)` rather than
9//! `O(total rows)`.
10//!
11//! **Bounded-memory model.** Every backend drives a row producer that is
12//! back-pressured: the async network drivers (`tokio-postgres`
13//! `query_raw`, `mysql_async` `query_iter`, `tiberius` `QueryStream`)
14//! expose a native row stream that only fetches more from the server as
15//! the consumer pulls; the synchronous drivers (`rusqlite`, ODPI-C
16//! `oracle`) run their row-stepping loop on a `spawn_blocking` thread
17//! that feeds a **bounded `tokio::sync::mpsc` channel** — the producer
18//! blocks on `blocking_send` once `cap` rows are in flight, so at most
19//! `cap` rows are ever buffered ahead of the consumer. Either way no
20//! code path collects the full result.
21//!
22//! **Blocking model.** [`RowCursor`] borrows its connection's private
23//! current-thread runtime; each [`next_batch`](RowCursor::next_batch) /
24//! [`Iterator::next`] call drives the producer with `block_on`, blocking
25//! the calling thread until the next row(s) arrive. The same
26//! current-thread reentrancy rule as [`Connection`](crate::Connection)
27//! applies — never pull from a cursor inside another `block_on` on the
28//! same thread.
29//!
30//! **Size guards.** Each decoded row is checked against the connection's
31//! per-cell / per-row [`SizeGuards`] before it is
32//! retained, so a pathological cell fails fast with a structured error
33//! rather than OOMing a streaming ingest.
34
35use crate::error::SqlError;
36use crate::guard::SizeGuards;
37use crate::value::{ColumnInfo, Row};
38use futures_util::stream::{Stream, StreamExt};
39use std::pin::Pin;
40
/// Default in-flight row budget between a streaming cursor's producer
/// and its consumer.
///
/// It is the bound of the channel used by the synchronous
/// (`spawn_blocking`) backends and the default chunk size for
/// [`next_batch`](RowCursor::next_batch). The async backends are limited
/// further by the driver's own server-side fetch window.
pub const DEFAULT_CURSOR_CAPACITY: usize = 1_024;
47
48/// A boxed, backend-erased stream of decoded rows.
49///
50/// Each backend's `query_stream` builds one of these from its native
51/// cursor. The `'a` lifetime ties an async backend's stream to the
52/// `&mut` borrow of its driver handle (so no self-referential storage /
53/// `unsafe` is needed); the synchronous backends produce a `'static`
54/// stream backed by their bounded channel.
55pub type BoxRowStream<'a> = Pin<Box<dyn Stream<Item = Result<Row, SqlError>> + Send + 'a>>;
56
57/// A streaming read handle over a native database cursor.
58///
59/// Hand back by [`Connection::query_cursor`](crate::Connection::query_cursor).
60/// Pull rows with [`next_batch`](Self::next_batch) (bounded chunk) or by
61/// iterating ([`Iterator`] yields one `Result<Row, SqlError>` at a
62/// time). See the [module docs](self) for the bounded-memory and
63/// blocking contracts.
64pub struct RowCursor<'a> {
65 columns: Vec<ColumnInfo>,
66 /// The private runtime that drives the producer. Borrowed from the
67 /// owning connection; `block_on` on it advances the stream.
68 rt: &'a tokio::runtime::Runtime,
69 stream: BoxRowStream<'a>,
70 /// Per-cell / per-row size ceilings applied to every decoded row.
71 /// The streaming cursor does not apply the *total*-buffer cap (it is
72 /// bounded by design), only the per-cell and per-row caps.
73 guards: SizeGuards,
74 /// 0-based ordinal of the next row to be produced — threaded into the
75 /// [`SizeGuards`] diagnostics so an error names the real row.
76 row_ordinal: u64,
77 /// Set once the underlying stream has yielded `None`, so subsequent
78 /// pulls short-circuit without re-polling an exhausted stream.
79 exhausted: bool,
80}
81
82impl<'a> RowCursor<'a> {
83 /// Construct a cursor from a backend stream plus the runtime that
84 /// drives it. Internal — backends return their stream and the public
85 /// `query_cursor` wires in the runtime.
86 pub(crate) fn new(
87 columns: Vec<ColumnInfo>,
88 rt: &'a tokio::runtime::Runtime,
89 stream: BoxRowStream<'a>,
90 guards: SizeGuards,
91 ) -> Self {
92 Self {
93 columns,
94 rt,
95 stream,
96 guards,
97 row_ordinal: 0,
98 exhausted: false,
99 }
100 }
101
102 /// The column metadata for the streamed result, available before any
103 /// row is pulled.
104 #[must_use]
105 pub fn columns(&self) -> &[ColumnInfo] {
106 &self.columns
107 }
108
109 /// Pull up to `n` rows, blocking until they arrive or the result is
110 /// exhausted. Returns fewer than `n` (possibly empty) at end of
111 /// stream. This call's transient allocation is bounded by `n` rows
112 /// plus the producer's in-flight window — never the full result.
113 ///
114 /// `n` of `0` returns an empty `Vec` without touching the producer.
115 /// The first driver error is returned immediately; callers should
116 /// stop on `Err`.
117 pub fn next_batch(&mut self, n: usize) -> Result<Vec<Row>, SqlError> {
118 if n == 0 || self.exhausted {
119 return Ok(Vec::new());
120 }
121 // Drive the producer cooperatively on the owned runtime. The
122 // closure pulls at most `n` rows and returns the owned batch, so
123 // `block_on` returns as soon as that batch is assembled — the
124 // producer is never run ahead of the consumer beyond its own
125 // bounded window.
126 let columns = &self.columns;
127 let guards = &self.guards;
128 let stream = &mut self.stream;
129 let start_ordinal = self.row_ordinal;
130 let cap = n.min(DEFAULT_CURSOR_CAPACITY);
131 let result: Result<(Vec<Row>, bool), SqlError> = self.rt.block_on(async move {
132 let mut out = Vec::with_capacity(cap);
133 let mut ordinal = start_ordinal;
134 for _ in 0..n {
135 match stream.next().await {
136 Some(Ok(row)) => {
137 // Size-check before retaining the row, so an
138 // oversized cell fails fast instead of buffering.
139 guards.check_row(ordinal, &row, columns)?;
140 ordinal += 1;
141 out.push(row);
142 }
143 Some(Err(e)) => return Err(e),
144 None => return Ok((out, true)),
145 }
146 }
147 Ok((out, false))
148 });
149 match result {
150 Ok((out, reached_end)) => {
151 self.row_ordinal += out.len() as u64;
152 self.exhausted = reached_end;
153 Ok(out)
154 }
155 Err(e) => {
156 // A driver error abandons the batch; mark exhausted so a
157 // careless re-pull does not double-drive a half-consumed
158 // async stream.
159 self.exhausted = true;
160 Err(e)
161 }
162 }
163 }
164}
165
166impl Iterator for RowCursor<'_> {
167 type Item = Result<Row, SqlError>;
168
169 /// Yield the next row, blocking until it arrives. `None` marks
170 /// end-of-stream; an `Err` item marks a driver failure (after which
171 /// iteration stops).
172 fn next(&mut self) -> Option<Self::Item> {
173 if self.exhausted {
174 return None;
175 }
176 match self.next_batch(1) {
177 Ok(mut rows) => rows.pop().map(Ok),
178 Err(e) => Some(Err(e)),
179 }
180 }
181}
182
183/// Build a `'static` [`BoxRowStream`] from a bounded `tokio::sync::mpsc`
184/// receiver. Used by the synchronous backends (`rusqlite`, `oracle`)
185/// whose `spawn_blocking` producer sends decoded rows through the
186/// channel; the channel's bound is the in-flight cap.
187pub(crate) fn channel_stream(
188 rx: tokio::sync::mpsc::Receiver<Result<Row, SqlError>>,
189) -> BoxRowStream<'static> {
190 Box::pin(tokio_stream_from_channel(rx))
191}
192
193/// Adapt a bounded mpsc receiver into a [`Stream`]. Kept as a free
194/// function (rather than pulling in `tokio-stream`) so the dependency
195/// surface stays minimal — `futures_util` is already a backend dep.
196fn tokio_stream_from_channel(
197 mut rx: tokio::sync::mpsc::Receiver<Result<Row, SqlError>>,
198) -> impl Stream<Item = Result<Row, SqlError>> + Send + 'static {
199 futures_util::stream::poll_fn(move |cx| rx.poll_recv(cx))
200}