Skip to main content

graphos_core/execution/operators/
scan.rs

//! Scan operator for reading node IDs from graph storage.
2
3use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::lpg::LpgStore;
6use graphos_common::types::{LogicalType, NodeId};
7use std::sync::Arc;
8
9/// A scan operator that reads nodes from storage.
10pub struct ScanOperator {
11    /// The store to scan from.
12    store: Arc<LpgStore>,
13    /// Label filter (None = all nodes).
14    label: Option<String>,
15    /// Current position in the scan.
16    position: usize,
17    /// Batch of node IDs to scan.
18    batch: Vec<NodeId>,
19    /// Whether the scan is exhausted.
20    exhausted: bool,
21    /// Chunk capacity.
22    chunk_capacity: usize,
23}
24
25impl ScanOperator {
26    /// Creates a new scan operator for all nodes.
27    pub fn new(store: Arc<LpgStore>) -> Self {
28        Self {
29            store,
30            label: None,
31            position: 0,
32            batch: Vec::new(),
33            exhausted: false,
34            chunk_capacity: 2048,
35        }
36    }
37
38    /// Creates a new scan operator for nodes with a specific label.
39    pub fn with_label(store: Arc<LpgStore>, label: impl Into<String>) -> Self {
40        Self {
41            store,
42            label: Some(label.into()),
43            position: 0,
44            batch: Vec::new(),
45            exhausted: false,
46            chunk_capacity: 2048,
47        }
48    }
49
50    /// Sets the chunk capacity.
51    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
52        self.chunk_capacity = capacity;
53        self
54    }
55
56    fn load_batch(&mut self) {
57        if !self.batch.is_empty() || self.exhausted {
58            return;
59        }
60
61        self.batch = match &self.label {
62            Some(label) => self.store.nodes_by_label(label),
63            None => {
64                // For full scan, we'd need to iterate all nodes
65                // This is a simplified implementation
66                Vec::new()
67            }
68        };
69
70        if self.batch.is_empty() {
71            self.exhausted = true;
72        }
73    }
74}
75
76impl Operator for ScanOperator {
77    fn next(&mut self) -> OperatorResult {
78        self.load_batch();
79
80        if self.exhausted || self.position >= self.batch.len() {
81            return Ok(None);
82        }
83
84        // Create output chunk with node IDs
85        let schema = [LogicalType::Node];
86        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
87
88        let end = (self.position + self.chunk_capacity).min(self.batch.len());
89        let count = end - self.position;
90
91        {
92            let col = chunk.column_mut(0).unwrap();
93            for i in self.position..end {
94                col.push_node_id(self.batch[i]);
95            }
96        }
97
98        chunk.set_count(count);
99        self.position = end;
100
101        Ok(Some(chunk))
102    }
103
104    fn reset(&mut self) {
105        self.position = 0;
106        self.batch.clear();
107        self.exhausted = false;
108    }
109
110    fn name(&self) -> &'static str {
111        "Scan"
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_scan_by_label() {
        let store = Arc::new(LpgStore::new());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);

        let mut persons = ScanOperator::with_label(Arc::clone(&store), "Person");

        // Both Person nodes fit into a single default-sized chunk; the
        // Animal node must be filtered out.
        let only_chunk = persons
            .next()
            .expect("label scan should not fail")
            .expect("label scan should yield a chunk");
        assert_eq!(only_chunk.row_count(), 2);

        // After that one chunk there is nothing left to emit.
        let after = persons.next().expect("label scan should not fail");
        assert!(after.is_none());
    }

    #[test]
    fn test_scan_reset() {
        let store = Arc::new(LpgStore::new());
        store.create_node(&["Person"]);

        let mut persons = ScanOperator::with_label(Arc::clone(&store), "Person");

        let before_reset = persons
            .next()
            .expect("label scan should not fail")
            .expect("label scan should yield a chunk");
        assert_eq!(before_reset.row_count(), 1);

        // Rewinding must let the same rows be produced again.
        persons.reset();

        let after_reset = persons
            .next()
            .expect("label scan should not fail")
            .expect("label scan should yield a chunk after reset");
        assert_eq!(after_reset.row_count(), 1);
    }
}