Skip to main content

ferrule_sql/
stream.rs

1//! Bounded-memory streaming reads — the native-cursor counterpart to
2//! the eager [`Connection::query`](crate::Connection::query).
3//!
4//! `query` materializes the whole result into a `Vec<Row>`, which is
5//! fine for the CLI table view but unusable for an embedder ingesting a
6//! multi-million-row table under a fixed memory budget. [`RowCursor`]
7//! pulls rows from a **native driver cursor** one bounded batch at a
8//! time, so peak resident memory is `O(batch + cap)` rather than
9//! `O(total rows)`.
10//!
11//! **Bounded-memory model.** Every backend drives a row producer that is
12//! back-pressured: the async network drivers (`tokio-postgres`
13//! `query_raw`, `mysql_async` `query_iter`, `tiberius` `QueryStream`)
14//! expose a native row stream that only fetches more from the server as
15//! the consumer pulls; the synchronous drivers (`rusqlite`, ODPI-C
16//! `oracle`) run their row-stepping loop on a `spawn_blocking` thread
17//! that feeds a **bounded `tokio::sync::mpsc` channel** — the producer
18//! blocks on `blocking_send` once `cap` rows are in flight, so at most
19//! `cap` rows are ever buffered ahead of the consumer. Either way no
20//! code path collects the full result.
21//!
22//! **Blocking model.** [`RowCursor`] borrows its connection's private
23//! current-thread runtime; each [`next_batch`](RowCursor::next_batch) /
24//! [`Iterator::next`] call drives the producer with `block_on`, blocking
25//! the calling thread until the next row(s) arrive. The same
26//! current-thread reentrancy rule as [`Connection`](crate::Connection)
27//! applies — never pull from a cursor inside another `block_on` on the
28//! same thread.
29//!
30//! **Size guards.** Each decoded row is checked against the connection's
31//! per-cell / per-row [`SizeGuards`] before it is
32//! retained, so a pathological cell fails fast with a structured error
33//! rather than OOMing a streaming ingest.
34
35use crate::error::SqlError;
36use crate::guard::SizeGuards;
37use crate::value::{ColumnInfo, Row};
38use futures_util::stream::{Stream, StreamExt};
39use std::pin::Pin;
40
/// Default in-flight row window for a streaming cursor (1024 rows).
///
/// Serves as the bound of the channel used by the synchronous
/// (`spawn_blocking`) backends and as the default chunk size for
/// [`next_batch`](RowCursor::next_batch); the async backends are further
/// limited by the driver's own server-side fetch window. `next_batch`
/// also clamps its up-front `Vec` preallocation to this value, so a very
/// large requested batch does not reserve memory before rows arrive.
pub const DEFAULT_CURSOR_CAPACITY: usize = 1 << 10;
47
/// A boxed, backend-erased stream of decoded rows.
///
/// Each backend's `query_stream` builds one of these from its native
/// cursor. The `'a` lifetime ties an async backend's stream to the
/// `&mut` borrow of its driver handle (so no self-referential storage /
/// `unsafe` is needed); the synchronous backends produce a `'static`
/// stream backed by their bounded channel.
///
/// Every item is a `Result<Row, SqlError>`: `Ok` carries one decoded row,
/// `Err` carries a failure reported by the producer. The trait object is
/// pinned on the heap and bounded by `Send`.
pub type BoxRowStream<'a> = Pin<Box<dyn Stream<Item = Result<Row, SqlError>> + Send + 'a>>;
56
/// A streaming read handle over a native database cursor.
///
/// Handed back by [`Connection::query_cursor`](crate::Connection::query_cursor).
/// Pull rows with [`next_batch`](Self::next_batch) (bounded chunk) or by
/// iterating ([`Iterator`] yields one `Result<Row, SqlError>` at a
/// time). See the [module docs](self) for the bounded-memory and
/// blocking contracts.
pub struct RowCursor<'a> {
    /// Column metadata for the streamed result; exposed read-only through
    /// [`columns`](Self::columns) and passed to the size guards with
    /// every row.
    columns: Vec<ColumnInfo>,
    /// The private runtime that drives the producer. Borrowed from the
    /// owning connection; `block_on` on it advances the stream.
    rt: &'a tokio::runtime::Runtime,
    /// The backend-erased row stream this cursor pulls from.
    stream: BoxRowStream<'a>,
    /// Per-cell / per-row size ceilings applied to every decoded row.
    /// The streaming cursor does not apply the *total*-buffer cap (it is
    /// bounded by design), only the per-cell and per-row caps.
    guards: SizeGuards,
    /// 0-based ordinal of the next row to be produced — threaded into the
    /// [`SizeGuards`] diagnostics so an error names the real row.
    row_ordinal: u64,
    /// Set once the underlying stream has yielded `None` (or a pull has
    /// failed), so subsequent pulls short-circuit without re-polling the
    /// stream.
    exhausted: bool,
}
81
82impl<'a> RowCursor<'a> {
83    /// Construct a cursor from a backend stream plus the runtime that
84    /// drives it. Internal — backends return their stream and the public
85    /// `query_cursor` wires in the runtime.
86    pub(crate) fn new(
87        columns: Vec<ColumnInfo>,
88        rt: &'a tokio::runtime::Runtime,
89        stream: BoxRowStream<'a>,
90        guards: SizeGuards,
91    ) -> Self {
92        Self {
93            columns,
94            rt,
95            stream,
96            guards,
97            row_ordinal: 0,
98            exhausted: false,
99        }
100    }
101
102    /// The column metadata for the streamed result, available before any
103    /// row is pulled.
104    #[must_use]
105    pub fn columns(&self) -> &[ColumnInfo] {
106        &self.columns
107    }
108
109    /// Pull up to `n` rows, blocking until they arrive or the result is
110    /// exhausted. Returns fewer than `n` (possibly empty) at end of
111    /// stream. This call's transient allocation is bounded by `n` rows
112    /// plus the producer's in-flight window — never the full result.
113    ///
114    /// `n` of `0` returns an empty `Vec` without touching the producer.
115    /// The first driver error is returned immediately; callers should
116    /// stop on `Err`.
117    pub fn next_batch(&mut self, n: usize) -> Result<Vec<Row>, SqlError> {
118        if n == 0 || self.exhausted {
119            return Ok(Vec::new());
120        }
121        // Drive the producer cooperatively on the owned runtime. The
122        // closure pulls at most `n` rows and returns the owned batch, so
123        // `block_on` returns as soon as that batch is assembled — the
124        // producer is never run ahead of the consumer beyond its own
125        // bounded window.
126        let columns = &self.columns;
127        let guards = &self.guards;
128        let stream = &mut self.stream;
129        let start_ordinal = self.row_ordinal;
130        let cap = n.min(DEFAULT_CURSOR_CAPACITY);
131        let result: Result<(Vec<Row>, bool), SqlError> = self.rt.block_on(async move {
132            let mut out = Vec::with_capacity(cap);
133            let mut ordinal = start_ordinal;
134            for _ in 0..n {
135                match stream.next().await {
136                    Some(Ok(row)) => {
137                        // Size-check before retaining the row, so an
138                        // oversized cell fails fast instead of buffering.
139                        guards.check_row(ordinal, &row, columns)?;
140                        ordinal += 1;
141                        out.push(row);
142                    }
143                    Some(Err(e)) => return Err(e),
144                    None => return Ok((out, true)),
145                }
146            }
147            Ok((out, false))
148        });
149        match result {
150            Ok((out, reached_end)) => {
151                self.row_ordinal += out.len() as u64;
152                self.exhausted = reached_end;
153                Ok(out)
154            }
155            Err(e) => {
156                // A driver error abandons the batch; mark exhausted so a
157                // careless re-pull does not double-drive a half-consumed
158                // async stream.
159                self.exhausted = true;
160                Err(e)
161            }
162        }
163    }
164}
165
166impl Iterator for RowCursor<'_> {
167    type Item = Result<Row, SqlError>;
168
169    /// Yield the next row, blocking until it arrives. `None` marks
170    /// end-of-stream; an `Err` item marks a driver failure (after which
171    /// iteration stops).
172    fn next(&mut self) -> Option<Self::Item> {
173        if self.exhausted {
174            return None;
175        }
176        match self.next_batch(1) {
177            Ok(mut rows) => rows.pop().map(Ok),
178            Err(e) => Some(Err(e)),
179        }
180    }
181}
182
183/// Build a `'static` [`BoxRowStream`] from a bounded `tokio::sync::mpsc`
184/// receiver. Used by the synchronous backends (`rusqlite`, `oracle`)
185/// whose `spawn_blocking` producer sends decoded rows through the
186/// channel; the channel's bound is the in-flight cap.
187pub(crate) fn channel_stream(
188    rx: tokio::sync::mpsc::Receiver<Result<Row, SqlError>>,
189) -> BoxRowStream<'static> {
190    Box::pin(tokio_stream_from_channel(rx))
191}
192
193/// Adapt a bounded mpsc receiver into a [`Stream`]. Kept as a free
194/// function (rather than pulling in `tokio-stream`) so the dependency
195/// surface stays minimal — `futures_util` is already a backend dep.
196fn tokio_stream_from_channel(
197    mut rx: tokio::sync::mpsc::Receiver<Result<Row, SqlError>>,
198) -> impl Stream<Item = Result<Row, SqlError>> + Send + 'static {
199    futures_util::stream::poll_fn(move |cx| rx.poll_recv(cx))
200}