narwhal_core/query_stream.rs
1//! Ergonomic streaming wrapper bundling column headers with an async
2//! row iterator.
3//!
4//! [`crate::Connection`] already exposes two execution paths:
5//!
6//! * [`Connection::execute`](crate::Connection::execute) — materialises
7//! the entire result on the wire. Used for non-`SELECT` statements
8//! that report `rows_affected`, and as the historical hot path for
9//! small interactive queries.
10//! * [`Connection::stream`](crate::Connection::stream) — hands back a
11//! row-by-row [`DynRowStream`]. Used by the
12//! TUI's worker (`narwhal-app::run::run_stream`) so a 1 M-row
13//! `SELECT` does not block until the engine has produced its final
14//! row.
15//!
16//! [`QueryStream`] sits between the two. It wraps the row stream
17//! together with the column header vector that every consumer needs
18//! up-front, and provides:
19//!
20//! * [`QueryStream::next_row`] for the row-at-a-time loop —
21//! semantically identical to [`crate::RowStream::next_row`] but
22//! wrapped in `Option<Result<_>>` instead of `Result<Option<_>>`
23//! so the canonical `while let Some(row) = s.next_row().await`
24//! shape works without an extra match.
25//! * [`QueryStream::collect_all`] for the "drain into the old shape"
26//! bridge that tests, MCP and the export path want.
27//! * [`QueryStream::columns`] / [`QueryStream::rows_yielded`] /
28//! [`QueryStream::elapsed`] for the TUI live-counter.
29//!
30//! ## Drop / cancellation
31//!
32//! Dropping a half-drained `QueryStream` releases the wrapped
33//! `Box<dyn DynRowStream>` synchronously, which in turn drops the
34//! driver-side cursor / portal / channel and aborts the query.
35//! The dyn-safe [`DynRowStream::close`] is **async** so it cannot run
36//! from `Drop`; explicit cleanup goes through [`QueryStream::close`]
37//! (which is awaitable and surfaces release errors). The contract
38//! every workspace driver upholds:
39//!
40//! 1. `Drop` on the wrapped `DynRowStream` must be sufficient to
41//! release server-side resources — it may emit a best-effort
42//! "close" message but it must not block the runtime.
43//! 2. `close()` is the awaitable path when the caller wants to
44//! surface a server-side release failure (PG portal close,
45//! `MySQL` `KILL QUERY`, `ClickHouse` HTTP body discard).
46//!
47//! The two methods on [`QueryStream`] that drain on the caller's
48//! behalf ([`QueryStream::collect_all`] and
49//! [`QueryStream::collect_with_limit`]) always invoke `close()` so
50//! the cursor is released through the awaitable path even when the
51//! caller did not see the stream end-of-data signal.
52//!
53//! ## Why no `futures::Stream` impl?
54//!
55//! `QueryStream` deliberately does **not** implement
56//! `futures_core::Stream`. Two reasons:
57//!
58//! 1. The workspace's [`crate::RowStream`] trait already uses a
59//! bespoke `async fn next_row(&mut self) -> Result<Option<Row>>`
60//! shape because every driver author works at that boundary, not
61//! at the lower-level `poll_next(Pin<&mut Self>, &mut Context)`
62//! boundary that `Stream` exposes. Wrapping it in `Stream` would
63//! require either self-referential pinning (annoying for callers)
64//! or a hand-rolled `stream::unfold` adapter (which leaks the
65//! `self`-by-value semantics into the caller's match arms).
66//! 2. The TUI run worker drives the stream with a `tokio::time::
67//! timeout` wrap around each `next_row()` call — see
68//! `narwhal_app::run::run_stream`. Adding a `Stream` impl would
69//! invite callers to `StreamExt::buffered`-style adapters that
70//! bypass the bounded-batch contract.
71//!
72//! Callers that genuinely need a `Stream` can build one in three
73//! lines via `futures::stream::unfold(qs, |mut qs| async move {
74//! qs.next_row().await.map(|r| (r, qs)) })`.
75
76use std::time::{Duration, Instant};
77
78use crate::error::Result;
79use crate::schema::{ColumnHeader, QueryResult, Row};
80use crate::stream::DynRowStream;
81
82/// Upfront `Vec::with_capacity` ceiling for [`QueryStream::
83/// collect_with_limit`]. Picked so a million-row `limit` (the cap a
84/// caller might pass to avoid an explicit `take`) does not eagerly
85/// allocate gigabytes; the vector still grows past this if the
86/// stream actually yields more than [`COLLECT_PREALLOC_CAP`] rows.
87const COLLECT_PREALLOC_CAP: usize = 1024;
88
89/// Clamp `Duration::as_millis()` (a `u128`) down to `u64` without
90/// truncating silently. Modern wall-clock queries do not exceed
91/// `u64::MAX` milliseconds (~584 million years), but a saturating
92/// conversion is cheap insurance against a misbehaving driver that
93/// hands back a nonsensical elapsed.
94fn elapsed_ms_saturating(d: Duration) -> u64 {
95 u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
96}
97
98/// Streaming view of a query result.
99///
100/// Constructed by [`crate::Connection::query`]. Owns the underlying
101/// [`DynRowStream`] and lets callers observe schema metadata, drain
102/// the rows, or close the cursor explicitly.
103///
104/// The type is **not** marked `#[non_exhaustive]` because every field
105/// is private; the struct is only ever built through
106/// [`QueryStream::new`] (driver authors / test helpers) or returned
107/// from [`crate::Connection::query`] (consumers). Adding a field is
108/// non-breaking.
109pub struct QueryStream {
110 /// The driver-side row producer. [`DynRowStream::columns`] is
111 /// the single source of truth for the column metadata —
112 /// [`QueryStream`] delegates to it rather than holding its own
113 /// copy, which would force every `Connection::query` call to
114 /// clone the column vector for nothing.
115 inner: Box<dyn DynRowStream>,
116 started: Instant,
117 rows_yielded: usize,
118 /// Becomes `true` once the inner stream has returned `None` or an
119 /// error. Guards against double-polling drivers that don't
120 /// promise fused-semantics after end-of-stream.
121 drained: bool,
122}
123
124impl QueryStream {
125 /// Wrap an existing row stream. Used by the default
126 /// [`Connection::query`](crate::Connection::query) implementation
127 /// and by driver authors that build a richer stream out-of-band.
128 ///
129 /// Column metadata is read on-demand from
130 /// [`DynRowStream::columns`] — the caller does **not** pass it in
131 /// (review fixup M8: prevents the redundant column-vector clone
132 /// the previous shape required).
133 #[must_use]
134 pub fn new(inner: Box<dyn DynRowStream>) -> Self {
135 Self {
136 inner,
137 started: Instant::now(),
138 rows_yielded: 0,
139 drained: false,
140 }
141 }
142
143 /// Column headers describing the shape of every row this stream
144 /// will yield. Safe to call before the first
145 /// [`Self::next_row`] — the headers are materialised eagerly by
146 /// the driver as part of opening the cursor. Delegates to the
147 /// wrapped [`DynRowStream::columns`] so the two views never
148 /// disagree.
149 #[must_use]
150 pub fn columns(&self) -> &[ColumnHeader] {
151 self.inner.columns()
152 }
153
154 /// Number of rows successfully yielded so far. Drives the TUI's
155 /// "streaming · N rows" header.
156 #[must_use]
157 pub const fn rows_yielded(&self) -> usize {
158 self.rows_yielded
159 }
160
161 /// Elapsed wall-clock time since the stream was opened. Drives
162 /// the TUI's live-elapsed indicator.
163 #[must_use]
164 pub fn elapsed(&self) -> Duration {
165 self.started.elapsed()
166 }
167
168 /// Advance the stream by one row.
169 ///
170 /// Returns `None` once the underlying stream reports end-of-data
171 /// **or** a previous call returned an error. The fused shape lets
172 /// callers loop with `while let Some(row) = s.next_row().await`
173 /// without worrying about double-polling.
174 pub async fn next_row(&mut self) -> Option<Result<Row>> {
175 if self.drained {
176 return None;
177 }
178 match self.inner.next_row().await {
179 Ok(Some(row)) => {
180 self.rows_yielded += 1;
181 Some(Ok(row))
182 }
183 Ok(None) => {
184 self.drained = true;
185 None
186 }
187 Err(error) => {
188 self.drained = true;
189 Some(Err(error))
190 }
191 }
192 }
193
194 /// Drain the stream into a materialised [`QueryResult`]. Used by
195 /// tests, the MCP query tool, and the export path when the caller
196 /// genuinely needs the whole shape in memory before continuing.
197 ///
198 /// `elapsed_ms` is filled from the wall-clock between
199 /// [`Connection::query`](crate::Connection::query) returning and
200 /// the last row arriving — useful for "how long did the streamed
201 /// query take" reporting without the caller wiring its own
202 /// timer.
203 ///
204 /// On error any rows already yielded are discarded; the caller
205 /// gets the engine error verbatim. If partial materialisation
206 /// matters, use [`Self::next_row`] in a loop and accumulate
207 /// manually.
208 pub async fn collect_all(mut self) -> Result<QueryResult> {
209 let mut rows = Vec::new();
210 loop {
211 match self.next_row().await {
212 Some(Ok(row)) => rows.push(row),
213 Some(Err(error)) => {
214 // Best-effort close so the engine releases its
215 // cursor; we already have the terminal error, so
216 // any close failure is logged at WARN to make a
217 // potential cursor leak observable (review fixup
218 // m6).
219 let close_result = self.inner.close().await;
220 if let Err(close_err) = close_result {
221 tracing::warn!(
222 target: "narwhal::query_stream",
223 error = %close_err,
224 "close-after-error failed (possible cursor leak)",
225 );
226 }
227 return Err(error);
228 }
229 None => break,
230 }
231 }
232 let elapsed_ms = elapsed_ms_saturating(self.started.elapsed());
233 // Columns are read off the inner stream before we close it.
234 let columns = self.inner.columns().to_vec();
235 if let Err(close_err) = self.inner.close().await {
236 tracing::warn!(
237 target: "narwhal::query_stream",
238 error = %close_err,
239 "close after end-of-stream failed (possible cursor leak)",
240 );
241 }
242 Ok(QueryResult {
243 columns,
244 rows,
245 rows_affected: None,
246 elapsed_ms,
247 })
248 }
249
250 /// Drain the stream into a materialised [`QueryResult`] but stop
251 /// once `limit` rows have been accumulated. Subsequent rows
252 /// produced by the engine are discarded and the cursor is
253 /// closed — useful for the MCP tool's hard row cap without
254 /// reaching for `take`-style adapters.
255 ///
256 /// `truncated` in the returned tuple is `true` when the engine
257 /// had more rows to give. Callers should surface this to the
258 /// agent so it knows the response is incomplete.
259 pub async fn collect_with_limit(mut self, limit: usize) -> Result<(QueryResult, bool)> {
260 // Defensive shortcut: limit = 0 means "don't read anything";
261 // we still report whether there *would* have been rows by
262 // peeking once at the inner stream directly (so we never
263 // touch the public `next_row` counter — review fixup M2).
264 if limit == 0 {
265 let truncated = !self.drained && self.peek_has_more().await?;
266 let elapsed_ms = elapsed_ms_saturating(self.started.elapsed());
267 let columns = self.inner.columns().to_vec();
268 if let Err(close_err) = self.inner.close().await {
269 tracing::warn!(
270 target: "narwhal::query_stream",
271 error = %close_err,
272 "close after zero-limit peek failed (possible cursor leak)",
273 );
274 }
275 return Ok((
276 QueryResult {
277 columns,
278 rows: Vec::new(),
279 rows_affected: None,
280 elapsed_ms,
281 },
282 truncated,
283 ));
284 }
285 let mut rows = Vec::with_capacity(limit.min(COLLECT_PREALLOC_CAP));
286 let mut truncated = false;
287 while rows.len() < limit {
288 match self.next_row().await {
289 Some(Ok(row)) => rows.push(row),
290 Some(Err(error)) => {
291 if let Err(close_err) = self.inner.close().await {
292 tracing::warn!(
293 target: "narwhal::query_stream",
294 error = %close_err,
295 "close-after-error failed (possible cursor leak)",
296 );
297 }
298 return Err(error);
299 }
300 None => break,
301 }
302 }
303 // If we exited because we hit the limit and the stream still
304 // has more, set truncated. We peek directly on the inner
305 // stream (bypassing `next_row`) so `rows_yielded()` stays
306 // consistent with `rows.len()` (review fixup M2). The peeked
307 // row is unavoidably discarded — documented contract.
308 if rows.len() == limit && !self.drained {
309 match self.peek_has_more().await {
310 Ok(more) => truncated = more,
311 Err(error) => {
312 if let Err(close_err) = self.inner.close().await {
313 tracing::warn!(
314 target: "narwhal::query_stream",
315 error = %close_err,
316 "close-after-error failed (possible cursor leak)",
317 );
318 }
319 return Err(error);
320 }
321 }
322 }
323 let elapsed_ms = elapsed_ms_saturating(self.started.elapsed());
324 let columns = self.inner.columns().to_vec();
325 if let Err(close_err) = self.inner.close().await {
326 tracing::warn!(
327 target: "narwhal::query_stream",
328 error = %close_err,
329 "close after limit drain failed (possible cursor leak)",
330 );
331 }
332 Ok((
333 QueryResult {
334 columns,
335 rows,
336 rows_affected: None,
337 elapsed_ms,
338 },
339 truncated,
340 ))
341 }
342
343 /// Peek directly at the inner stream without touching the public
344 /// counters. Used by [`Self::collect_with_limit`] to decide the
345 /// `truncated` flag while keeping [`Self::rows_yielded`]
346 /// equal to the actually-returned row count (review fixup M2).
347 /// Sets [`Self::drained`] when the peek confirms end-of-data so
348 /// the caller does not have to.
349 async fn peek_has_more(&mut self) -> Result<bool> {
350 match self.inner.next_row().await {
351 Ok(Some(_discarded)) => Ok(true),
352 Ok(None) => {
353 self.drained = true;
354 Ok(false)
355 }
356 Err(error) => {
357 self.drained = true;
358 Err(error)
359 }
360 }
361 }
362
363 /// Explicitly close the stream. Equivalent to dropping it for any
364 /// driver that wires its `Drop` impl to release the cursor, but
365 /// `close()` is awaitable so callers can surface server-side
366 /// release errors. Required by drivers that hold ephemeral
367 /// server-side state (PG portals, `ClickHouse` HTTP body) where the
368 /// async close round-trip must complete before the connection is
369 /// returned to the pool.
370 pub async fn close(self) -> Result<()> {
371 self.inner.close().await
372 }
373}
374
375#[cfg(test)]
376mod tests {
377 use super::*;
378 use crate::error::Error;
379 use crate::future::BoxFuture;
380 use crate::schema::Row;
381 use crate::stream::DynRowStream;
382 use crate::value::Value;
383
384 /// In-memory `DynRowStream` for the round-trip tests below. Yields
385 /// pre-canned rows, then either ends or errors on the (N+1)-th
386 /// `next_row` call.
387 struct VecStream {
388 columns: Vec<ColumnHeader>,
389 rows: std::vec::IntoIter<Row>,
390 terminal: Option<Error>,
391 close_called: std::sync::Arc<std::sync::atomic::AtomicBool>,
392 }
393
394 impl VecStream {
395 fn new(
396 columns: Vec<ColumnHeader>,
397 rows: Vec<Row>,
398 terminal: Option<Error>,
399 ) -> (Self, std::sync::Arc<std::sync::atomic::AtomicBool>) {
400 let close_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
401 let stream = Self {
402 columns,
403 rows: rows.into_iter(),
404 terminal,
405 close_called: std::sync::Arc::clone(&close_called),
406 };
407 (stream, close_called)
408 }
409 }
410
411 impl DynRowStream for VecStream {
412 fn columns(&self) -> &[ColumnHeader] {
413 &self.columns
414 }
415
416 fn next_row(&mut self) -> BoxFuture<'_, Result<Option<Row>>> {
417 Box::pin(async move {
418 if let Some(row) = self.rows.next() {
419 return Ok(Some(row));
420 }
421 if let Some(error) = self.terminal.take() {
422 return Err(error);
423 }
424 Ok(None)
425 })
426 }
427
428 fn close(self: Box<Self>) -> BoxFuture<'static, Result<()>> {
429 self.close_called
430 .store(true, std::sync::atomic::Ordering::SeqCst);
431 Box::pin(async { Ok(()) })
432 }
433 }
434
435 fn col(name: &str) -> ColumnHeader {
436 ColumnHeader {
437 name: name.to_owned(),
438 data_type: "TEXT".to_owned(),
439 }
440 }
441
442 fn row(values: &[&str]) -> Row {
443 Row(values
444 .iter()
445 .map(|s| Value::String((*s).to_owned()))
446 .collect())
447 }
448
449 #[tokio::test]
450 async fn next_row_yields_then_ends() {
451 let (s, closed) = VecStream::new(vec![col("a")], vec![row(&["1"]), row(&["2"])], None);
452 let mut qs = QueryStream::new(Box::new(s));
453 assert_eq!(qs.rows_yielded(), 0);
454 assert!(qs.next_row().await.unwrap().is_ok());
455 assert_eq!(qs.rows_yielded(), 1);
456 assert!(qs.next_row().await.unwrap().is_ok());
457 assert!(qs.next_row().await.is_none());
458 // Fused: a second post-end call also returns None without
459 // re-polling the inner stream.
460 assert!(qs.next_row().await.is_none());
461 // Drop closes via Drop only if driver wires it; explicit
462 // close required for confirmation.
463 assert!(!closed.load(std::sync::atomic::Ordering::SeqCst));
464 qs.close().await.unwrap();
465 assert!(closed.load(std::sync::atomic::Ordering::SeqCst));
466 }
467
468 #[tokio::test]
469 async fn collect_all_round_trips() {
470 let (s, closed) = VecStream::new(
471 vec![col("a"), col("b")],
472 vec![row(&["1", "x"]), row(&["2", "y"]), row(&["3", "z"])],
473 None,
474 );
475 let qs = QueryStream::new(Box::new(s));
476 let qr = qs.collect_all().await.unwrap();
477 assert_eq!(qr.columns.len(), 2);
478 assert_eq!(qr.rows.len(), 3);
479 assert!(qr.rows_affected.is_none());
480 assert!(closed.load(std::sync::atomic::Ordering::SeqCst));
481 }
482
483 #[tokio::test]
484 async fn collect_all_propagates_terminal_error() {
485 let err = Error::Query("boom".into());
486 let (s, closed) = VecStream::new(vec![col("a")], vec![row(&["only-row"])], Some(err));
487 let qs = QueryStream::new(Box::new(s));
488 let result = qs.collect_all().await;
489 assert!(matches!(result, Err(Error::Query(_))));
490 // Close fires even on the error path so the cursor leaks
491 // nothing.
492 assert!(closed.load(std::sync::atomic::Ordering::SeqCst));
493 }
494
495 #[tokio::test]
496 async fn next_row_fuses_after_error() {
497 let err = Error::Query("boom".into());
498 let (s, _) = VecStream::new(vec![col("a")], vec![], Some(err));
499 let mut qs = QueryStream::new(Box::new(s));
500 assert!(matches!(qs.next_row().await, Some(Err(_))));
501 assert!(qs.next_row().await.is_none());
502 assert!(qs.next_row().await.is_none());
503 }
504
505 #[tokio::test]
506 async fn collect_with_limit_truncates() {
507 let (s, closed) = VecStream::new(
508 vec![col("a")],
509 (0..10).map(|i| row(&[&i.to_string()])).collect(),
510 None,
511 );
512 let qs = QueryStream::new(Box::new(s));
513 let (qr, truncated) = qs.collect_with_limit(3).await.unwrap();
514 assert_eq!(qr.rows.len(), 3);
515 assert!(truncated, "expected truncated=true when engine has more");
516 assert!(closed.load(std::sync::atomic::Ordering::SeqCst));
517 }
518
519 #[tokio::test]
520 async fn collect_with_limit_not_truncated_when_exact_fit() {
521 let (s, closed) = VecStream::new(
522 vec![col("a")],
523 vec![row(&["1"]), row(&["2"]), row(&["3"])],
524 None,
525 );
526 let qs = QueryStream::new(Box::new(s));
527 let (qr, truncated) = qs.collect_with_limit(3).await.unwrap();
528 assert_eq!(qr.rows.len(), 3);
529 assert!(
530 !truncated,
531 "expected truncated=false when engine ends at limit"
532 );
533 assert!(closed.load(std::sync::atomic::Ordering::SeqCst));
534 }
535
536 #[tokio::test]
537 async fn collect_with_limit_not_truncated_when_under() {
538 let (s, _) = VecStream::new(vec![col("a")], vec![row(&["1"])], None);
539 let qs = QueryStream::new(Box::new(s));
540 let (qr, truncated) = qs.collect_with_limit(10).await.unwrap();
541 assert_eq!(qr.rows.len(), 1);
542 assert!(!truncated);
543 }
544
545 /// Review fixup: defensive `limit = 0` short-circuit. The peek
546 /// path runs once and the resulting [`QueryResult`] is empty;
547 /// the truncated flag reflects whether the engine had rows at
548 /// all.
549 #[tokio::test]
550 async fn collect_with_limit_zero_short_circuits_with_rows() {
551 let (s, closed) = VecStream::new(vec![col("a")], vec![row(&["1"]), row(&["2"])], None);
552 let qs = QueryStream::new(Box::new(s));
553 let (qr, truncated) = qs.collect_with_limit(0).await.unwrap();
554 assert!(qr.rows.is_empty());
555 assert!(truncated, "engine had rows; truncated must be true");
556 assert!(closed.load(std::sync::atomic::Ordering::SeqCst));
557 }
558
559 #[tokio::test]
560 async fn collect_with_limit_zero_on_empty_stream() {
561 let (s, closed) = VecStream::new(vec![col("a")], vec![], None);
562 let qs = QueryStream::new(Box::new(s));
563 let (qr, truncated) = qs.collect_with_limit(0).await.unwrap();
564 assert!(qr.rows.is_empty());
565 assert!(!truncated, "empty stream is not truncated");
566 assert!(closed.load(std::sync::atomic::Ordering::SeqCst));
567 }
568
569 /// Review fixup M2: when `collect_with_limit` peeks an extra row
570 /// to set the `truncated` flag, that peek must NOT inflate the
571 /// public counter. We verify by re-running the same fixture
572 /// without the limit and asserting the materialised row count
573 /// matches the limit exactly.
574 #[tokio::test]
575 async fn collect_with_limit_truncated_yields_exactly_limit() {
576 let (s, _) = VecStream::new(
577 vec![col("a")],
578 (0..10).map(|i| row(&[&i.to_string()])).collect(),
579 None,
580 );
581 let qs = QueryStream::new(Box::new(s));
582 let (qr, truncated) = qs.collect_with_limit(3).await.unwrap();
583 assert_eq!(
584 qr.rows.len(),
585 3,
586 "limit cap is hard — no over-collection from the peek"
587 );
588 assert!(truncated);
589 }
590
591 /// Review fixup M8: `columns()` delegates to the inner stream so
592 /// the [`QueryStream`] wrapper holds no redundant copy. Verified
593 /// by constructing with a known column list and reading through
594 /// the [`QueryStream`] API.
595 #[tokio::test]
596 async fn columns_delegates_to_inner() {
597 let inner_cols = vec![col("a"), col("b"), col("c")];
598 let (s, _) = VecStream::new(inner_cols, vec![], None);
599 let qs = QueryStream::new(Box::new(s));
600 assert_eq!(qs.columns().len(), 3);
601 assert_eq!(qs.columns()[0].name, "a");
602 assert_eq!(qs.columns()[2].name, "c");
603 }
604
605 /// Review fixup M8: `collect_all` materialises the columns from
606 /// the inner stream at drain time. Confirms that the
607 /// `inner.columns().to_vec()` path produces the same headers the
608 /// driver advertised.
609 #[tokio::test]
610 async fn collect_all_materialises_columns_from_inner() {
611 let (s, _) = VecStream::new(
612 vec![col("alpha"), col("beta")],
613 vec![row(&["1", "x"])],
614 None,
615 );
616 let qs = QueryStream::new(Box::new(s));
617 let qr = qs.collect_all().await.unwrap();
618 assert_eq!(qr.columns.len(), 2);
619 assert_eq!(qr.columns[0].name, "alpha");
620 assert_eq!(qr.columns[1].name, "beta");
621 }
622
623 #[tokio::test]
624 async fn rows_yielded_tracks_correctly() {
625 let (s, _) = VecStream::new(
626 vec![col("a")],
627 vec![row(&["1"]), row(&["2"]), row(&["3"])],
628 None,
629 );
630 let mut qs = QueryStream::new(Box::new(s));
631 let _ = qs.next_row().await;
632 assert_eq!(qs.rows_yielded(), 1);
633 let _ = qs.next_row().await;
634 let _ = qs.next_row().await;
635 assert_eq!(qs.rows_yielded(), 3);
636 let _ = qs.next_row().await; // None
637 assert_eq!(qs.rows_yielded(), 3);
638 }
639
640 #[tokio::test]
641 async fn drop_releases_without_close() {
642 let (s, closed) = VecStream::new(
643 vec![col("a")],
644 (0..1000).map(|i| row(&[&i.to_string()])).collect(),
645 None,
646 );
647 let mut qs = QueryStream::new(Box::new(s));
648 // Consume two rows then drop mid-stream.
649 let _ = qs.next_row().await;
650 let _ = qs.next_row().await;
651 drop(qs);
652 // VecStream's close is only invoked through the explicit
653 // close path; the rest is up to Drop in real drivers. This
654 // test documents the contract: drop is synchronous and does
655 // NOT call DynRowStream::close.
656 assert!(!closed.load(std::sync::atomic::Ordering::SeqCst));
657 }
658}