1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
use std::sync::Arc;
use reifydb_core::{
encoded::{key::EncodedKey, shape::RowShape},
interface::catalog::{id::IndexId, table::Table},
value::column::{columns::Columns, headers::ColumnHeaders},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{fragment::Fragment, value::r#type::Type};
use crate::{
Result,
vm::volcano::query::{QueryContext, QueryNode},
};
pub(crate) struct IndexScanNode {
_table: Table, // FIXME needs to work with different sources
_index_id: IndexId,
context: Option<Arc<QueryContext>>,
headers: ColumnHeaders,
_storage_types: Vec<Type>,
_shape: Option<RowShape>,
_last_key: Option<EncodedKey>,
_exhausted: bool,
}
impl IndexScanNode {
pub fn new(table: Table, index_id: IndexId, context: Arc<QueryContext>) -> Result<Self> {
let storage_types = table.columns.iter().map(|c| c.constraint.get_type()).collect::<Vec<_>>();
let headers = ColumnHeaders {
columns: table.columns.iter().map(|col| Fragment::internal(&col.name)).collect(),
};
Ok(Self {
_table: table,
_index_id: index_id,
context: Some(context),
headers,
_storage_types: storage_types,
_shape: None,
_last_key: None,
_exhausted: false,
})
}
}
impl QueryNode for IndexScanNode {
fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
// Already has context from constructor
Ok(())
}
fn next<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
debug_assert!(self.context.is_some(), "IndexScanNode::next() called before initialize()");
unimplemented!()
// let ctx = self.context.as_ref().unwrap();
//
// if self.exhausted {
// return Ok(None);
// }
//
// let batch_size = ctx.batch_size;
//
// // Create range for scanning index entries
// let shape_id: ShapeId = self.table.id.into();
// let base_range = IndexEntryKey::index_range(shape_id, self.index_id);
//
// let range = if let Some(ref last_key) = self.last_key {
// let end = match base_range.end {
// Included(key) => Included(key),
// Excluded(key) => Excluded(key),
// Unbounded => unreachable!("Index range should have bounds"),
// };
// EncodedKeyRange::new(Excluded(last_key.clone()), end)
// } else {
// base_range
// };
//
// let mut batch_rows = Vec::new();
// let mut row_numbers = Vec::new();
// let mut rows_collected = 0;
// let mut new_last_key = None;
//
// // Scan index entries
// let index_entries: Vec<_> = rx.range(range)?.into_iter().collect();
//
// for entry in index_entries.into_iter() {
// let row_number_shape = RowShape::new(&[Uint8]);
//
// let row_number = row_number_layout.get_u64(&entry.encoded, 0);
//
// let shape: ShapeId = self.table.id.into();
// let row_key = RowKey {
// source,
// encoded: RowNumber(row_number),
// };
//
// let row_key_encoded = row_key.encode();
//
// if let Some(row_data) = rx.get(&row_key_encoded)? {
// batch_rows.push(row_data.encoded);
// row_numbers.push(RowNumber(row_number));
// new_last_key = Some(entry.key);
// rows_collected += 1;
//
// if rows_collected >= batch_size {
// break;
// }
// }
// }
//
// if batch_rows.is_empty() {
// self.exhausted = true;
// return Ok(None);
// }
//
// self.last_key = new_last_key;
//
// let mut columns = Columns::from_table(&self.table);
// columns.append_rows(&self.row_layout, batch_rows.into_iter())?;
//
// // Add the RowNumber column to the columns if requested
// if ctx.preserve_row_numbers {
// // TODO: Update IndexScanNode to use ResolvedTable instead of Table
// let row_number_column = Column::( {
// source: Fragment::internal(&self.table.name),
// name: Fragment::internal(ROW_NUMBER_COLUMN_NAME),
// data: ColumnData::row_number(row_numbers),
// });
// columns.0.push(row_number_column);
// }
//
// Ok(Some(Batch {
// columns,
// }))
}
fn headers(&self) -> Option<ColumnHeaders> {
Some(self.headers.clone())
}
}