use std::collections::VecDeque;
use crate::error::GqlError;
use crate::proto;
use crate::status;
use crate::types::Value;
pub struct ResultCursor {
stream: tonic::Streaming<proto::ExecuteResponse>,
header: Option<proto::ResultHeader>,
summary: Option<proto::ResultSummary>,
buffered_rows: VecDeque<Vec<Value>>,
done: bool,
}
impl ResultCursor {
pub(crate) fn new(stream: tonic::Streaming<proto::ExecuteResponse>) -> Self {
Self {
stream,
header: None,
summary: None,
buffered_rows: VecDeque::new(),
done: false,
}
}
pub async fn header(&mut self) -> Result<Option<&proto::ResultHeader>, GqlError> {
if self.header.is_some() {
return Ok(self.header.as_ref());
}
self.advance_to_header().await?;
Ok(self.header.as_ref())
}
pub async fn column_names(&mut self) -> Result<Vec<String>, GqlError> {
self.header().await?;
Ok(self
.header
.as_ref()
.map(|h| h.columns.iter().map(|c| c.name.clone()).collect())
.unwrap_or_default())
}
pub async fn next_row(&mut self) -> Result<Option<Vec<Value>>, GqlError> {
if let Some(row) = self.buffered_rows.pop_front() {
return Ok(Some(row));
}
if self.done {
return Ok(None);
}
loop {
if let Some(response) = self.stream.message().await? {
match response.frame {
Some(proto::execute_response::Frame::Header(h)) => {
self.header = Some(h);
}
Some(proto::execute_response::Frame::RowBatch(batch)) => {
let mut rows: VecDeque<Vec<Value>> = batch
.rows
.into_iter()
.map(|r| r.values.into_iter().map(Value::from).collect())
.collect();
if let Some(first) = rows.pop_front() {
self.buffered_rows = rows;
return Ok(Some(first));
}
}
Some(proto::execute_response::Frame::Summary(s)) => {
self.summary = Some(s);
self.done = true;
return Ok(None);
}
None => {}
}
} else {
self.done = true;
return Ok(None);
}
}
}
pub async fn collect_rows(&mut self) -> Result<Vec<Vec<Value>>, GqlError> {
let mut all_rows = Vec::new();
while let Some(row) = self.next_row().await? {
all_rows.push(row);
}
Ok(all_rows)
}
pub async fn summary(&mut self) -> Result<Option<&proto::ResultSummary>, GqlError> {
if self.summary.is_some() {
return Ok(self.summary.as_ref());
}
while !self.done {
self.next_row().await?;
}
Ok(self.summary.as_ref())
}
pub async fn is_success(&mut self) -> Result<bool, GqlError> {
let summary = self.summary().await?;
Ok(summary
.and_then(|s| s.status.as_ref())
.is_some_and(|s| status::is_success(&s.code)))
}
pub async fn rows_affected(&mut self) -> Result<i64, GqlError> {
let summary = self.summary().await?;
Ok(summary.map_or(0, |s| s.rows_affected))
}
async fn advance_to_header(&mut self) -> Result<(), GqlError> {
while !self.done {
if let Some(response) = self.stream.message().await? {
match response.frame {
Some(proto::execute_response::Frame::Header(h)) => {
self.header = Some(h);
return Ok(());
}
Some(proto::execute_response::Frame::RowBatch(batch)) => {
let rows: VecDeque<Vec<Value>> = batch
.rows
.into_iter()
.map(|r| r.values.into_iter().map(Value::from).collect())
.collect();
self.buffered_rows.extend(rows);
}
Some(proto::execute_response::Frame::Summary(s)) => {
self.summary = Some(s);
self.done = true;
return Ok(());
}
None => {}
}
} else {
self.done = true;
return Ok(());
}
}
Ok(())
}
}