reifydb-engine 0.4.12

Query execution and processing engine for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::sync::Arc;

use reifydb_core::{
	encoded::{key::EncodedKey, shape::RowShape},
	interface::catalog::{id::IndexId, table::Table},
	value::column::{columns::Columns, headers::ColumnHeaders},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{fragment::Fragment, value::r#type::Type};

use crate::{
	Result,
	vm::volcano::query::{QueryContext, QueryNode},
};

pub(crate) struct IndexScanNode {
	_table: Table, // FIXME needs to work with different sources
	_index_id: IndexId,
	context: Option<Arc<QueryContext>>,
	headers: ColumnHeaders,
	_storage_types: Vec<Type>,
	_shape: Option<RowShape>,
	_last_key: Option<EncodedKey>,
	_exhausted: bool,
}

impl IndexScanNode {
	pub fn new(table: Table, index_id: IndexId, context: Arc<QueryContext>) -> Result<Self> {
		let storage_types = table.columns.iter().map(|c| c.constraint.get_type()).collect::<Vec<_>>();

		let headers = ColumnHeaders {
			columns: table.columns.iter().map(|col| Fragment::internal(&col.name)).collect(),
		};

		Ok(Self {
			_table: table,
			_index_id: index_id,
			context: Some(context),
			headers,
			_storage_types: storage_types,
			_shape: None,
			_last_key: None,
			_exhausted: false,
		})
	}
}

impl QueryNode for IndexScanNode {
	fn initialize<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &QueryContext) -> Result<()> {
		// Already has context from constructor
		Ok(())
	}

	fn next<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
		debug_assert!(self.context.is_some(), "IndexScanNode::next() called before initialize()");
		unimplemented!()
		// let ctx = self.context.as_ref().unwrap();
		//
		// if self.exhausted {
		// 	return Ok(None);
		// }
		//
		// let batch_size = ctx.batch_size;
		//
		// // Create range for scanning index entries
		// let shape_id: ShapeId = self.table.id.into();
		// let base_range = IndexEntryKey::index_range(shape_id, self.index_id);
		//
		// let range = if let Some(ref last_key) = self.last_key {
		// 	let end = match base_range.end {
		// 		Included(key) => Included(key),
		// 		Excluded(key) => Excluded(key),
		// 		Unbounded => unreachable!("Index range should have bounds"),
		// 	};
		// 	EncodedKeyRange::new(Excluded(last_key.clone()), end)
		// } else {
		// 	base_range
		// };
		//
		// let mut batch_rows = Vec::new();
		// let mut row_numbers = Vec::new();
		// let mut rows_collected = 0;
		// let mut new_last_key = None;
		//
		// // Scan index entries
		// let index_entries: Vec<_> = rx.range(range)?.into_iter().collect();
		//
		// for entry in index_entries.into_iter() {
		// 	let row_number_shape = RowShape::new(&[Uint8]);
		//
		// 	let row_number = row_number_layout.get_u64(&entry.encoded, 0);
		//
		// 	let shape: ShapeId = self.table.id.into();
		// 	let row_key = RowKey {
		// 		source,
		// 		encoded: RowNumber(row_number),
		// 	};
		//
		// 	let row_key_encoded = row_key.encode();
		//
		// 	if let Some(row_data) = rx.get(&row_key_encoded)? {
		// 		batch_rows.push(row_data.encoded);
		// 		row_numbers.push(RowNumber(row_number));
		// 		new_last_key = Some(entry.key);
		// 		rows_collected += 1;
		//
		// 		if rows_collected >= batch_size {
		// 			break;
		// 		}
		// 	}
		// }
		//
		// if batch_rows.is_empty() {
		// 	self.exhausted = true;
		// 	return Ok(None);
		// }
		//
		// self.last_key = new_last_key;
		//
		// let mut columns = Columns::from_table(&self.table);
		// columns.append_rows(&self.row_layout, batch_rows.into_iter())?;
		//
		// // Add the RowNumber column to the columns if requested
		// if ctx.preserve_row_numbers {
		// 	// TODO: Update IndexScanNode to use ResolvedTable instead of Table
		// 	let row_number_column = Column::( {
		// 		source: Fragment::internal(&self.table.name),
		// 		name: Fragment::internal(ROW_NUMBER_COLUMN_NAME),
		// 		data: ColumnData::row_number(row_numbers),
		// 	});
		// 	columns.0.push(row_number_column);
		// }
		//
		// Ok(Some(Batch {
		// 	columns,
		// }))
	}

	fn headers(&self) -> Option<ColumnHeaders> {
		Some(self.headers.clone())
	}
}