Skip to main content

sochdb_query/executor/
scan.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2
3//! Table scan and index seek operators.
4
5use crate::optimizer_integration::StorageBackend;
6use crate::soch_ql::SochValue;
7use super::types::{Row, Schema, ColumnMeta};
8use super::node::PlanNode;
9use sochdb_core::Result;
10use std::sync::Arc;
11
12// ============================================================================
13// SeqScanNode — Full table scan
14// ============================================================================
15
16/// Full sequential table scan operator.
17///
18/// Materializes all rows from `StorageBackend::table_scan()` on first call,
19/// then returns one row at a time via `next()`.
20pub struct SeqScanNode {
21    schema: Schema,
22    storage: Arc<dyn StorageBackend>,
23    table: String,
24    columns: Vec<String>,
25    /// Materialized row buffer (lazily populated).
26    buffer: Option<Vec<Row>>,
27    /// Current position in buffer.
28    pos: usize,
29}
30
31impl SeqScanNode {
32    pub fn new(
33        storage: Arc<dyn StorageBackend>,
34        table: String,
35        columns: Vec<String>,
36        table_alias: Option<&str>,
37    ) -> Self {
38        let tbl = table_alias.unwrap_or(&table);
39        let schema = Schema::new(
40            columns
41                .iter()
42                .map(|c| {
43                    if c == "*" {
44                        ColumnMeta::new(c.clone())
45                    } else {
46                        ColumnMeta::qualified(tbl.to_string(), c.clone())
47                    }
48                })
49                .collect(),
50        );
51        Self {
52            schema,
53            storage,
54            table,
55            columns,
56            buffer: None,
57            pos: 0,
58        }
59    }
60
61    fn materialize(&mut self) -> Result<()> {
62        if self.buffer.is_some() {
63            return Ok(());
64        }
65
66        let raw_rows = self.storage.table_scan(&self.table, &self.columns, None)?;
67
68        // If columns is ["*"], discover actual columns from first row
69        if self.columns.len() == 1 && self.columns[0] == "*" {
70            if let Some(first) = raw_rows.first() {
71                let mut col_names: Vec<String> = first.keys().cloned().collect();
72                col_names.sort(); // Deterministic column order
73                let tbl = self.schema.columns.first()
74                    .and_then(|c| c.table.clone())
75                    .unwrap_or_else(|| self.table.clone());
76                self.schema = Schema::new(
77                    col_names.iter().map(|c| ColumnMeta::qualified(tbl.clone(), c.clone())).collect(),
78                );
79                self.columns = col_names;
80            }
81        }
82
83        let rows = raw_rows
84            .into_iter()
85            .map(|row_map| self.row_from_map(row_map))
86            .collect();
87
88        self.buffer = Some(rows);
89        Ok(())
90    }
91
92    fn row_from_map(&self, map: std::collections::HashMap<String, SochValue>) -> Row {
93        self.columns
94            .iter()
95            .map(|col| map.get(col).cloned().unwrap_or(SochValue::Null))
96            .collect()
97    }
98}
99
100impl PlanNode for SeqScanNode {
101    fn schema(&self) -> &Schema {
102        &self.schema
103    }
104
105    fn next(&mut self) -> Result<Option<Row>> {
106        self.materialize()?;
107
108        if let Some(buf) = &self.buffer {
109            if self.pos < buf.len() {
110                let row = buf[self.pos].clone();
111                self.pos += 1;
112                Ok(Some(row))
113            } else {
114                Ok(None)
115            }
116        } else {
117            Ok(None)
118        }
119    }
120
121    fn reset(&mut self) -> Result<()> {
122        self.pos = 0;
123        Ok(())
124    }
125}
126
127// ============================================================================
128// IndexSeekNode — Index-based lookup
129// ============================================================================
130
131/// Index-based seek operator.
132///
133/// Uses `StorageBackend::secondary_index_seek()` to retrieve rows matching
134/// a specific key, then iterates over results.
135pub struct IndexSeekNode {
136    schema: Schema,
137    storage: Arc<dyn StorageBackend>,
138    table: String,
139    index: String,
140    key: SochValue,
141    columns: Vec<String>,
142    buffer: Option<Vec<Row>>,
143    pos: usize,
144}
145
146impl IndexSeekNode {
147    pub fn new(
148        storage: Arc<dyn StorageBackend>,
149        table: String,
150        index: String,
151        key: SochValue,
152        columns: Vec<String>,
153    ) -> Self {
154        let schema = Schema::new(
155            columns.iter().map(|c| ColumnMeta::qualified(table.clone(), c.clone())).collect(),
156        );
157        Self {
158            schema,
159            storage,
160            table,
161            index,
162            key,
163            columns,
164            buffer: None,
165            pos: 0,
166        }
167    }
168
169    fn materialize(&mut self) -> Result<()> {
170        if self.buffer.is_some() {
171            return Ok(());
172        }
173
174        let raw_rows = self.storage.secondary_index_seek(&self.table, &self.index, &self.key)?;
175        let rows = raw_rows
176            .into_iter()
177            .map(|row_map| {
178                self.columns
179                    .iter()
180                    .map(|col| row_map.get(col).cloned().unwrap_or(SochValue::Null))
181                    .collect()
182            })
183            .collect();
184
185        self.buffer = Some(rows);
186        Ok(())
187    }
188}
189
190impl PlanNode for IndexSeekNode {
191    fn schema(&self) -> &Schema {
192        &self.schema
193    }
194
195    fn next(&mut self) -> Result<Option<Row>> {
196        self.materialize()?;
197
198        if let Some(buf) = &self.buffer {
199            if self.pos < buf.len() {
200                let row = buf[self.pos].clone();
201                self.pos += 1;
202                Ok(Some(row))
203            } else {
204                Ok(None)
205            }
206        } else {
207            Ok(None)
208        }
209    }
210
211    fn reset(&mut self) -> Result<()> {
212        self.pos = 0;
213        Ok(())
214    }
215}
216
217// ============================================================================
218// ValuesNode — Inline VALUES rows
219// ============================================================================
220
221/// Returns pre-computed rows (for INSERT ... VALUES or subquery materialization).
222pub struct ValuesNode {
223    schema: Schema,
224    rows: Vec<Row>,
225    pos: usize,
226}
227
228impl ValuesNode {
229    pub fn new(schema: Schema, rows: Vec<Row>) -> Self {
230        Self { schema, rows, pos: 0 }
231    }
232}
233
234impl PlanNode for ValuesNode {
235    fn schema(&self) -> &Schema {
236        &self.schema
237    }
238
239    fn next(&mut self) -> Result<Option<Row>> {
240        if self.pos < self.rows.len() {
241            let row = self.rows[self.pos].clone();
242            self.pos += 1;
243            Ok(Some(row))
244        } else {
245            Ok(None)
246        }
247    }
248
249    fn reset(&mut self) -> Result<()> {
250        self.pos = 0;
251        Ok(())
252    }
253}
254
255/// Empty node that produces no rows.
256pub struct EmptyNode {
257    schema: Schema,
258}
259
260impl EmptyNode {
261    pub fn new(schema: Schema) -> Self {
262        Self { schema }
263    }
264}
265
266impl PlanNode for EmptyNode {
267    fn schema(&self) -> &Schema {
268        &self.schema
269    }
270
271    fn next(&mut self) -> Result<Option<Row>> {
272        Ok(None)
273    }
274}