1use std::sync::Arc;
13
14use bsql_driver_postgres::arena::release_arena;
15use bsql_driver_postgres::{Arena, ColumnDesc, QueryResult};
16
17const STREAM_CHUNK_SIZE: i32 = 64;
23
24pub struct QueryStream {
45 guard: Option<bsql_driver_postgres::PoolGuard>,
47 arena: Option<Arena>,
48 current_result: Option<QueryResult>,
50 position: usize,
52 columns: Arc<[ColumnDesc]>,
56 finished: bool,
58 needs_execute: bool,
62}
63
64impl QueryStream {
65 pub(crate) fn new(
71 guard: bsql_driver_postgres::PoolGuard,
72 arena: Arena,
73 first_result: QueryResult,
74 columns: Arc<[ColumnDesc]>,
75 finished: bool,
76 ) -> Self {
77 Self {
78 guard: Some(guard),
79 arena: Some(arena),
80 current_result: Some(first_result),
81 position: 0,
82 columns,
83 finished,
84 needs_execute: !finished, }
86 }
87
88 pub fn next_row(&mut self) -> Option<bsql_driver_postgres::Row<'_>> {
100 if let Some(ref result) = self.current_result {
102 if self.position < result.len() {
103 let arena = self.arena.as_ref()?;
104 let row = result.row(self.position, arena);
105 self.position += 1;
106 return Some(row);
107 }
108 }
109
110 None
113 }
114
115 pub async fn advance(&mut self) -> Result<bool, crate::error::BsqlError> {
132 if let Some(ref result) = self.current_result {
134 if self.position < result.len() {
135 return Ok(true);
136 }
137 }
138
139 if self.finished {
141 return Ok(false);
142 }
143
144 self.fetch_next_chunk().await?;
146
147 if let Some(ref result) = self.current_result {
149 if self.position < result.len() {
150 return Ok(true);
151 }
152 }
153
154 Ok(false)
155 }
156
157 pub fn has_more(&self) -> bool {
160 if let Some(ref result) = self.current_result {
161 if self.position < result.len() {
162 return true;
163 }
164 }
165 !self.finished
166 }
167
168 pub async fn fetch_next_chunk(&mut self) -> Result<bool, crate::error::BsqlError> {
177 if self.finished {
178 return Ok(false);
179 }
180
181 let guard = self.guard.as_mut().ok_or_else(|| {
182 crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
183 "stream guard already taken".into(),
184 ))
185 })?;
186
187 let arena = self.arena.as_mut().ok_or_else(|| {
188 crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
189 "stream arena already taken".into(),
190 ))
191 })?;
192
193 arena.reset();
195
196 if self.needs_execute {
198 guard
199 .streaming_send_execute(STREAM_CHUNK_SIZE)
200 .map_err(crate::error::BsqlError::from_driver_query)?;
201 }
202
203 let num_cols = self.columns.len();
204
205 let mut col_offsets = match self.current_result.as_mut() {
207 Some(result) => {
208 let mut v = result.take_col_offsets();
209 v.clear();
210 v
211 }
212 None => Vec::with_capacity(num_cols * STREAM_CHUNK_SIZE as usize),
213 };
214
215 let more = guard
216 .streaming_next_chunk(arena, &mut col_offsets)
217 .map_err(crate::error::BsqlError::from_driver_query)?;
218
219 if !more {
220 self.finished = true;
221 }
222 self.needs_execute = more; if col_offsets.is_empty() && !more {
225 self.current_result = None;
226 self.position = 0;
227 return Ok(false);
228 }
229
230 self.current_result = Some(QueryResult::from_parts(
233 col_offsets,
234 num_cols,
235 Arc::clone(&self.columns),
236 0,
237 ));
238 self.position = 0;
239
240 Ok(true)
241 }
242
243 pub fn remaining(&self) -> usize {
245 match self.current_result {
246 Some(ref result) => result.len().saturating_sub(self.position),
247 None => 0,
248 }
249 }
250
251 pub fn columns(&self) -> &[ColumnDesc] {
253 &self.columns
254 }
255}
256
257impl Drop for QueryStream {
258 fn drop(&mut self) {
259 if let Some(arena) = self.arena.take() {
260 release_arena(arena);
261 }
262 if !self.finished {
268 if let Some(mut guard) = self.guard.take() {
269 guard.mark_discard();
270 drop(guard);
271 }
272 }
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use bsql_driver_postgres::arena::acquire_arena;
280 use bsql_driver_postgres::{ColumnDesc, QueryResult};
281
282 fn make_result(num_rows: usize, columns: &Arc<[ColumnDesc]>) -> QueryResult {
285 let num_cols = columns.len();
286 let col_offsets = vec![(0usize, -1i32); num_rows * num_cols];
287 QueryResult::from_parts(col_offsets, num_cols, Arc::clone(columns), 0)
288 }
289
290 fn sample_columns(n: usize) -> Arc<[ColumnDesc]> {
291 (0..n)
292 .map(|i| ColumnDesc {
293 name: format!("col{i}").into(),
294 type_oid: 23,
295 type_size: 4,
296 table_oid: 0,
297 column_id: 0,
298 })
299 .collect::<Vec<_>>()
300 .into()
301 }
302
303 fn make_stream(num_rows: usize, num_cols: usize, finished: bool) -> QueryStream {
306 let columns = sample_columns(num_cols);
307 let result = make_result(num_rows, &columns);
308 let arena = acquire_arena();
309 QueryStream {
310 guard: None,
311 arena: Some(arena),
312 current_result: Some(result),
313 position: 0,
314 columns,
315 finished,
316 needs_execute: !finished,
317 }
318 }
319
320 #[test]
323 fn next_row_returns_rows() {
324 let mut stream = make_stream(3, 2, true);
325 assert!(stream.next_row().is_some());
326 assert!(stream.next_row().is_some());
327 assert!(stream.next_row().is_some());
328 }
329
330 #[test]
331 fn next_row_returns_none_when_exhausted() {
332 let mut stream = make_stream(2, 1, true);
333 assert!(stream.next_row().is_some());
334 assert!(stream.next_row().is_some());
335 assert!(stream.next_row().is_none());
336 }
337
338 #[test]
339 fn next_row_returns_none_for_empty_result() {
340 let mut stream = make_stream(0, 1, true);
341 assert!(stream.next_row().is_none());
342 }
343
344 #[test]
347 fn has_more_true_when_rows_in_buffer() {
348 let stream = make_stream(2, 1, true);
349 assert!(stream.has_more());
350 }
351
352 #[test]
353 fn has_more_false_when_exhausted_and_finished() {
354 let mut stream = make_stream(1, 1, true);
355 let _ = stream.next_row();
356 assert!(!stream.has_more());
357 }
358
359 #[test]
360 fn has_more_true_when_exhausted_but_not_finished() {
361 let mut stream = make_stream(1, 1, false);
362 let _ = stream.next_row();
363 assert!(stream.has_more());
365 }
366
367 #[test]
370 fn remaining_full_buffer() {
371 let stream = make_stream(5, 2, true);
372 assert_eq!(stream.remaining(), 5);
373 }
374
375 #[test]
376 fn remaining_after_consuming() {
377 let mut stream = make_stream(3, 1, true);
378 let _ = stream.next_row();
379 assert_eq!(stream.remaining(), 2);
380 let _ = stream.next_row();
381 assert_eq!(stream.remaining(), 1);
382 let _ = stream.next_row();
383 assert_eq!(stream.remaining(), 0);
384 }
385
386 #[test]
387 fn remaining_empty_result() {
388 let stream = make_stream(0, 1, true);
389 assert_eq!(stream.remaining(), 0);
390 }
391
392 #[test]
395 fn columns_returns_descriptors() {
396 let stream = make_stream(1, 3, true);
397 let cols = stream.columns();
398 assert_eq!(cols.len(), 3);
399 assert_eq!(&*cols[0].name, "col0");
400 assert_eq!(&*cols[1].name, "col1");
401 assert_eq!(&*cols[2].name, "col2");
402 }
403
404 #[test]
407 fn finished_stream_has_more_false_after_drain() {
408 let mut stream = make_stream(1, 1, true);
409 let _ = stream.next_row();
410 assert!(!stream.has_more());
411 }
412
413 #[tokio::test]
416 async fn fetch_next_chunk_without_guard_errors() {
417 let mut stream = make_stream(0, 1, false);
418 let result = stream.fetch_next_chunk().await;
419 assert!(result.is_err(), "should error without guard");
420 }
421
422 #[tokio::test]
423 async fn fetch_next_chunk_when_finished_returns_false() {
424 let mut stream = make_stream(0, 1, true);
425 let result = stream.fetch_next_chunk().await.unwrap();
426 assert!(!result, "finished stream should return false");
427 }
428
429 #[tokio::test]
432 async fn advance_returns_true_when_rows_available() {
433 let mut stream = make_stream(2, 1, true);
434 let has = stream.advance().await.unwrap();
435 assert!(has);
436 }
437
438 #[tokio::test]
439 async fn advance_returns_false_when_finished_and_exhausted() {
440 let mut stream = make_stream(1, 1, true);
441 let _ = stream.next_row(); let has = stream.advance().await.unwrap();
443 assert!(!has);
444 }
445
446 #[test]
449 fn drop_releases_arena() {
450 let stream = make_stream(3, 2, true);
451 drop(stream);
452 let arena = acquire_arena();
454 bsql_driver_postgres::arena::release_arena(arena);
455 }
456
457 #[tokio::test]
460 async fn fetch_next_chunk_without_arena_errors() {
461 let columns = sample_columns(1);
462 let result = make_result(0, &columns);
463 let mut stream = QueryStream {
464 guard: None,
465 arena: None, current_result: Some(result),
467 position: 0,
468 columns,
469 finished: false,
470 needs_execute: false,
471 };
472 let res = stream.fetch_next_chunk().await;
473 assert!(res.is_err(), "should error without arena");
474 }
475
476 #[tokio::test]
479 async fn advance_fetch_fails_propagates_error() {
480 let mut stream = make_stream(0, 1, false);
482 let res = stream.advance().await;
483 assert!(res.is_err(), "advance should propagate fetch error");
484 }
485
486 #[test]
489 fn remaining_with_none_result() {
490 let columns = sample_columns(1);
491 let arena = acquire_arena();
492 let stream = QueryStream {
493 guard: None,
494 arena: Some(arena),
495 current_result: None,
496 position: 0,
497 columns,
498 finished: true,
499 needs_execute: false,
500 };
501 assert_eq!(stream.remaining(), 0);
502 }
503
504 #[test]
507 fn has_more_with_none_result_finished() {
508 let columns = sample_columns(1);
509 let arena = acquire_arena();
510 let stream = QueryStream {
511 guard: None,
512 arena: Some(arena),
513 current_result: None,
514 position: 0,
515 columns,
516 finished: true,
517 needs_execute: false,
518 };
519 assert!(!stream.has_more());
520 }
521
522 #[test]
525 fn columns_zero_columns() {
526 let stream = make_stream(0, 0, true);
527 assert_eq!(stream.columns().len(), 0);
528 }
529
530 #[test]
533 fn make_stream_finished_true_needs_execute_false() {
534 let stream = make_stream(3, 2, true);
535 assert!(
536 !stream.needs_execute,
537 "finished stream should not need execute"
538 );
539 assert!(stream.finished);
540 }
541
542 #[test]
543 fn make_stream_not_finished_needs_execute_true() {
544 let stream = make_stream(3, 2, false);
545 assert!(
546 stream.needs_execute,
547 "unfinished stream should need execute"
548 );
549 assert!(!stream.finished);
550 }
551
552 #[test]
553 fn make_stream_zero_rows_zero_cols_remaining_zero() {
554 let stream = make_stream(0, 0, true);
555 assert_eq!(stream.remaining(), 0);
556 assert!(!stream.has_more());
557 }
558
559 #[test]
562 fn remaining_and_has_more_consistency() {
563 let mut stream = make_stream(3, 1, true);
564 assert_eq!(stream.remaining(), 3);
566 assert!(stream.has_more());
567 let _ = stream.next_row();
569 assert_eq!(stream.remaining(), 2);
570 assert!(stream.has_more());
571 let _ = stream.next_row();
573 let _ = stream.next_row();
574 assert_eq!(stream.remaining(), 0);
575 assert!(!stream.has_more());
576 }
577
578 #[test]
579 fn remaining_and_has_more_when_not_finished() {
580 let mut stream = make_stream(1, 1, false);
581 assert_eq!(stream.remaining(), 1);
583 assert!(stream.has_more());
584 let _ = stream.next_row();
586 assert_eq!(stream.remaining(), 0);
587 assert!(
588 stream.has_more(),
589 "unfinished stream should report has_more even with empty buffer"
590 );
591 }
592
593 #[test]
596 fn drop_finished_stream_does_not_panic() {
597 let mut stream = make_stream(1, 1, true);
598 let _ = stream.next_row();
599 drop(stream);
601 }
602
603 #[test]
604 fn drop_unfinished_stream_does_not_panic() {
605 let stream = make_stream(5, 2, false);
607 drop(stream);
608 }
609
610 #[tokio::test]
613 async fn advance_does_not_consume_row() {
614 let mut stream = make_stream(2, 1, true);
615 assert!(stream.advance().await.unwrap());
616 assert_eq!(stream.remaining(), 2);
618 let _ = stream.next_row();
620 assert_eq!(stream.remaining(), 1);
621 }
622}