Skip to main content

narwhal_core/
query_stream.rs

1//! Ergonomic streaming wrapper bundling column headers with an async
2//! row iterator.
3//!
4//! [`crate::Connection`] already exposes two execution paths:
5//!
6//! * [`Connection::execute`](crate::Connection::execute) — materialises
7//!   the entire result on the wire. Used for non-`SELECT` statements
8//!   that report `rows_affected`, and as the historical hot path for
9//!   small interactive queries.
10//! * [`Connection::stream`](crate::Connection::stream) — hands back a
//!   row-by-row [`DynRowStream`]. Used by the TUI's worker
//!   (`narwhal_app::run::run_stream`) so a 1 M-row `SELECT` does not
//!   block until the engine has produced its final row.
15//!
16//! [`QueryStream`] sits between the two. It wraps the row stream
17//! together with the column header vector that every consumer needs
18//! up-front, and provides:
19//!
20//! * [`QueryStream::next_row`] for the row-at-a-time loop —
21//!   semantically identical to [`crate::RowStream::next_row`] but
22//!   wrapped in `Option<Result<_>>` instead of `Result<Option<_>>`
23//!   so the canonical `while let Some(row) = s.next_row().await`
24//!   shape works without an extra match.
25//! * [`QueryStream::collect_all`] for the "drain into the old shape"
26//!   bridge that tests, MCP and the export path want.
27//! * [`QueryStream::columns`] / [`QueryStream::rows_yielded`] /
28//!   [`QueryStream::elapsed`] for the TUI live-counter.
29//!
30//! ## Drop / cancellation
31//!
32//! Dropping a half-drained `QueryStream` releases the wrapped
33//! `Box<dyn DynRowStream>` synchronously, which in turn drops the
34//! driver-side cursor / portal / channel and aborts the query.
35//! The dyn-safe [`DynRowStream::close`] is **async** so it cannot run
36//! from `Drop`; explicit cleanup goes through [`QueryStream::close`]
37//! (which is awaitable and surfaces release errors). The contract
38//! every workspace driver upholds:
39//!
40//! 1. `Drop` on the wrapped `DynRowStream` must be sufficient to
41//!    release server-side resources — it may emit a best-effort
42//!    "close" message but it must not block the runtime.
43//! 2. `close()` is the awaitable path when the caller wants to
44//!    surface a server-side release failure (PG portal close,
45//!    `MySQL` `KILL QUERY`, `ClickHouse` HTTP body discard).
46//!
47//! The two methods on [`QueryStream`] that drain on the caller's
48//! behalf ([`QueryStream::collect_all`] and
49//! [`QueryStream::collect_with_limit`]) always invoke `close()` so
50//! the cursor is released through the awaitable path even when the
51//! caller did not see the stream end-of-data signal.
52//!
53//! ## Why no `futures::Stream` impl?
54//!
55//! `QueryStream` deliberately does **not** implement
56//! `futures_core::Stream`. Two reasons:
57//!
58//! 1. The workspace's [`crate::RowStream`] trait already uses a
59//!    bespoke `async fn next_row(&mut self) -> Result<Option<Row>>`
60//!    shape because every driver author works at that boundary, not
61//!    at the lower-level `poll_next(Pin<&mut Self>, &mut Context)`
62//!    boundary that `Stream` exposes. Wrapping it in `Stream` would
63//!    require either self-referential pinning (annoying for callers)
64//!    or a hand-rolled `stream::unfold` adapter (which leaks the
65//!    `self`-by-value semantics into the caller's match arms).
//! 2. The TUI run worker wraps each `next_row()` call in a
//!    `tokio::time::timeout` — see `narwhal_app::run::run_stream`.
//!    Adding a `Stream` impl would invite callers to reach for
//!    `StreamExt::buffered`-style adapters that bypass the
//!    bounded-batch contract.
71//!
72//! Callers that genuinely need a `Stream` can build one in three
73//! lines via `futures::stream::unfold(qs, |mut qs| async move {
74//! qs.next_row().await.map(|r| (r, qs)) })`.
75
76use std::time::{Duration, Instant};
77
78use crate::error::Result;
79use crate::schema::{ColumnHeader, QueryResult, Row};
80use crate::stream::DynRowStream;
81
/// Ceiling on the capacity reserved up-front by
/// [`QueryStream::collect_with_limit`].
///
/// The reservation is `limit.min(COLLECT_PREALLOC_CAP)`, so a caller
/// passing a very large `limit` as a safety cap does not trigger a
/// correspondingly large eager allocation. The row vector still
/// grows on demand once more rows than this have been collected.
const COLLECT_PREALLOC_CAP: usize = 1 << 10; // 1024 rows
88
/// Convert a [`Duration`] to whole milliseconds as a `u64`,
/// saturating at `u64::MAX` instead of wrapping.
///
/// `Duration::as_millis` returns a `u128`; any value that does not
/// fit in a `u64` is reported as `u64::MAX`. Sub-millisecond
/// remainders are dropped by `as_millis` itself.
fn elapsed_ms_saturating(d: Duration) -> u64 {
    d.as_millis().try_into().unwrap_or(u64::MAX)
}
97
98/// Streaming view of a query result.
99///
100/// Constructed by [`crate::Connection::query`]. Owns the underlying
101/// [`DynRowStream`] and lets callers observe schema metadata, drain
102/// the rows, or close the cursor explicitly.
103///
104/// The type is **not** marked `#[non_exhaustive]` because every field
105/// is private; the struct is only ever built through
106/// [`QueryStream::new`] (driver authors / test helpers) or returned
107/// from [`crate::Connection::query`] (consumers). Adding a field is
108/// non-breaking.
109pub struct QueryStream {
110    /// The driver-side row producer. [`DynRowStream::columns`] is
111    /// the single source of truth for the column metadata —
112    /// [`QueryStream`] delegates to it rather than holding its own
113    /// copy, which would force every `Connection::query` call to
114    /// clone the column vector for nothing.
115    inner: Box<dyn DynRowStream>,
116    started: Instant,
117    rows_yielded: usize,
118    /// Becomes `true` once the inner stream has returned `None` or an
119    /// error. Guards against double-polling drivers that don't
120    /// promise fused-semantics after end-of-stream.
121    drained: bool,
122}
123
124impl QueryStream {
125    /// Wrap an existing row stream. Used by the default
126    /// [`Connection::query`](crate::Connection::query) implementation
127    /// and by driver authors that build a richer stream out-of-band.
128    ///
129    /// Column metadata is read on-demand from
130    /// [`DynRowStream::columns`] — the caller does **not** pass it in
131    /// (review fixup M8: prevents the redundant column-vector clone
132    /// the previous shape required).
133    #[must_use]
134    pub fn new(inner: Box<dyn DynRowStream>) -> Self {
135        Self {
136            inner,
137            started: Instant::now(),
138            rows_yielded: 0,
139            drained: false,
140        }
141    }
142
143    /// Column headers describing the shape of every row this stream
144    /// will yield. Safe to call before the first
145    /// [`Self::next_row`] — the headers are materialised eagerly by
146    /// the driver as part of opening the cursor. Delegates to the
147    /// wrapped [`DynRowStream::columns`] so the two views never
148    /// disagree.
149    #[must_use]
150    pub fn columns(&self) -> &[ColumnHeader] {
151        self.inner.columns()
152    }
153
154    /// Number of rows successfully yielded so far. Drives the TUI's
155    /// "streaming · N rows" header.
156    #[must_use]
157    pub const fn rows_yielded(&self) -> usize {
158        self.rows_yielded
159    }
160
161    /// Elapsed wall-clock time since the stream was opened. Drives
162    /// the TUI's live-elapsed indicator.
163    #[must_use]
164    pub fn elapsed(&self) -> Duration {
165        self.started.elapsed()
166    }
167
168    /// Advance the stream by one row.
169    ///
170    /// Returns `None` once the underlying stream reports end-of-data
171    /// **or** a previous call returned an error. The fused shape lets
172    /// callers loop with `while let Some(row) = s.next_row().await`
173    /// without worrying about double-polling.
174    pub async fn next_row(&mut self) -> Option<Result<Row>> {
175        if self.drained {
176            return None;
177        }
178        match self.inner.next_row().await {
179            Ok(Some(row)) => {
180                self.rows_yielded += 1;
181                Some(Ok(row))
182            }
183            Ok(None) => {
184                self.drained = true;
185                None
186            }
187            Err(error) => {
188                self.drained = true;
189                Some(Err(error))
190            }
191        }
192    }
193
194    /// Drain the stream into a materialised [`QueryResult`]. Used by
195    /// tests, the MCP query tool, and the export path when the caller
196    /// genuinely needs the whole shape in memory before continuing.
197    ///
198    /// `elapsed_ms` is filled from the wall-clock between
199    /// [`Connection::query`](crate::Connection::query) returning and
200    /// the last row arriving — useful for "how long did the streamed
201    /// query take" reporting without the caller wiring its own
202    /// timer.
203    ///
204    /// On error any rows already yielded are discarded; the caller
205    /// gets the engine error verbatim. If partial materialisation
206    /// matters, use [`Self::next_row`] in a loop and accumulate
207    /// manually.
208    pub async fn collect_all(mut self) -> Result<QueryResult> {
209        let mut rows = Vec::new();
210        loop {
211            match self.next_row().await {
212                Some(Ok(row)) => rows.push(row),
213                Some(Err(error)) => {
214                    // Best-effort close so the engine releases its
215                    // cursor; we already have the terminal error, so
216                    // any close failure is logged at WARN to make a
217                    // potential cursor leak observable (review fixup
218                    // m6).
219                    let close_result = self.inner.close().await;
220                    if let Err(close_err) = close_result {
221                        tracing::warn!(
222                            target: "narwhal::query_stream",
223                            error = %close_err,
224                            "close-after-error failed (possible cursor leak)",
225                        );
226                    }
227                    return Err(error);
228                }
229                None => break,
230            }
231        }
232        let elapsed_ms = elapsed_ms_saturating(self.started.elapsed());
233        // Columns are read off the inner stream before we close it.
234        let columns = self.inner.columns().to_vec();
235        if let Err(close_err) = self.inner.close().await {
236            tracing::warn!(
237                target: "narwhal::query_stream",
238                error = %close_err,
239                "close after end-of-stream failed (possible cursor leak)",
240            );
241        }
242        Ok(QueryResult {
243            columns,
244            rows,
245            rows_affected: None,
246            elapsed_ms,
247        })
248    }
249
250    /// Drain the stream into a materialised [`QueryResult`] but stop
251    /// once `limit` rows have been accumulated. Subsequent rows
252    /// produced by the engine are discarded and the cursor is
253    /// closed — useful for the MCP tool's hard row cap without
254    /// reaching for `take`-style adapters.
255    ///
256    /// `truncated` in the returned tuple is `true` when the engine
257    /// had more rows to give. Callers should surface this to the
258    /// agent so it knows the response is incomplete.
259    pub async fn collect_with_limit(mut self, limit: usize) -> Result<(QueryResult, bool)> {
260        // Defensive shortcut: limit = 0 means "don't read anything";
261        // we still report whether there *would* have been rows by
262        // peeking once at the inner stream directly (so we never
263        // touch the public `next_row` counter — review fixup M2).
264        if limit == 0 {
265            let truncated = !self.drained && self.peek_has_more().await?;
266            let elapsed_ms = elapsed_ms_saturating(self.started.elapsed());
267            let columns = self.inner.columns().to_vec();
268            if let Err(close_err) = self.inner.close().await {
269                tracing::warn!(
270                    target: "narwhal::query_stream",
271                    error = %close_err,
272                    "close after zero-limit peek failed (possible cursor leak)",
273                );
274            }
275            return Ok((
276                QueryResult {
277                    columns,
278                    rows: Vec::new(),
279                    rows_affected: None,
280                    elapsed_ms,
281                },
282                truncated,
283            ));
284        }
285        let mut rows = Vec::with_capacity(limit.min(COLLECT_PREALLOC_CAP));
286        let mut truncated = false;
287        while rows.len() < limit {
288            match self.next_row().await {
289                Some(Ok(row)) => rows.push(row),
290                Some(Err(error)) => {
291                    if let Err(close_err) = self.inner.close().await {
292                        tracing::warn!(
293                            target: "narwhal::query_stream",
294                            error = %close_err,
295                            "close-after-error failed (possible cursor leak)",
296                        );
297                    }
298                    return Err(error);
299                }
300                None => break,
301            }
302        }
303        // If we exited because we hit the limit and the stream still
304        // has more, set truncated. We peek directly on the inner
305        // stream (bypassing `next_row`) so `rows_yielded()` stays
306        // consistent with `rows.len()` (review fixup M2). The peeked
307        // row is unavoidably discarded — documented contract.
308        if rows.len() == limit && !self.drained {
309            match self.peek_has_more().await {
310                Ok(more) => truncated = more,
311                Err(error) => {
312                    if let Err(close_err) = self.inner.close().await {
313                        tracing::warn!(
314                            target: "narwhal::query_stream",
315                            error = %close_err,
316                            "close-after-error failed (possible cursor leak)",
317                        );
318                    }
319                    return Err(error);
320                }
321            }
322        }
323        let elapsed_ms = elapsed_ms_saturating(self.started.elapsed());
324        let columns = self.inner.columns().to_vec();
325        if let Err(close_err) = self.inner.close().await {
326            tracing::warn!(
327                target: "narwhal::query_stream",
328                error = %close_err,
329                "close after limit drain failed (possible cursor leak)",
330            );
331        }
332        Ok((
333            QueryResult {
334                columns,
335                rows,
336                rows_affected: None,
337                elapsed_ms,
338            },
339            truncated,
340        ))
341    }
342
343    /// Peek directly at the inner stream without touching the public
344    /// counters. Used by [`Self::collect_with_limit`] to decide the
345    /// `truncated` flag while keeping [`Self::rows_yielded`]
346    /// equal to the actually-returned row count (review fixup M2).
347    /// Sets [`Self::drained`] when the peek confirms end-of-data so
348    /// the caller does not have to.
349    async fn peek_has_more(&mut self) -> Result<bool> {
350        match self.inner.next_row().await {
351            Ok(Some(_discarded)) => Ok(true),
352            Ok(None) => {
353                self.drained = true;
354                Ok(false)
355            }
356            Err(error) => {
357                self.drained = true;
358                Err(error)
359            }
360        }
361    }
362
363    /// Explicitly close the stream. Equivalent to dropping it for any
364    /// driver that wires its `Drop` impl to release the cursor, but
365    /// `close()` is awaitable so callers can surface server-side
366    /// release errors. Required by drivers that hold ephemeral
367    /// server-side state (PG portals, `ClickHouse` HTTP body) where the
368    /// async close round-trip must complete before the connection is
369    /// returned to the pool.
370    pub async fn close(self) -> Result<()> {
371        self.inner.close().await
372    }
373}
374
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use super::*;
    use crate::error::Error;
    use crate::future::BoxFuture;
    use crate::schema::Row;
    use crate::stream::DynRowStream;
    use crate::value::Value;

    /// Flag shared between a [`VecStream`] and the test that owns
    /// it; flips to `true` when `DynRowStream::close` is called.
    type CloseFlag = Arc<AtomicBool>;

    /// In-memory `DynRowStream` fixture. Hands out its canned rows
    /// in order; the call after the last row returns the configured
    /// terminal error (once) if there is one, otherwise end-of-data.
    struct VecStream {
        columns: Vec<ColumnHeader>,
        rows: std::vec::IntoIter<Row>,
        terminal: Option<Error>,
        close_called: CloseFlag,
    }

    impl VecStream {
        /// Build the fixture together with a handle on its close
        /// flag.
        fn new(
            columns: Vec<ColumnHeader>,
            rows: Vec<Row>,
            terminal: Option<Error>,
        ) -> (Self, CloseFlag) {
            let flag: CloseFlag = Arc::new(AtomicBool::new(false));
            let stream = Self {
                columns,
                rows: rows.into_iter(),
                terminal,
                close_called: Arc::clone(&flag),
            };
            (stream, flag)
        }
    }

    impl DynRowStream for VecStream {
        fn columns(&self) -> &[ColumnHeader] {
            self.columns.as_slice()
        }

        fn next_row(&mut self) -> BoxFuture<'_, Result<Option<Row>>> {
            Box::pin(async move {
                match self.rows.next() {
                    Some(row) => Ok(Some(row)),
                    None => match self.terminal.take() {
                        Some(error) => Err(error),
                        None => Ok(None),
                    },
                }
            })
        }

        fn close(self: Box<Self>) -> BoxFuture<'static, Result<()>> {
            // Recorded at call time, before the future is awaited.
            self.close_called.store(true, Ordering::SeqCst);
            Box::pin(async { Ok(()) })
        }
    }

    /// A `TEXT` column header with the given name.
    fn col(name: &str) -> ColumnHeader {
        ColumnHeader {
            name: String::from(name),
            data_type: String::from("TEXT"),
        }
    }

    /// A row whose cells are the given strings.
    fn row(values: &[&str]) -> Row {
        let cells = values
            .iter()
            .copied()
            .map(|text| Value::String(String::from(text)))
            .collect();
        Row(cells)
    }

    /// `count` single-cell rows holding "0", "1", … in order.
    fn numbered_rows(count: usize) -> Vec<Row> {
        (0..count)
            .map(|n| row(&[n.to_string().as_str()]))
            .collect()
    }

    /// Wrap a fresh [`VecStream`] in a [`QueryStream`].
    fn open(
        columns: &[&str],
        rows: Vec<Row>,
        terminal: Option<Error>,
    ) -> (QueryStream, CloseFlag) {
        let headers = columns.iter().copied().map(col).collect();
        let (inner, flag) = VecStream::new(headers, rows, terminal);
        (QueryStream::new(Box::new(inner)), flag)
    }

    /// Whether `DynRowStream::close` has been called on the fixture.
    fn was_closed(flag: &AtomicBool) -> bool {
        flag.load(Ordering::SeqCst)
    }

    #[tokio::test]
    async fn next_row_yields_then_ends() {
        let (mut stream, closed) = open(&["a"], vec![row(&["1"]), row(&["2"])], None);

        assert_eq!(stream.rows_yielded(), 0);
        assert!(matches!(stream.next_row().await, Some(Ok(_))));
        assert_eq!(stream.rows_yielded(), 1);
        assert!(matches!(stream.next_row().await, Some(Ok(_))));
        assert!(stream.next_row().await.is_none());
        // Fused: polling again after the end still yields `None`.
        assert!(stream.next_row().await.is_none());

        // Reaching end-of-data does not close the stream by itself;
        // only the explicit call does.
        assert!(!was_closed(&closed));
        stream.close().await.unwrap();
        assert!(was_closed(&closed));
    }

    #[tokio::test]
    async fn collect_all_round_trips() {
        let rows = vec![row(&["1", "x"]), row(&["2", "y"]), row(&["3", "z"])];
        let (stream, closed) = open(&["a", "b"], rows, None);

        let result = stream.collect_all().await.unwrap();

        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.rows.len(), 3);
        assert!(result.rows_affected.is_none());
        assert!(was_closed(&closed));
    }

    #[tokio::test]
    async fn collect_all_propagates_terminal_error() {
        let failure = Error::Query("boom".into());
        let (stream, closed) = open(&["a"], vec![row(&["only-row"])], Some(failure));

        let outcome = stream.collect_all().await;

        assert!(matches!(outcome, Err(Error::Query(_))));
        // The error path must still release the cursor via close.
        assert!(was_closed(&closed));
    }

    #[tokio::test]
    async fn next_row_fuses_after_error() {
        let failure = Error::Query("boom".into());
        let (mut stream, _) = open(&["a"], vec![], Some(failure));

        assert!(matches!(stream.next_row().await, Some(Err(_))));
        for _ in 0..2 {
            assert!(stream.next_row().await.is_none());
        }
    }

    #[tokio::test]
    async fn collect_with_limit_truncates() {
        let (stream, closed) = open(&["a"], numbered_rows(10), None);

        let (result, truncated) = stream.collect_with_limit(3).await.unwrap();

        assert_eq!(result.rows.len(), 3);
        assert!(truncated, "expected truncated=true when engine has more");
        assert!(was_closed(&closed));
    }

    #[tokio::test]
    async fn collect_with_limit_not_truncated_when_exact_fit() {
        let rows = vec![row(&["1"]), row(&["2"]), row(&["3"])];
        let (stream, closed) = open(&["a"], rows, None);

        let (result, truncated) = stream.collect_with_limit(3).await.unwrap();

        assert_eq!(result.rows.len(), 3);
        assert!(
            !truncated,
            "expected truncated=false when engine ends at limit"
        );
        assert!(was_closed(&closed));
    }

    #[tokio::test]
    async fn collect_with_limit_not_truncated_when_under() {
        let (stream, _) = open(&["a"], vec![row(&["1"])], None);

        let (result, truncated) = stream.collect_with_limit(10).await.unwrap();

        assert_eq!(result.rows.len(), 1);
        assert!(!truncated);
    }

    /// `limit = 0` on a non-empty stream: the result carries no
    /// rows, and `truncated` reports that the engine did have some.
    #[tokio::test]
    async fn collect_with_limit_zero_short_circuits_with_rows() {
        let (stream, closed) = open(&["a"], vec![row(&["1"]), row(&["2"])], None);

        let (result, truncated) = stream.collect_with_limit(0).await.unwrap();

        assert!(result.rows.is_empty());
        assert!(truncated, "engine had rows; truncated must be true");
        assert!(was_closed(&closed));
    }

    #[tokio::test]
    async fn collect_with_limit_zero_on_empty_stream() {
        let (stream, closed) = open(&["a"], vec![], None);

        let (result, truncated) = stream.collect_with_limit(0).await.unwrap();

        assert!(result.rows.is_empty());
        assert!(!truncated, "empty stream is not truncated");
        assert!(was_closed(&closed));
    }

    /// The look-ahead row pulled to compute `truncated` must not end
    /// up in the result: with ten rows available and a limit of
    /// three, exactly three rows are materialised.
    #[tokio::test]
    async fn collect_with_limit_truncated_yields_exactly_limit() {
        let (stream, _) = open(&["a"], numbered_rows(10), None);

        let (result, truncated) = stream.collect_with_limit(3).await.unwrap();

        assert_eq!(
            result.rows.len(),
            3,
            "limit cap is hard — no over-collection from the peek"
        );
        assert!(truncated);
    }

    /// `columns()` reads through to the inner stream: the headers
    /// visible on the wrapper are the ones the fixture was built
    /// with.
    #[tokio::test]
    async fn columns_delegates_to_inner() {
        let (stream, _) = open(&["a", "b", "c"], vec![], None);

        let headers = stream.columns();

        assert_eq!(headers.len(), 3);
        assert_eq!(headers[0].name, "a");
        assert_eq!(headers[2].name, "c");
    }

    /// `collect_all` copies the headers out of the inner stream when
    /// it drains, so the result carries what the driver advertised.
    #[tokio::test]
    async fn collect_all_materialises_columns_from_inner() {
        let (stream, _) = open(&["alpha", "beta"], vec![row(&["1", "x"])], None);

        let result = stream.collect_all().await.unwrap();

        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0].name, "alpha");
        assert_eq!(result.columns[1].name, "beta");
    }

    #[tokio::test]
    async fn rows_yielded_tracks_correctly() {
        let rows = vec![row(&["1"]), row(&["2"]), row(&["3"])];
        let (mut stream, _) = open(&["a"], rows, None);

        let _ = stream.next_row().await;
        assert_eq!(stream.rows_yielded(), 1);

        let _ = stream.next_row().await;
        let _ = stream.next_row().await;
        assert_eq!(stream.rows_yielded(), 3);

        // The end-of-data poll must not bump the counter.
        let _ = stream.next_row().await;
        assert_eq!(stream.rows_yielded(), 3);
    }

    #[tokio::test]
    async fn drop_releases_without_close() {
        let (mut stream, closed) = open(&["a"], numbered_rows(1000), None);

        // Read two rows, then abandon the stream part-way through.
        let _ = stream.next_row().await;
        let _ = stream.next_row().await;
        drop(stream);

        // Documents the contract: dropping is synchronous and never
        // goes through `DynRowStream::close`; releasing resources on
        // drop is the driver's own `Drop` responsibility.
        assert!(!was_closed(&closed));
    }
}
658}