Skip to main content

reifydb_column/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::value::column::{
7	ColumnWithName, array::Column, buffer::ColumnBuffer, columns::Columns, mask::RowMask,
8};
9use reifydb_type::{
10	Result,
11	fragment::Fragment,
12	value::{datetime::DateTime, row_number::RowNumber},
13};
14
15use crate::{
16	compute,
17	predicate::{self, Predicate},
18	selection::Selection,
19	snapshot::{ColumnBlock, ColumnChunks, Schema, Snapshot},
20};
21
/// Streaming batch reader over an immutable column [`Snapshot`].
///
/// Yields `Columns` batches of at most `batch_size` rows through its
/// `Iterator` impl; an optional [`Predicate`] is pushed down and evaluated
/// per batch before materialization.
pub struct SnapshotReader {
	// Shared, immutable snapshot whose block this reader scans.
	snapshot: Arc<Snapshot>,
	// Upper bound on the number of rows emitted per batch.
	batch_size: usize,
	// Next absolute row index to read; advances by up to `batch_size` per batch.
	offset: usize,
	// Total rows in the snapshot (length of the first column; 0 if no columns).
	row_count: usize,
	// Optional filter evaluated against each batch view (see `with_predicate`).
	predicate: Option<Predicate>,
}
29
30impl SnapshotReader {
31	pub fn new(snapshot: Arc<Snapshot>, batch_size: usize) -> Self {
32		let row_count = snapshot.block.columns.first().map(|c| c.len()).unwrap_or(0);
33		Self {
34			snapshot,
35			batch_size,
36			offset: 0,
37			row_count,
38			predicate: None,
39		}
40	}
41
42	// Attach a predicate evaluated per-batch with full pushdown: the predicate
43	// runs against a sliced view of each batch's chunks (encoding-respecting),
44	// then `compute::filter` runs per chunk on the compressed/encoded slices,
45	// and only surviving rows get canonicalized into the output `ColumnBuffer`.
46	pub fn with_predicate(mut self, predicate: Predicate) -> Self {
47		self.predicate = Some(predicate);
48		self
49	}
50
51	pub fn row_count(&self) -> usize {
52		self.row_count
53	}
54
55	fn read_next_batch(&mut self) -> Result<Option<Columns>> {
56		let start = self.offset;
57		let end = (start + self.batch_size).min(self.row_count);
58		self.offset = end;
59
60		let block = &self.snapshot.block;
61		let schema = &block.schema;
62
63		let Some(predicate) = self.predicate.as_ref() else {
64			return Ok(Some(materialize_full(block, start, end)?));
65		};
66
67		let view = block.view_range(start, end)?;
68		let selection = predicate::evaluate(&view, predicate)?;
69		match selection {
70			Selection::None_ => Ok(None),
71			Selection::All => Ok(Some(materialize_view_full(schema, &view, start, end)?)),
72			Selection::Mask(mask) => Ok(Some(materialize_filtered(schema, &view, start, &mask)?)),
73		}
74	}
75}
76
77// Materialize a contiguous batch range without filtering (no-predicate path).
78fn materialize_full(block: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
79	let len = end - start;
80	let mut columns: Vec<ColumnWithName> = Vec::with_capacity(block.schema.len());
81	for (i, (name, _ty, _nullable)) in block.schema.iter().enumerate() {
82		let data = read_range(&block.columns[i], start, end)?;
83		columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
84	}
85	let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
86	let ts = DateTime::default();
87	Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
88}
89
90// Materialize a precomputed batch view when the predicate selected every row
91// (`Selection::All`). Skips the redundant re-slice that `materialize_full`
92// would do on the source block.
93fn materialize_view_full(schema: &Schema, view: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
94	let len = end - start;
95	let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
96	for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
97		let data = concat_view_chunks(&view.columns[i])?;
98		columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
99	}
100	let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
101	let ts = DateTime::default();
102	Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
103}
104
105// Pushdown materialization: for each user column, walk the view's already-sliced
106// chunks and apply `compute::filter` per chunk (encoding-specialized - compressed
107// encodings can filter without canonicalizing the chunk first); only the survivors
108// get canonicalized into the output `ColumnBuffer`. System columns are filtered
109// in tandem using the same mask so all column lengths agree.
110fn materialize_filtered(schema: &Schema, view: &ColumnBlock, batch_start: usize, mask: &RowMask) -> Result<Columns> {
111	let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
112	for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
113		let data = filter_view_column(&view.columns[i], mask)?;
114		columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
115	}
116
117	let kept = mask.popcount();
118	let mut row_numbers: Vec<RowNumber> = Vec::with_capacity(kept);
119	for i in 0..mask.len() {
120		if mask.get(i) {
121			row_numbers.push(RowNumber((batch_start + i) as u64));
122		}
123	}
124	let ts = DateTime::default();
125	Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; kept], vec![ts; kept]))
126}
127
128// Filter one view-column chunk-by-chunk using the batch-wide mask. Each chunk
129// gets the corresponding slice of the mask; `compute::filter` runs on the
130// encoded chunk (not the full canonical block), and the surviving canonicalized
131// rows extend the output buffer. The caller (Selection::Mask path) guarantees
132// at least one bit is set across the whole mask, so `out` is always populated.
133fn filter_view_column(view_chunks: &ColumnChunks, mask: &RowMask) -> Result<ColumnBuffer> {
134	let mut chunk_offset = 0usize;
135	let mut out: Option<ColumnBuffer> = None;
136	for chunk in &view_chunks.chunks {
137		let chunk_len = chunk.len();
138		let chunk_mask = mask.slice(chunk_offset, chunk_offset + chunk_len);
139		chunk_offset += chunk_len;
140		if chunk_mask.popcount() == 0 {
141			continue;
142		}
143		let filtered: Column = compute::filter(chunk, &chunk_mask)?;
144		let buf = filtered.to_canonical()?.to_column_buffer()?;
145		match &mut out {
146			None => out = Some(buf),
147			Some(o) => o.extend(buf)?,
148		}
149	}
150	Ok(out.expect("Selection::Mask guarantees at least one row survives"))
151}
152
153// Concatenate all chunks of a view-column into a single ColumnBuffer (no
154// filtering). Used by the `Selection::All` path.
155fn concat_view_chunks(view_chunks: &ColumnChunks) -> Result<ColumnBuffer> {
156	let mut iter = view_chunks.chunks.iter();
157	let first =
158		iter.next().expect("concat_view_chunks called with empty chunks").to_canonical()?.to_column_buffer()?;
159	let mut out = first;
160	for chunk in iter {
161		out.extend(chunk.to_canonical()?.to_column_buffer()?)?;
162	}
163	Ok(out)
164}
165
166// Materialize the [start, end) range across however many chunks contribute.
167// Per chunk: `Column::slice` honors encoding-specific slice paths (compressed
168// encodings can produce a compressed slice without canonicalizing the whole
169// chunk); we only canonicalize each slice at the projection boundary, where
170// `ColumnBuffer` is the required output type. `ColumnBuffer::extend` handles
171// promoting a bare buffer to `Option` when later chunks introduce nones.
172fn read_range(column_chunks: &ColumnChunks, start: usize, end: usize) -> Result<ColumnBuffer> {
173	let ranges = column_chunks.iter_range_chunks(start, end);
174	let mut iter = ranges.into_iter();
175	let (first_idx, first_s, first_e) = iter.next().expect("read_range called with empty range");
176	let first = column_chunks.chunks[first_idx].slice(first_s, first_e)?.to_canonical()?.to_column_buffer()?;
177	let mut out = first;
178	for (idx, s, e) in iter {
179		let buf = column_chunks.chunks[idx].slice(s, e)?.to_canonical()?.to_column_buffer()?;
180		out.extend(buf)?;
181	}
182	Ok(out)
183}
184
185impl Iterator for SnapshotReader {
186	type Item = Result<Columns>;
187
188	fn next(&mut self) -> Option<Self::Item> {
189		loop {
190			if self.offset >= self.row_count {
191				return None;
192			}
193			match self.read_next_batch() {
194				Ok(Some(c)) => return Some(Ok(c)),
195				Ok(None) => continue,
196				Err(e) => return Some(Err(e)),
197			}
198		}
199	}
200}
201
#[cfg(test)]
mod tests {
	use reifydb_core::{
		common::CommitVersion,
		interface::catalog::id::TableId,
		value::column::array::{Column, canonical::Canonical},
	};
	use reifydb_runtime::context::clock::Clock;
	use reifydb_type::value::r#type::Type;

