use std::sync::Arc;
use super::*;
use crate::array::{ArrayBuilder, ArrayBuilderImpl, DataChunk, I64ArrayBuilder};
use crate::storage::{Storage, StorageColumnRef, Table, Transaction, TxnIterator};
use crate::v1::binder::BoundExpr;
use crate::v1::optimizer::plan_nodes::PhysicalTableScan;
pub struct TableScanExecutor<S: Storage> {
pub plan: PhysicalTableScan,
pub expr: Option<BoundExpr>,
pub storage: Arc<S>,
}
impl<S: Storage> TableScanExecutor<S> {
fn build_empty_chunk(&self, table: &impl Table) -> Result<DataChunk, ExecutorError> {
let columns = table.columns()?;
let mut col_idx = self
.plan
.logical()
.column_ids()
.iter()
.map(|x| StorageColumnRef::Idx(*x))
.collect_vec();
if self.plan.logical().with_row_handler() {
col_idx.push(StorageColumnRef::RowHandler);
}
let mut builders = self
.plan
.logical()
.column_ids()
.iter()
.map(|&id| columns.iter().find(|col| col.id() == id).unwrap())
.map(|col| ArrayBuilderImpl::new(&col.datatype()))
.collect::<Vec<ArrayBuilderImpl>>();
if self.plan.logical().with_row_handler() {
builders.push(ArrayBuilderImpl::Int64(I64ArrayBuilder::new()));
}
let chunk = builders.into_iter().collect();
Ok(chunk)
}
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute_inner(self) {
let table = self.storage.get_table(self.plan.logical().table_ref_id())?;
let empty_chunk = self.build_empty_chunk(&table)?;
let mut col_idx = self
.plan
.logical()
.column_ids()
.iter()
.map(|x| StorageColumnRef::Idx(*x))
.collect_vec();
if self.plan.logical().with_row_handler() {
col_idx.push(StorageColumnRef::RowHandler);
}
let txn = table.read().await?;
let mut it = txn
.scan(
&[],
&[],
&col_idx,
self.plan.logical().is_sorted(),
false,
self.expr,
)
.await?;
let mut have_chunk = false;
while let Some(x) = it.next_batch(None).await? {
yield x;
have_chunk = true;
}
if !have_chunk {
yield empty_chunk;
}
}
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self) {
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let handler = tokio::spawn(async move {
let mut stream = self.execute_inner();
while let Some(result) = stream.next().await {
if tx.send(result).await.is_err() {
return;
}
}
});
while let Some(item) = rx.recv().await {
yield item?;
}
handler.await.expect("failed to join scan thread");
}
}