use super::{Operator, OperatorResult};
use crate::execution::DataChunk;
use crate::graph::GraphStore;
use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId};
use std::sync::Arc;
/// Physical operator that scans node ids out of a [`GraphStore`] and emits
/// them as single-column [`DataChunk`]s of type [`LogicalType::Node`].
///
/// The scan can optionally be restricted to a label, and can optionally
/// apply epoch/transaction visibility filtering.
pub struct ScanOperator {
    // --- input configuration ---
    /// Store the node ids are read from.
    store: Arc<dyn GraphStore>,
    /// When set, only nodes returned by `nodes_by_label` for this label are scanned.
    label: Option<String>,
    /// When set, candidate ids are filtered for visibility at this epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction passed to the versioned visibility filter; only consulted
    /// when `viewing_epoch` is also set.
    transaction_id: Option<TransactionId>,
    /// Maximum number of rows placed in each emitted chunk.
    chunk_capacity: usize,

    // --- iteration state ---
    /// Node ids materialized on the first pull (and again after a reset).
    batch: Vec<NodeId>,
    /// Index into `batch` of the next id to emit.
    position: usize,
    /// Set once loading produced no node ids at all.
    exhausted: bool,
}
impl ScanOperator {
    /// Number of rows per emitted chunk unless overridden via
    /// [`ScanOperator::with_chunk_capacity`].
    const DEFAULT_CHUNK_CAPACITY: usize = 2048;

    /// Creates a scan over every node in `store`, with no label restriction
    /// and no visibility filtering.
    pub fn new(store: Arc<dyn GraphStore>) -> Self {
        Self {
            store,
            label: None,
            position: 0,
            batch: Vec::new(),
            exhausted: false,
            chunk_capacity: Self::DEFAULT_CHUNK_CAPACITY,
            transaction_id: None,
            viewing_epoch: None,
        }
    }

    /// Creates a scan restricted to the nodes the store reports for `label`.
    pub fn with_label(store: Arc<dyn GraphStore>, label: impl Into<String>) -> Self {
        Self {
            label: Some(label.into()),
            ..Self::new(store)
        }
    }

    /// Sets the maximum number of rows per emitted chunk.
    ///
    /// A capacity of `0` would make the scan emit empty chunks forever without
    /// advancing, so it is raised to `1`.
    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
        self.chunk_capacity = capacity.max(1);
        self
    }

    /// Enables visibility filtering at `epoch`.
    ///
    /// When `transaction_id` is `Some`, the store's versioned filter
    /// (`filter_visible_node_ids_versioned`) is used instead of the plain
    /// epoch filter.
    pub fn with_transaction_context(
        mut self,
        epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> Self {
        self.viewing_epoch = Some(epoch);
        self.transaction_id = transaction_id;
        self
    }

    /// Materializes the node ids to scan, once per construction/reset.
    ///
    /// This is a no-op when ids are already loaded or the scan is exhausted.
    /// If loading yields no ids, `exhausted` is set.
    fn load_batch(&mut self) {
        if self.exhausted || !self.batch.is_empty() {
            return;
        }

        let candidates = match (&self.label, self.viewing_epoch) {
            (Some(label), _) => self.store.nodes_by_label(label),
            // NOTE(review): with a viewing epoch the broader `all_node_ids` is
            // used, presumably so nodes not visible "now" can still be matched
            // at an older epoch by the filter below — confirm against the store.
            (None, Some(_)) => self.store.all_node_ids(),
            (None, None) => self.store.node_ids(),
        };

        self.batch = match (self.viewing_epoch, self.transaction_id) {
            (Some(epoch), Some(tx)) => self
                .store
                .filter_visible_node_ids_versioned(&candidates, epoch, tx),
            (Some(epoch), None) => self.store.filter_visible_node_ids(&candidates, epoch),
            (None, _) => candidates,
        };

        // `exhausted` is known to be false here (early return above).
        self.exhausted = self.batch.is_empty();
    }
}
impl Operator for ScanOperator {
    /// Emits the next chunk of up to `chunk_capacity` node ids, or `Ok(None)`
    /// once every loaded id has been emitted.
    fn next(&mut self) -> OperatorResult {
        self.load_batch();
        if self.exhausted || self.position >= self.batch.len() {
            return Ok(None);
        }

        // A capacity of 0 would never advance `position`, yielding empty
        // chunks forever; always make progress by at least one row.
        let capacity = self.chunk_capacity.max(1);
        let schema = [LogicalType::Node];
        let mut chunk = DataChunk::with_capacity(&schema, capacity);

        let end = self
            .position
            .saturating_add(capacity)
            .min(self.batch.len());
        let rows = &self.batch[self.position..end];
        {
            let col = chunk
                .column_mut(0)
                .expect("column 0 exists: chunk created with single-column schema");
            for &id in rows {
                col.push_node_id(id);
            }
        }
        chunk.set_count(rows.len());
        self.position = end;
        Ok(Some(chunk))
    }

    /// Rewinds the scan. The node ids are re-read from the store on the next
    /// call to `next`.
    fn reset(&mut self) {
        self.position = 0;
        self.batch.clear();
        self.exhausted = false;
    }

    fn name(&self) -> &'static str {
        "Scan"
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    /// Builds a fresh, empty in-memory LPG store.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    #[test]
    fn test_scan_by_label() {
        let store = new_store();
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);
        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person");
        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2);
        assert!(scan.next().unwrap().is_none());
    }

    #[test]
    fn test_scan_reset() {
        let store = new_store();
        store.create_node(&["Person"]);
        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person");
        let first = scan.next().unwrap().unwrap();
        assert_eq!(first.row_count(), 1);
        scan.reset();
        let second = scan.next().unwrap().unwrap();
        assert_eq!(second.row_count(), 1);
    }

    #[test]
    fn test_full_scan() {
        let store = new_store();
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);
        store.create_node(&["Place"]);
        let mut scan = ScanOperator::new(store.clone() as Arc<dyn GraphStore>);
        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 4, "Full scan should return all 4 nodes");
        assert!(scan.next().unwrap().is_none());
    }

    #[test]
    fn test_scan_splits_into_chunks() {
        let store = new_store();
        for _ in 0..5 {
            store.create_node(&["Person"]);
        }
        let mut scan =
            ScanOperator::new(store.clone() as Arc<dyn GraphStore>).with_chunk_capacity(2);
        let mut sizes = Vec::new();
        while let Some(chunk) = scan.next().unwrap() {
            sizes.push(chunk.row_count());
        }
        assert_eq!(sizes, vec![2, 2, 1], "5 nodes at capacity 2 -> 2, 2, 1");
    }

    #[test]
    fn test_scan_empty_store() {
        let store = new_store();
        let mut scan = ScanOperator::new(store as Arc<dyn GraphStore>);
        assert!(scan.next().unwrap().is_none());
    }

    #[test]
    fn test_scan_with_mvcc_context() {
        let store = new_store();
        let epoch1 = EpochId::new(1);
        store.create_node_versioned(&["Person"], epoch1, TransactionId::SYSTEM);
        store.create_node_versioned(&["Person"], epoch1, TransactionId::SYSTEM);
        let epoch5 = EpochId::new(5);
        store.create_node_versioned(&["Person"], epoch5, TransactionId::SYSTEM);

        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person")
            .with_transaction_context(EpochId::new(3), None);
        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2, "Should see 2 nodes at epoch 3");

        let mut scan_all = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person")
            .with_transaction_context(EpochId::new(5), None);
        let chunk_all = scan_all.next().unwrap().unwrap();
        assert_eq!(chunk_all.row_count(), 3, "Should see 3 nodes at epoch 5");
    }
}