	use super::*;
	use crate::snapshot::{ColumnBlock, ColumnChunks, SnapshotId, SnapshotSource};

	// Round-trip a ColumnBuffer into an encoded Column via its canonical form.
	fn array_from_column_data(cd: &ColumnBuffer) -> Column {
		let ca = Canonical::from_column_buffer(cd).unwrap();
		Column::from_canonical(ca)
	}

	// Build a single-chunk, two-column snapshot: int4 "a" = 0..rows and
	// utf8 "b" = "row-{i}". Snapshot identity fields are arbitrary fixtures.
	fn mk_snapshot(rows: usize) -> Arc<Snapshot> {
		let a_col = ColumnBuffer::int4((0..rows as i32).collect::<Vec<_>>());
		let b_col = ColumnBuffer::utf8((0..rows).map(|i| format!("row-{i}")).collect::<Vec<_>>());

		let chunked_a = ColumnChunks::single(Type::Int4, false, array_from_column_data(&a_col));
		let chunked_b = ColumnChunks::single(Type::Utf8, false, array_from_column_data(&b_col));

		let schema = Arc::new(vec![("a".to_string(), Type::Int4, false), ("b".to_string(), Type::Utf8, false)]);
		let block = ColumnBlock::new(schema, vec![chunked_a, chunked_b]);

		let now = Clock::Real.instant();
		Arc::new(Snapshot {
			id: SnapshotId::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			source: SnapshotSource::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			namespace: "test".to_string(),
			name: "t".to_string(),
			created_at: now,
			block,
		})
	}

