use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use grafeo_common::types::{LogicalType, Value};
use grafeo_common::utils::error::{Error, QueryError, Result};
use grafeo_core::execution::DataChunk;
use grafeo_core::execution::operators::Operator;
use crate::database::QueryResult;
pub(crate) struct StreamGuard<'s> {
counter: &'s AtomicUsize,
}
impl<'s> StreamGuard<'s> {
pub(crate) fn new(counter: &'s AtomicUsize) -> Self {
counter.fetch_add(1, Ordering::AcqRel);
Self { counter }
}
}
impl Drop for StreamGuard<'_> {
fn drop(&mut self) {
self.counter.fetch_sub(1, Ordering::AcqRel);
}
}
pub struct ResultStream<'session> {
operator: Box<dyn Operator>,
columns: Vec<String>,
column_types: Vec<LogicalType>,
deadline: Option<Instant>,
exhausted: bool,
_guard: StreamGuard<'session>,
}
impl<'s> ResultStream<'s> {
pub(crate) fn new(
operator: Box<dyn Operator>,
columns: Vec<String>,
deadline: Option<Instant>,
guard: StreamGuard<'s>,
) -> Self {
let len = columns.len();
Self {
operator,
columns,
column_types: vec![LogicalType::Any; len],
deadline,
exhausted: false,
_guard: guard,
}
}
#[must_use]
pub fn columns(&self) -> &[String] {
&self.columns
}
#[must_use]
pub fn column_types(&self) -> &[LogicalType] {
&self.column_types
}
pub fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
if self.exhausted {
return Ok(None);
}
check_deadline(self.deadline)?;
match self.operator.next() {
Ok(Some(chunk)) => {
refine_column_types(&chunk, &mut self.column_types);
Ok(Some(chunk))
}
Ok(None) => {
self.exhausted = true;
Ok(None)
}
Err(err) => Err(super::convert_operator_error(err)),
}
}
#[must_use]
pub fn into_row_iter(self) -> RowIterator<'s> {
RowIterator {
stream: self,
current: None,
cursor: 0,
}
}
pub fn collect(mut self) -> Result<QueryResult> {
let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
while let Some(chunk) = self.next_chunk()? {
append_chunk(&chunk, &mut result);
}
result.column_types = self.column_types;
Ok(result)
}
}
pub struct RowIterator<'s> {
stream: ResultStream<'s>,
current: Option<(DataChunk, Vec<usize>)>,
cursor: usize,
}
impl RowIterator<'_> {
#[must_use]
pub fn columns(&self) -> &[String] {
self.stream.columns()
}
}
impl Iterator for RowIterator<'_> {
type Item = Result<Vec<Value>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((chunk, indices)) = &self.current {
if self.cursor < indices.len() {
let row_idx = indices[self.cursor];
self.cursor += 1;
return Some(Ok(extract_row(chunk, row_idx)));
}
self.current = None;
self.cursor = 0;
}
match self.stream.next_chunk() {
Ok(Some(chunk)) => {
if chunk.row_count() == 0 {
continue;
}
let indices: Vec<usize> = chunk.selected_indices().collect();
self.current = Some((chunk, indices));
self.cursor = 0;
}
Ok(None) => return None,
Err(err) => return Some(Err(err)),
}
}
}
}
pub struct OwnedResultStream {
operator: Box<dyn Operator>,
columns: Vec<String>,
column_types: Vec<LogicalType>,
deadline: Option<Instant>,
exhausted: bool,
}
impl std::fmt::Debug for OwnedResultStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OwnedResultStream")
.field("columns", &self.columns)
.field("column_types", &self.column_types)
.field("deadline", &self.deadline)
.field("exhausted", &self.exhausted)
.finish_non_exhaustive()
}
}
impl OwnedResultStream {
pub(crate) fn new(
operator: Box<dyn Operator>,
columns: Vec<String>,
deadline: Option<Instant>,
) -> Self {
let len = columns.len();
Self {
operator,
columns,
column_types: vec![LogicalType::Any; len],
deadline,
exhausted: false,
}
}
#[must_use]
pub fn columns(&self) -> &[String] {
&self.columns
}
#[must_use]
pub fn column_types(&self) -> &[LogicalType] {
&self.column_types
}
pub fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
if self.exhausted {
return Ok(None);
}
check_deadline(self.deadline)?;
match self.operator.next() {
Ok(Some(chunk)) => {
refine_column_types(&chunk, &mut self.column_types);
Ok(Some(chunk))
}
Ok(None) => {
self.exhausted = true;
Ok(None)
}
Err(err) => Err(super::convert_operator_error(err)),
}
}
#[must_use]
pub fn into_row_iter(self) -> OwnedRowIterator {
OwnedRowIterator {
stream: self,
current: None,
cursor: 0,
}
}
pub fn collect(mut self) -> Result<QueryResult> {
let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
while let Some(chunk) = self.next_chunk()? {
append_chunk(&chunk, &mut result);
}
result.column_types = self.column_types;
Ok(result)
}
}
pub struct OwnedRowIterator {
stream: OwnedResultStream,
current: Option<(DataChunk, Vec<usize>)>,
cursor: usize,
}
impl OwnedRowIterator {
#[must_use]
pub fn columns(&self) -> &[String] {
self.stream.columns()
}
}
impl Iterator for OwnedRowIterator {
type Item = Result<Vec<Value>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((chunk, indices)) = &self.current {
if self.cursor < indices.len() {
let row_idx = indices[self.cursor];
self.cursor += 1;
return Some(Ok(extract_row(chunk, row_idx)));
}
self.current = None;
self.cursor = 0;
}
match self.stream.next_chunk() {
Ok(Some(chunk)) => {
if chunk.row_count() == 0 {
continue;
}
let indices: Vec<usize> = chunk.selected_indices().collect();
self.current = Some((chunk, indices));
self.cursor = 0;
}
Ok(None) => return None,
Err(err) => return Some(Err(err)),
}
}
}
}
fn check_deadline(deadline: Option<Instant>) -> Result<()> {
#[cfg(not(target_arch = "wasm32"))]
if let Some(d) = deadline
&& Instant::now() >= d
{
return Err(Error::Query(QueryError::timeout()));
}
#[cfg(target_arch = "wasm32")]
let _ = deadline;
Ok(())
}
fn refine_column_types(chunk: &DataChunk, types: &mut Vec<LogicalType>) {
let col_count = chunk.column_count();
if col_count == 0 {
return;
}
if types.len() != col_count {
types.resize(col_count, LogicalType::Any);
}
for (col_idx, slot) in types.iter_mut().enumerate().take(col_count) {
if matches!(slot, LogicalType::Any)
&& let Some(col) = chunk.column(col_idx)
{
*slot = col.data_type().clone();
}
}
}
fn extract_row(chunk: &DataChunk, row_idx: usize) -> Vec<Value> {
let col_count = chunk.column_count();
let mut row = Vec::with_capacity(col_count);
for col_idx in 0..col_count {
let value = chunk
.column(col_idx)
.and_then(|col| col.get_value(row_idx))
.unwrap_or(Value::Null);
row.push(value);
}
row
}
fn append_chunk(chunk: &DataChunk, result: &mut QueryResult) {
for row_idx in chunk.selected_indices() {
result.rows.push(extract_row(chunk, row_idx));
}
}