use std::sync::Arc;
use bsql_driver_postgres::arena::release_arena;
use bsql_driver_postgres::{Arena, ColumnDesc, QueryResult};
const STREAM_CHUNK_SIZE: i32 = 64;
pub struct QueryStream {
guard: Option<bsql_driver_postgres::PoolGuard>,
arena: Option<Arena>,
current_result: Option<QueryResult>,
position: usize,
columns: Arc<[ColumnDesc]>,
finished: bool,
needs_execute: bool,
}
impl QueryStream {
pub(crate) fn new(
guard: bsql_driver_postgres::PoolGuard,
arena: Arena,
first_result: QueryResult,
columns: Arc<[ColumnDesc]>,
finished: bool,
) -> Self {
Self {
guard: Some(guard),
arena: Some(arena),
current_result: Some(first_result),
position: 0,
columns,
finished,
needs_execute: !finished, }
}
pub fn next_row(&mut self) -> Option<bsql_driver_postgres::Row<'_>> {
if let Some(ref result) = self.current_result {
if self.position < result.len() {
let arena = self.arena.as_ref()?;
let row = result.row(self.position, arena);
self.position += 1;
return Some(row);
}
}
None
}
pub async fn advance(&mut self) -> Result<bool, crate::error::BsqlError> {
if let Some(ref result) = self.current_result {
if self.position < result.len() {
return Ok(true);
}
}
if self.finished {
return Ok(false);
}
self.fetch_next_chunk().await?;
if let Some(ref result) = self.current_result {
if self.position < result.len() {
return Ok(true);
}
}
Ok(false)
}
pub fn has_more(&self) -> bool {
if let Some(ref result) = self.current_result {
if self.position < result.len() {
return true;
}
}
!self.finished
}
pub async fn fetch_next_chunk(&mut self) -> Result<bool, crate::error::BsqlError> {
if self.finished {
return Ok(false);
}
let guard = self.guard.as_mut().ok_or_else(|| {
crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
"stream guard already taken".into(),
))
})?;
let arena = self.arena.as_mut().ok_or_else(|| {
crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
"stream arena already taken".into(),
))
})?;
arena.reset();
if self.needs_execute {
guard
.streaming_send_execute(STREAM_CHUNK_SIZE)
.await
.map_err(crate::error::BsqlError::from_driver_query)?;
}
let num_cols = self.columns.len();
let mut col_offsets = match self.current_result.as_mut() {
Some(result) => {
let mut v = result.take_col_offsets();
v.clear();
v
}
None => Vec::with_capacity(num_cols * STREAM_CHUNK_SIZE as usize),
};
let more = guard
.streaming_next_chunk(arena, &mut col_offsets)
.await
.map_err(crate::error::BsqlError::from_driver_query)?;
if !more {
self.finished = true;
}
self.needs_execute = more;
if col_offsets.is_empty() && !more {
self.current_result = None;
self.position = 0;
return Ok(false);
}
self.current_result = Some(QueryResult::from_parts(
col_offsets,
num_cols,
Arc::clone(&self.columns),
0,
));
self.position = 0;
Ok(true)
}
pub fn remaining(&self) -> usize {
match self.current_result {
Some(ref result) => result.len().saturating_sub(self.position),
None => 0,
}
}
pub fn columns(&self) -> &[ColumnDesc] {
&self.columns
}
}
impl Drop for QueryStream {
fn drop(&mut self) {
if let Some(arena) = self.arena.take() {
release_arena(arena);
}
if !self.finished {
if let Some(mut guard) = self.guard.take() {
guard.mark_discard();
drop(guard);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bsql_driver_postgres::arena::acquire_arena;
use bsql_driver_postgres::{ColumnDesc, QueryResult};
fn make_result(num_rows: usize, columns: &Arc<[ColumnDesc]>) -> QueryResult {
let num_cols = columns.len();
let col_offsets = vec![(0usize, -1i32); num_rows * num_cols];
QueryResult::from_parts(col_offsets, num_cols, Arc::clone(columns), 0)
}
fn sample_columns(n: usize) -> Arc<[ColumnDesc]> {
(0..n)
.map(|i| ColumnDesc {
name: format!("col{i}").into(),
type_oid: 23,
type_size: 4,
table_oid: 0,
column_id: 0,
})
.collect::<Vec<_>>()
.into()
}
fn make_stream(num_rows: usize, num_cols: usize, finished: bool) -> QueryStream {
let columns = sample_columns(num_cols);
let result = make_result(num_rows, &columns);
let arena = acquire_arena();
QueryStream {
guard: None,
arena: Some(arena),
current_result: Some(result),
position: 0,
columns,
finished,
needs_execute: !finished,
}
}
#[test]
fn next_row_returns_rows() {
let mut stream = make_stream(3, 2, true);
assert!(stream.next_row().is_some());
assert!(stream.next_row().is_some());
assert!(stream.next_row().is_some());
}
#[test]
fn next_row_returns_none_when_exhausted() {
let mut stream = make_stream(2, 1, true);
assert!(stream.next_row().is_some());
assert!(stream.next_row().is_some());
assert!(stream.next_row().is_none());
}
#[test]
fn next_row_returns_none_for_empty_result() {
let mut stream = make_stream(0, 1, true);
assert!(stream.next_row().is_none());
}
#[test]
fn has_more_true_when_rows_in_buffer() {
let stream = make_stream(2, 1, true);
assert!(stream.has_more());
}
#[test]
fn has_more_false_when_exhausted_and_finished() {
let mut stream = make_stream(1, 1, true);
let _ = stream.next_row();
assert!(!stream.has_more());
}
#[test]
fn has_more_true_when_exhausted_but_not_finished() {
let mut stream = make_stream(1, 1, false);
let _ = stream.next_row();
assert!(stream.has_more());
}
#[test]
fn remaining_full_buffer() {
let stream = make_stream(5, 2, true);
assert_eq!(stream.remaining(), 5);
}
#[test]
fn remaining_after_consuming() {
let mut stream = make_stream(3, 1, true);
let _ = stream.next_row();
assert_eq!(stream.remaining(), 2);
let _ = stream.next_row();
assert_eq!(stream.remaining(), 1);
let _ = stream.next_row();
assert_eq!(stream.remaining(), 0);
}
#[test]
fn remaining_empty_result() {
let stream = make_stream(0, 1, true);
assert_eq!(stream.remaining(), 0);
}
#[test]
fn columns_returns_descriptors() {
let stream = make_stream(1, 3, true);
let cols = stream.columns();
assert_eq!(cols.len(), 3);
assert_eq!(&*cols[0].name, "col0");
assert_eq!(&*cols[1].name, "col1");
assert_eq!(&*cols[2].name, "col2");
}
#[test]
fn finished_stream_has_more_false_after_drain() {
let mut stream = make_stream(1, 1, true);
let _ = stream.next_row();
assert!(!stream.has_more());
}
#[tokio::test]
async fn fetch_next_chunk_without_guard_errors() {
let mut stream = make_stream(0, 1, false);
let result = stream.fetch_next_chunk().await;
assert!(result.is_err(), "should error without guard");
}
#[tokio::test]
async fn fetch_next_chunk_when_finished_returns_false() {
let mut stream = make_stream(0, 1, true);
let result = stream.fetch_next_chunk().await.unwrap();
assert!(!result, "finished stream should return false");
}
#[tokio::test]
async fn advance_returns_true_when_rows_available() {
let mut stream = make_stream(2, 1, true);
let has = stream.advance().await.unwrap();
assert!(has);
}
#[tokio::test]
async fn advance_returns_false_when_finished_and_exhausted() {
let mut stream = make_stream(1, 1, true);
let _ = stream.next_row(); let has = stream.advance().await.unwrap();
assert!(!has);
}
#[test]
fn drop_releases_arena() {
let stream = make_stream(3, 2, true);
drop(stream);
let arena = acquire_arena();
bsql_driver_postgres::arena::release_arena(arena);
}
#[tokio::test]
async fn fetch_next_chunk_without_arena_errors() {
let columns = sample_columns(1);
let result = make_result(0, &columns);
let mut stream = QueryStream {
guard: None,
arena: None, current_result: Some(result),
position: 0,
columns,
finished: false,
needs_execute: false,
};
let res = stream.fetch_next_chunk().await;
assert!(res.is_err(), "should error without arena");
}
#[tokio::test]
async fn advance_fetch_fails_propagates_error() {
let mut stream = make_stream(0, 1, false);
let res = stream.advance().await;
assert!(res.is_err(), "advance should propagate fetch error");
}
#[test]
fn remaining_with_none_result() {
let columns = sample_columns(1);
let arena = acquire_arena();
let stream = QueryStream {
guard: None,
arena: Some(arena),
current_result: None,
position: 0,
columns,
finished: true,
needs_execute: false,
};
assert_eq!(stream.remaining(), 0);
}
#[test]
fn has_more_with_none_result_finished() {
let columns = sample_columns(1);
let arena = acquire_arena();
let stream = QueryStream {
guard: None,
arena: Some(arena),
current_result: None,
position: 0,
columns,
finished: true,
needs_execute: false,
};
assert!(!stream.has_more());
}
#[test]
fn columns_zero_columns() {
let stream = make_stream(0, 0, true);
assert_eq!(stream.columns().len(), 0);
}
}