sochdb_query/executor/
scan.rs1use crate::optimizer_integration::StorageBackend;
6use crate::soch_ql::SochValue;
7use super::types::{Row, Schema, ColumnMeta};
8use super::node::PlanNode;
9use sochdb_core::Result;
10use std::sync::Arc;
11
12pub struct SeqScanNode {
21 schema: Schema,
22 storage: Arc<dyn StorageBackend>,
23 table: String,
24 columns: Vec<String>,
25 buffer: Option<Vec<Row>>,
27 pos: usize,
29}
30
31impl SeqScanNode {
32 pub fn new(
33 storage: Arc<dyn StorageBackend>,
34 table: String,
35 columns: Vec<String>,
36 table_alias: Option<&str>,
37 ) -> Self {
38 let tbl = table_alias.unwrap_or(&table);
39 let schema = Schema::new(
40 columns
41 .iter()
42 .map(|c| {
43 if c == "*" {
44 ColumnMeta::new(c.clone())
45 } else {
46 ColumnMeta::qualified(tbl.to_string(), c.clone())
47 }
48 })
49 .collect(),
50 );
51 Self {
52 schema,
53 storage,
54 table,
55 columns,
56 buffer: None,
57 pos: 0,
58 }
59 }
60
61 fn materialize(&mut self) -> Result<()> {
62 if self.buffer.is_some() {
63 return Ok(());
64 }
65
66 let raw_rows = self.storage.table_scan(&self.table, &self.columns, None)?;
67
68 if self.columns.len() == 1 && self.columns[0] == "*" {
70 if let Some(first) = raw_rows.first() {
71 let mut col_names: Vec<String> = first.keys().cloned().collect();
72 col_names.sort(); let tbl = self.schema.columns.first()
74 .and_then(|c| c.table.clone())
75 .unwrap_or_else(|| self.table.clone());
76 self.schema = Schema::new(
77 col_names.iter().map(|c| ColumnMeta::qualified(tbl.clone(), c.clone())).collect(),
78 );
79 self.columns = col_names;
80 }
81 }
82
83 let rows = raw_rows
84 .into_iter()
85 .map(|row_map| self.row_from_map(row_map))
86 .collect();
87
88 self.buffer = Some(rows);
89 Ok(())
90 }
91
92 fn row_from_map(&self, map: std::collections::HashMap<String, SochValue>) -> Row {
93 self.columns
94 .iter()
95 .map(|col| map.get(col).cloned().unwrap_or(SochValue::Null))
96 .collect()
97 }
98}
99
100impl PlanNode for SeqScanNode {
101 fn schema(&self) -> &Schema {
102 &self.schema
103 }
104
105 fn next(&mut self) -> Result<Option<Row>> {
106 self.materialize()?;
107
108 if let Some(buf) = &self.buffer {
109 if self.pos < buf.len() {
110 let row = buf[self.pos].clone();
111 self.pos += 1;
112 Ok(Some(row))
113 } else {
114 Ok(None)
115 }
116 } else {
117 Ok(None)
118 }
119 }
120
121 fn reset(&mut self) -> Result<()> {
122 self.pos = 0;
123 Ok(())
124 }
125}
126
127pub struct IndexSeekNode {
136 schema: Schema,
137 storage: Arc<dyn StorageBackend>,
138 table: String,
139 index: String,
140 key: SochValue,
141 columns: Vec<String>,
142 buffer: Option<Vec<Row>>,
143 pos: usize,
144}
145
146impl IndexSeekNode {
147 pub fn new(
148 storage: Arc<dyn StorageBackend>,
149 table: String,
150 index: String,
151 key: SochValue,
152 columns: Vec<String>,
153 ) -> Self {
154 let schema = Schema::new(
155 columns.iter().map(|c| ColumnMeta::qualified(table.clone(), c.clone())).collect(),
156 );
157 Self {
158 schema,
159 storage,
160 table,
161 index,
162 key,
163 columns,
164 buffer: None,
165 pos: 0,
166 }
167 }
168
169 fn materialize(&mut self) -> Result<()> {
170 if self.buffer.is_some() {
171 return Ok(());
172 }
173
174 let raw_rows = self.storage.secondary_index_seek(&self.table, &self.index, &self.key)?;
175 let rows = raw_rows
176 .into_iter()
177 .map(|row_map| {
178 self.columns
179 .iter()
180 .map(|col| row_map.get(col).cloned().unwrap_or(SochValue::Null))
181 .collect()
182 })
183 .collect();
184
185 self.buffer = Some(rows);
186 Ok(())
187 }
188}
189
190impl PlanNode for IndexSeekNode {
191 fn schema(&self) -> &Schema {
192 &self.schema
193 }
194
195 fn next(&mut self) -> Result<Option<Row>> {
196 self.materialize()?;
197
198 if let Some(buf) = &self.buffer {
199 if self.pos < buf.len() {
200 let row = buf[self.pos].clone();
201 self.pos += 1;
202 Ok(Some(row))
203 } else {
204 Ok(None)
205 }
206 } else {
207 Ok(None)
208 }
209 }
210
211 fn reset(&mut self) -> Result<()> {
212 self.pos = 0;
213 Ok(())
214 }
215}
216
217pub struct ValuesNode {
223 schema: Schema,
224 rows: Vec<Row>,
225 pos: usize,
226}
227
228impl ValuesNode {
229 pub fn new(schema: Schema, rows: Vec<Row>) -> Self {
230 Self { schema, rows, pos: 0 }
231 }
232}
233
234impl PlanNode for ValuesNode {
235 fn schema(&self) -> &Schema {
236 &self.schema
237 }
238
239 fn next(&mut self) -> Result<Option<Row>> {
240 if self.pos < self.rows.len() {
241 let row = self.rows[self.pos].clone();
242 self.pos += 1;
243 Ok(Some(row))
244 } else {
245 Ok(None)
246 }
247 }
248
249 fn reset(&mut self) -> Result<()> {
250 self.pos = 0;
251 Ok(())
252 }
253}
254
255pub struct EmptyNode {
257 schema: Schema,
258}
259
260impl EmptyNode {
261 pub fn new(schema: Schema) -> Self {
262 Self { schema }
263 }
264}
265
266impl PlanNode for EmptyNode {
267 fn schema(&self) -> &Schema {
268 &self.schema
269 }
270
271 fn next(&mut self) -> Result<Option<Row>> {
272 Ok(None)
273 }
274}