1use std::sync::Arc;
13
14use bsql_driver_postgres::arena::release_arena;
15use bsql_driver_postgres::{Arena, ColumnDesc, QueryResult};
16
17const STREAM_CHUNK_SIZE: i32 = 64;
23
24pub struct QueryStream {
45 guard: Option<bsql_driver_postgres::PoolGuard>,
47 arena: Option<Arena>,
48 current_result: Option<QueryResult>,
50 position: usize,
52 columns: Arc<[ColumnDesc]>,
56 finished: bool,
58 needs_execute: bool,
62}
63
64impl QueryStream {
65 pub(crate) fn new(
71 guard: bsql_driver_postgres::PoolGuard,
72 arena: Arena,
73 first_result: QueryResult,
74 columns: Arc<[ColumnDesc]>,
75 finished: bool,
76 ) -> Self {
77 Self {
78 guard: Some(guard),
79 arena: Some(arena),
80 current_result: Some(first_result),
81 position: 0,
82 columns,
83 finished,
84 needs_execute: !finished, }
86 }
87
88 pub fn next_row(&mut self) -> Option<bsql_driver_postgres::Row<'_>> {
100 if let Some(ref result) = self.current_result {
102 if self.position < result.len() {
103 let arena = self.arena.as_ref()?;
104 let row = result.row(self.position, arena);
105 self.position += 1;
106 return Some(row);
107 }
108 }
109
110 None
113 }
114
115 pub async fn advance(&mut self) -> Result<bool, crate::error::BsqlError> {
132 if let Some(ref result) = self.current_result {
134 if self.position < result.len() {
135 return Ok(true);
136 }
137 }
138
139 if self.finished {
141 return Ok(false);
142 }
143
144 self.fetch_next_chunk().await?;
146
147 if let Some(ref result) = self.current_result {
149 if self.position < result.len() {
150 return Ok(true);
151 }
152 }
153
154 Ok(false)
155 }
156
157 pub fn has_more(&self) -> bool {
160 if let Some(ref result) = self.current_result {
161 if self.position < result.len() {
162 return true;
163 }
164 }
165 !self.finished
166 }
167
168 pub async fn fetch_next_chunk(&mut self) -> Result<bool, crate::error::BsqlError> {
177 if self.finished {
178 return Ok(false);
179 }
180
181 let guard = self.guard.as_mut().ok_or_else(|| {
182 crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
183 "stream guard already taken".into(),
184 ))
185 })?;
186
187 let arena = self.arena.as_mut().ok_or_else(|| {
188 crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
189 "stream arena already taken".into(),
190 ))
191 })?;
192
193 arena.reset();
195
196 if self.needs_execute {
198 guard
199 .streaming_send_execute(STREAM_CHUNK_SIZE)
200 .await
201 .map_err(crate::error::BsqlError::from_driver_query)?;
202 }
203
204 let num_cols = self.columns.len();
205
206 let mut col_offsets = match self.current_result.as_mut() {
208 Some(result) => {
209 let mut v = result.take_col_offsets();
210 v.clear();
211 v
212 }
213 None => Vec::with_capacity(num_cols * STREAM_CHUNK_SIZE as usize),
214 };
215
216 let more = guard
217 .streaming_next_chunk(arena, &mut col_offsets)
218 .await
219 .map_err(crate::error::BsqlError::from_driver_query)?;
220
221 if !more {
222 self.finished = true;
223 }
224 self.needs_execute = more; if col_offsets.is_empty() && !more {
227 self.current_result = None;
228 self.position = 0;
229 return Ok(false);
230 }
231
232 self.current_result = Some(QueryResult::from_parts(
235 col_offsets,
236 num_cols,
237 Arc::clone(&self.columns),
238 0,
239 ));
240 self.position = 0;
241
242 Ok(true)
243 }
244
245 pub fn remaining(&self) -> usize {
247 match self.current_result {
248 Some(ref result) => result.len().saturating_sub(self.position),
249 None => 0,
250 }
251 }
252
253 pub fn columns(&self) -> &[ColumnDesc] {
255 &self.columns
256 }
257}
258
259impl Drop for QueryStream {
260 fn drop(&mut self) {
261 if let Some(arena) = self.arena.take() {
262 release_arena(arena);
263 }
264 if !self.finished {
270 if let Some(mut guard) = self.guard.take() {
271 guard.mark_discard();
272 drop(guard);
273 }
274 }
275 }
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281 use bsql_driver_postgres::arena::acquire_arena;
282 use bsql_driver_postgres::{ColumnDesc, QueryResult};
283
284 fn make_result(num_rows: usize, columns: &Arc<[ColumnDesc]>) -> QueryResult {
287 let num_cols = columns.len();
288 let col_offsets = vec![(0usize, -1i32); num_rows * num_cols];
289 QueryResult::from_parts(col_offsets, num_cols, Arc::clone(columns), 0)
290 }
291
292 fn sample_columns(n: usize) -> Arc<[ColumnDesc]> {
293 (0..n)
294 .map(|i| ColumnDesc {
295 name: format!("col{i}").into(),
296 type_oid: 23,
297 type_size: 4,
298 table_oid: 0,
299 column_id: 0,
300 })
301 .collect::<Vec<_>>()
302 .into()
303 }
304
305 fn make_stream(num_rows: usize, num_cols: usize, finished: bool) -> QueryStream {
308 let columns = sample_columns(num_cols);
309 let result = make_result(num_rows, &columns);
310 let arena = acquire_arena();
311 QueryStream {
312 guard: None,
313 arena: Some(arena),
314 current_result: Some(result),
315 position: 0,
316 columns,
317 finished,
318 needs_execute: !finished,
319 }
320 }
321
322 #[test]
325 fn next_row_returns_rows() {
326 let mut stream = make_stream(3, 2, true);
327 assert!(stream.next_row().is_some());
328 assert!(stream.next_row().is_some());
329 assert!(stream.next_row().is_some());
330 }
331
332 #[test]
333 fn next_row_returns_none_when_exhausted() {
334 let mut stream = make_stream(2, 1, true);
335 assert!(stream.next_row().is_some());
336 assert!(stream.next_row().is_some());
337 assert!(stream.next_row().is_none());
338 }
339
340 #[test]
341 fn next_row_returns_none_for_empty_result() {
342 let mut stream = make_stream(0, 1, true);
343 assert!(stream.next_row().is_none());
344 }
345
346 #[test]
349 fn has_more_true_when_rows_in_buffer() {
350 let stream = make_stream(2, 1, true);
351 assert!(stream.has_more());
352 }
353
354 #[test]
355 fn has_more_false_when_exhausted_and_finished() {
356 let mut stream = make_stream(1, 1, true);
357 let _ = stream.next_row();
358 assert!(!stream.has_more());
359 }
360
361 #[test]
362 fn has_more_true_when_exhausted_but_not_finished() {
363 let mut stream = make_stream(1, 1, false);
364 let _ = stream.next_row();
365 assert!(stream.has_more());
367 }
368
369 #[test]
372 fn remaining_full_buffer() {
373 let stream = make_stream(5, 2, true);
374 assert_eq!(stream.remaining(), 5);
375 }
376
377 #[test]
378 fn remaining_after_consuming() {
379 let mut stream = make_stream(3, 1, true);
380 let _ = stream.next_row();
381 assert_eq!(stream.remaining(), 2);
382 let _ = stream.next_row();
383 assert_eq!(stream.remaining(), 1);
384 let _ = stream.next_row();
385 assert_eq!(stream.remaining(), 0);
386 }
387
388 #[test]
389 fn remaining_empty_result() {
390 let stream = make_stream(0, 1, true);
391 assert_eq!(stream.remaining(), 0);
392 }
393
394 #[test]
397 fn columns_returns_descriptors() {
398 let stream = make_stream(1, 3, true);
399 let cols = stream.columns();
400 assert_eq!(cols.len(), 3);
401 assert_eq!(&*cols[0].name, "col0");
402 assert_eq!(&*cols[1].name, "col1");
403 assert_eq!(&*cols[2].name, "col2");
404 }
405
406 #[test]
409 fn finished_stream_has_more_false_after_drain() {
410 let mut stream = make_stream(1, 1, true);
411 let _ = stream.next_row();
412 assert!(!stream.has_more());
413 }
414
415 #[tokio::test]
418 async fn fetch_next_chunk_without_guard_errors() {
419 let mut stream = make_stream(0, 1, false);
420 let result = stream.fetch_next_chunk().await;
421 assert!(result.is_err(), "should error without guard");
422 }
423
424 #[tokio::test]
425 async fn fetch_next_chunk_when_finished_returns_false() {
426 let mut stream = make_stream(0, 1, true);
427 let result = stream.fetch_next_chunk().await.unwrap();
428 assert!(!result, "finished stream should return false");
429 }
430
431 #[tokio::test]
434 async fn advance_returns_true_when_rows_available() {
435 let mut stream = make_stream(2, 1, true);
436 let has = stream.advance().await.unwrap();
437 assert!(has);
438 }
439
440 #[tokio::test]
441 async fn advance_returns_false_when_finished_and_exhausted() {
442 let mut stream = make_stream(1, 1, true);
443 let _ = stream.next_row(); let has = stream.advance().await.unwrap();
445 assert!(!has);
446 }
447
448 #[test]
451 fn drop_releases_arena() {
452 let stream = make_stream(3, 2, true);
453 drop(stream);
454 let arena = acquire_arena();
456 bsql_driver_postgres::arena::release_arena(arena);
457 }
458
459 #[tokio::test]
462 async fn fetch_next_chunk_without_arena_errors() {
463 let columns = sample_columns(1);
464 let result = make_result(0, &columns);
465 let mut stream = QueryStream {
466 guard: None,
467 arena: None, current_result: Some(result),
469 position: 0,
470 columns,
471 finished: false,
472 needs_execute: false,
473 };
474 let res = stream.fetch_next_chunk().await;
475 assert!(res.is_err(), "should error without arena");
476 }
477
478 #[tokio::test]
481 async fn advance_fetch_fails_propagates_error() {
482 let mut stream = make_stream(0, 1, false);
484 let res = stream.advance().await;
485 assert!(res.is_err(), "advance should propagate fetch error");
486 }
487
488 #[test]
491 fn remaining_with_none_result() {
492 let columns = sample_columns(1);
493 let arena = acquire_arena();
494 let stream = QueryStream {
495 guard: None,
496 arena: Some(arena),
497 current_result: None,
498 position: 0,
499 columns,
500 finished: true,
501 needs_execute: false,
502 };
503 assert_eq!(stream.remaining(), 0);
504 }
505
506 #[test]
509 fn has_more_with_none_result_finished() {
510 let columns = sample_columns(1);
511 let arena = acquire_arena();
512 let stream = QueryStream {
513 guard: None,
514 arena: Some(arena),
515 current_result: None,
516 position: 0,
517 columns,
518 finished: true,
519 needs_execute: false,
520 };
521 assert!(!stream.has_more());
522 }
523
524 #[test]
527 fn columns_zero_columns() {
528 let stream = make_stream(0, 0, true);
529 assert_eq!(stream.columns().len(), 0);
530 }
531}