use crate::error::{DbxError, DbxResult};
use crate::sql::executor::operators::physical_operator::PhysicalOperator;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::reader::StreamReader;
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
/// Physical operator that yields record batches received over a channel as
/// serialized Arrow IPC stream payloads.
///
/// Each channel message is one of:
/// - `Ok(Some(bytes))`: a payload that `next` decodes with an Arrow IPC `StreamReader`,
/// - `Ok(None)`: the end-of-data marker, reported by `next` as `Ok(None)`,
/// - `Err(e)`: an error that `next` returns unchanged.
pub struct GridExchangeOperator {
/// Schema handed back by `PhysicalOperator::schema`.
// NOTE(review): nothing in this file checks decoded batches against this schema;
// presumably the sending side guarantees they match. Confirm with the producer.
schema: Arc<Schema>,
/// Receiving half of the Tokio mpsc channel the payloads arrive on.
receiver: Receiver<DbxResult<Option<Vec<u8>>>>,
}
impl GridExchangeOperator {
pub fn new(schema: Arc<Schema>, receiver: Receiver<DbxResult<Option<Vec<u8>>>>) -> Self {
Self { schema, receiver }
}
}
impl PhysicalOperator for GridExchangeOperator {
fn schema(&self) -> &Schema {
&self.schema
}
fn next(&mut self) -> DbxResult<Option<RecordBatch>> {
match self.receiver.blocking_recv() {
Some(Ok(Some(raw_bytes))) => {
let cursor = Cursor::new(raw_bytes);
let mut reader =
StreamReader::try_new(cursor, None).map_err(|e| DbxError::SqlExecution {
message: format!("Arrow IPC stream init failed: {}", e),
context: "GridExchangeOperator".to_string(),
})?;
if let Some(result) = reader.next() {
let batch = result.map_err(|e| DbxError::SqlExecution {
message: format!("Arrow IPC parse failed: {}", e),
context: "GridExchangeOperator".to_string(),
})?;
Ok(Some(batch))
} else {
Err(DbxError::SqlExecution {
message: "Empty IPC stream received".to_string(),
context: "GridExchangeOperator".to_string(),
})
}
}
Some(Ok(None)) => Ok(None), Some(Err(e)) => Err(e),
None => {
Err(DbxError::SqlExecution {
message: "Grid exchange channel closed unexpectedly".to_string(),
context: "GridExchangeOperator".to_string(),
})
}
}
}
fn reset(&mut self) -> DbxResult<()> {
Err(DbxError::SqlExecution {
message: "Reset is not supported on GridExchangeOperator".to_string(),
context: "GridExchangeOperator".to_string(),
})
}
}