	#[test]
	fn reader_returns_none_for_empty_snapshot() {
		let snap = mk_snapshot(0);
		let mut reader = SnapshotReader::new(snap, 4);
		assert!(reader.next().is_none());
	}

	// 5 rows with batch size 2 must yield batches of 2, 2, and 1 rows with
	// consecutive row numbers and matching column values.
	#[test]
	fn reader_emits_batches_matching_batch_size() {
		let snap = mk_snapshot(5);
		let mut reader = SnapshotReader::new(snap, 2);

		let batch = reader.next().expect("first batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(batch.row_numbers[0], RowNumber(0));
		assert_eq!(batch.row_numbers[1], RowNumber(1));

		let a = batch.column("a").unwrap();
		assert_eq!(a.data().get_value(0).to_string(), "0");
		assert_eq!(a.data().get_value(1).to_string(), "1");

		let b = batch.column("b").unwrap();
		assert_eq!(b.data().get_value(0).to_string(), "row-0");

		let batch = reader.next().expect("second batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(batch.row_numbers[0], RowNumber(2));

		let batch = reader.next().expect("final partial batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(4));
		assert_eq!(batch.column("a").unwrap().data().get_value(0).to_string(), "4");

		assert!(reader.next().is_none());
	}

	// Build a one-column ("a": int4) snapshot whose chunk boundaries are
	// given explicitly by `parts`, to exercise multi-chunk read paths.
	fn mk_chunked_snapshot(parts: &[&[i32]]) -> Arc<Snapshot> {
		let chunks: Vec<Column> =
			parts.iter().map(|p| array_from_column_data(&ColumnBuffer::int4(p.to_vec()))).collect();
		let chunked_a = ColumnChunks::new(Type::Int4, false, chunks);
		let schema = Arc::new(vec![("a".to_string(), Type::Int4, false)]);
		let block = ColumnBlock::new(schema, vec![chunked_a]);
		let now = Clock::Real.instant();
		Arc::new(Snapshot {
			id: SnapshotId::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			source: SnapshotSource::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			namespace: "test".to_string(),
			name: "t".to_string(),
			created_at: now,
			block,
		})
	}

	// One oversized batch must stitch all three chunks together in order.
	#[test]
	fn reader_handles_multi_chunk_column() {
		let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
		let mut reader = SnapshotReader::new(snap, 100);
		assert_eq!(reader.row_count(), 9);

		let batch = reader.next().unwrap().unwrap();
		assert_eq!(batch.row_count(), 9);
		let a = batch.column("a").unwrap();
		let actual: Vec<String> = (0..9).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(actual, vec!["10", "20", "30", "40", "50", "60", "70", "80", "90"]);
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_batch_spans_chunk_boundary() {
		// Chunks [0..3), [3..5), [5..9). Batch size 4 emits batches:
		//   [0..4) crosses chunk0->chunk1, [4..8) crosses chunk1->chunk2, [8..9) tail.
		let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
		let mut reader = SnapshotReader::new(snap, 4);

		let b0 = reader.next().unwrap().unwrap();
		assert_eq!(b0.row_count(), 4);
		let a = b0.column("a").unwrap();
		let v0: Vec<String> = (0..4).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(v0, vec!["10", "20", "30", "40"]);

		let b1 = reader.next().unwrap().unwrap();
		assert_eq!(b1.row_count(), 4);
		let a = b1.column("a").unwrap();
		let v1: Vec<String> = (0..4).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(v1, vec!["50", "60", "70", "80"]);

		let b2 = reader.next().unwrap().unwrap();
		assert_eq!(b2.row_count(), 1);
		assert_eq!(b2.column("a").unwrap().data().get_value(0).to_string(), "90");
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_batch_starts_mid_chunk() {
		// One chunk of length 10, batch size 3 means batches start mid-chunk.
		let snap = mk_chunked_snapshot(&[&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]);
		let mut reader = SnapshotReader::new(snap, 3);

		let b0 = reader.next().unwrap().unwrap();
		assert_eq!(b0.row_count(), 3);
		let b1 = reader.next().unwrap().unwrap();
		assert_eq!(b1.row_count(), 3);
		let a = b1.column("a").unwrap();
		assert_eq!(a.data().get_value(0).to_string(), "4");
		assert_eq!(a.data().get_value(2).to_string(), "6");
	}

	use reifydb_type::value::Value;

	use crate::predicate::{ColRef, Predicate};

	#[test]
	fn pushdown_eq_predicate_keeps_only_matching_rows() {
		// Single chunk: id values 0..5; predicate id == 3 keeps row 3 only.
		let snap = mk_snapshot(5);
		let p = Predicate::Eq(ColRef::from("a"), Value::Int4(3));
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(3));
		assert_eq!(batch.column("a").unwrap().data().get_value(0).to_string(), "3");
		assert_eq!(batch.column("b").unwrap().data().get_value(0).to_string(), "row-3");
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_filters_across_chunk_boundary() {
		// 3 chunks: [10,20,30] | [40,50] | [60,70,80,90]. Predicate keeps anything
		// equal to 30 (chunk 0) or 80 (chunk 2). Reader processes the whole snapshot
		// in one batch (batch_size=100) so the filter spans every chunk.
		let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
		let p = Predicate::In(ColRef::from("a"), vec![Value::Int4(30), Value::Int4(80)]);
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		let a = batch.column("a").unwrap();
		assert_eq!(a.data().get_value(0).to_string(), "30");
		assert_eq!(a.data().get_value(1).to_string(), "80");
		assert_eq!(batch.row_numbers[0], RowNumber(2));
		assert_eq!(batch.row_numbers[1], RowNumber(7));
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_skips_empty_batches() {
		// 6 rows, batch size 2 → batches [0..2), [2..4), [4..6). Predicate id==4 only
		// matches in batch [4..6); the first two batches return Selection::None_ and
		// must be skipped by the iterator (consumer never sees them).
		let snap = mk_snapshot(6);
		let p = Predicate::Eq(ColRef::from("a"), Value::Int4(4));
		let mut reader = SnapshotReader::new(snap, 2).with_predicate(p);

		let batch = reader.next().expect("only matching batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(4));
		assert_eq!(batch.column("a").unwrap().data().get_value(0).to_string(), "4");
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_selection_all_passes_batch_through() {
		// Predicate matches every row in the batch → Selection::All path. Output
		// must equal the no-predicate batch.
		let snap = mk_snapshot(5);
		let p = Predicate::GtEq(ColRef::from("a"), Value::Int4(0));
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 5);
		let a = batch.column("a").unwrap();
		let vals: Vec<String> = (0..5).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(vals, vec!["0", "1", "2", "3", "4"]);
		assert_eq!(batch.row_numbers[0], RowNumber(0));
		assert_eq!(batch.row_numbers[4], RowNumber(4));
	}

	#[test]
	fn pushdown_is_none_over_multi_chunk_nullable() {
		// Two nullable chunks; nones at position 1 of each chunk → block rows 1, 4.
		let mut a = ColumnBuffer::int4_with_capacity(3);
		a.push::<i32>(10);
		a.push_none();
		a.push::<i32>(30);
		let mut b = ColumnBuffer::int4_with_capacity(3);
		b.push::<i32>(40);
		b.push_none();
		b.push::<i32>(60);
		let chunks = vec![array_from_column_data(&a), array_from_column_data(&b)];
		let id_col = ColumnChunks::new(Type::Int4, true, chunks);
		let schema = Arc::new(vec![("a".to_string(), Type::Int4, true)]);
		let block = ColumnBlock::new(schema, vec![id_col]);
		let now = Clock::Real.instant();
		let snap = Arc::new(Snapshot {
			id: SnapshotId::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			source: SnapshotSource::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			namespace: "test".to_string(),
			name: "t".to_string(),
			created_at: now,
			block,
		});

		let p = Predicate::IsNone(ColRef::from("a"));
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(batch.row_numbers[0], RowNumber(1));
		assert_eq!(batch.row_numbers[1], RowNumber(4));
	}
}
470}