// reifydb_column/reader.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

4use std::sync::Arc;
5
6use reifydb_core::value::column::{
7	ColumnWithName, buffer::ColumnBuffer, columns::Columns, data::Column, mask::RowMask,
8};
9use reifydb_type::{
10	Result,
11	fragment::Fragment,
12	value::{datetime::DateTime, row_number::RowNumber},
13};
14
15use crate::{
16	compute,
17	predicate::{self, Predicate},
18	selection::Selection,
19	snapshot::{ColumnBlock, ColumnChunks, Schema, Snapshot},
20};
21
/// Streaming reader over a [`Snapshot`] that yields row batches as [`Columns`],
/// optionally filtered by a pushed-down [`Predicate`].
pub struct SnapshotReader {
	// Immutable columnar snapshot being scanned.
	snapshot: Arc<Snapshot>,
	// Maximum number of rows materialized per emitted batch.
	batch_size: usize,
	// Next row index to read; advances by up to `batch_size` per batch.
	offset: usize,
	// Total rows in the snapshot (length of the first column, 0 if no columns).
	row_count: usize,
	// Optional filter evaluated against each batch window before materialization.
	predicate: Option<Predicate>,
}

30impl SnapshotReader {
31	pub fn new(snapshot: Arc<Snapshot>, batch_size: usize) -> Self {
32		let row_count = snapshot.block.columns.first().map(|c| c.len()).unwrap_or(0);
33		Self {
34			snapshot,
35			batch_size,
36			offset: 0,
37			row_count,
38			predicate: None,
39		}
40	}
41
42	pub fn with_predicate(mut self, predicate: Predicate) -> Self {
43		self.predicate = Some(predicate);
44		self
45	}
46
47	pub fn row_count(&self) -> usize {
48		self.row_count
49	}
50
51	fn read_next_batch(&mut self) -> Result<Option<Columns>> {
52		let start = self.offset;
53		let end = (start + self.batch_size).min(self.row_count);
54		self.offset = end;
55
56		let block = &self.snapshot.block;
57		let schema = &block.schema;
58
59		let Some(predicate) = self.predicate.as_ref() else {
60			return Ok(Some(materialize_full(block, start, end)?));
61		};
62
63		let view = block.view_range(start, end)?;
64		let selection = predicate::evaluate(&view, predicate)?;
65		match selection {
66			Selection::None_ => Ok(None),
67			Selection::All => Ok(Some(materialize_view_full(schema, &view, start, end)?)),
68			Selection::Mask(mask) => Ok(Some(materialize_filtered(schema, &view, start, &mask)?)),
69		}
70	}
71}
72
73fn materialize_full(block: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
74	let len = end - start;
75	let mut columns: Vec<ColumnWithName> = Vec::with_capacity(block.schema.len());
76	for (i, (name, _ty, _nullable)) in block.schema.iter().enumerate() {
77		let data = read_range(&block.columns[i], start, end)?;
78		columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
79	}
80	let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
81	let ts = DateTime::default();
82	Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
83}
84
85fn materialize_view_full(schema: &Schema, view: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
86	let len = end - start;
87	let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
88	for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
89		let data = concat_view_chunks(&view.columns[i])?;
90		columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
91	}
92	let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
93	let ts = DateTime::default();
94	Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
95}
96
97fn materialize_filtered(schema: &Schema, view: &ColumnBlock, batch_start: usize, mask: &RowMask) -> Result<Columns> {
98	let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
99	for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
100		let data = filter_view_column(&view.columns[i], mask)?;
101		columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
102	}
103
104	let kept = mask.popcount();
105	let mut row_numbers: Vec<RowNumber> = Vec::with_capacity(kept);
106	for i in 0..mask.len() {
107		if mask.get(i) {
108			row_numbers.push(RowNumber((batch_start + i) as u64));
109		}
110	}
111	let ts = DateTime::default();
112	Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; kept], vec![ts; kept]))
113}
114
115fn filter_view_column(view_chunks: &ColumnChunks, mask: &RowMask) -> Result<ColumnBuffer> {
116	let mut chunk_offset = 0usize;
117	let mut out: Option<ColumnBuffer> = None;
118	for chunk in &view_chunks.chunks {
119		let chunk_len = chunk.len();
120		let chunk_mask = mask.slice(chunk_offset, chunk_offset + chunk_len);
121		chunk_offset += chunk_len;
122		if chunk_mask.popcount() == 0 {
123			continue;
124		}
125		let filtered: Column = compute::filter(chunk, &chunk_mask)?;
126		let buf = filtered.to_canonical()?.to_column_buffer()?;
127		match &mut out {
128			None => out = Some(buf),
129			Some(o) => o.extend(buf)?,
130		}
131	}
132	Ok(out.expect("Selection::Mask guarantees at least one row survives"))
133}
134
135fn concat_view_chunks(view_chunks: &ColumnChunks) -> Result<ColumnBuffer> {
136	let mut iter = view_chunks.chunks.iter();
137	let first =
138		iter.next().expect("concat_view_chunks called with empty chunks").to_canonical()?.to_column_buffer()?;
139	let mut out = first;
140	for chunk in iter {
141		out.extend(chunk.to_canonical()?.to_column_buffer()?)?;
142	}
143	Ok(out)
144}
145
146fn read_range(column_chunks: &ColumnChunks, start: usize, end: usize) -> Result<ColumnBuffer> {
147	let ranges = column_chunks.iter_range_chunks(start, end);
148	let mut iter = ranges.into_iter();
149	let (first_idx, first_s, first_e) = iter.next().expect("read_range called with empty range");
150	let first = column_chunks.chunks[first_idx].slice(first_s, first_e)?.to_canonical()?.to_column_buffer()?;
151	let mut out = first;
152	for (idx, s, e) in iter {
153		let buf = column_chunks.chunks[idx].slice(s, e)?.to_canonical()?.to_column_buffer()?;
154		out.extend(buf)?;
155	}
156	Ok(out)
157}
158
159impl Iterator for SnapshotReader {
160	type Item = Result<Columns>;
161
162	fn next(&mut self) -> Option<Self::Item> {
163		loop {
164			if self.offset >= self.row_count {
165				return None;
166			}
167			match self.read_next_batch() {
168				Ok(Some(c)) => return Some(Ok(c)),
169				Ok(None) => continue,
170				Err(e) => return Some(Err(e)),
171			}
172		}
173	}
174}
175
#[cfg(test)]
mod tests {
	use reifydb_core::{
		common::CommitVersion,
		interface::catalog::id::TableId,
		value::column::data::{Column, canonical::Canonical},
	};
	use reifydb_runtime::context::clock::Clock;
	use reifydb_type::value::r#type::Type;

	use super::*;
	use crate::snapshot::{ColumnBlock, ColumnChunks, SnapshotId, SnapshotSource};

	// Converts a `ColumnBuffer` into a chunk-ready `Column` via its canonical form.
	fn array_from_column_data(cd: &ColumnBuffer) -> Column {
		let ca = Canonical::from_column_buffer(cd).unwrap();
		Column::from_canonical(ca)
	}

	// Builds a single-chunk, two-column snapshot with `rows` rows:
	// "a": Int4 values 0..rows, "b": Utf8 values "row-{i}".
	fn mk_snapshot(rows: usize) -> Arc<Snapshot> {
		let a_col = ColumnBuffer::int4((0..rows as i32).collect::<Vec<_>>());
		let b_col = ColumnBuffer::utf8((0..rows).map(|i| format!("row-{i}")).collect::<Vec<_>>());

		let chunked_a = ColumnChunks::single(Type::Int4, false, array_from_column_data(&a_col));
		let chunked_b = ColumnChunks::single(Type::Utf8, false, array_from_column_data(&b_col));

		let schema = Arc::new(vec![("a".to_string(), Type::Int4, false), ("b".to_string(), Type::Utf8, false)]);
		let block = ColumnBlock::new(schema, vec![chunked_a, chunked_b]);

		let now = Clock::Real.instant();
		Arc::new(Snapshot {
			id: SnapshotId::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			source: SnapshotSource::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			namespace: "test".to_string(),
			name: "t".to_string(),
			created_at: now,
			block,
		})
	}

	#[test]
	fn reader_returns_none_for_empty_snapshot() {
		let snap = mk_snapshot(0);
		let mut reader = SnapshotReader::new(snap, 4);
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_emits_batches_matching_batch_size() {
		// 5 rows with batch size 2 → batches of 2, 2, then a final partial batch of 1.
		let snap = mk_snapshot(5);
		let mut reader = SnapshotReader::new(snap, 2);

		let batch = reader.next().expect("first batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(batch.row_numbers[0], RowNumber(0));
		assert_eq!(batch.row_numbers[1], RowNumber(1));

		let a = batch.column("a").unwrap();
		assert_eq!(a.data().get_value(0).to_string(), "0");
		assert_eq!(a.data().get_value(1).to_string(), "1");

		let b = batch.column("b").unwrap();
		assert_eq!(b.data().get_value(0).to_string(), "row-0");

		let batch = reader.next().expect("second batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(batch.row_numbers[0], RowNumber(2));

		let batch = reader.next().expect("final partial batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(4));
		assert_eq!(batch.column("a").unwrap().data().get_value(0).to_string(), "4");

		assert!(reader.next().is_none());
	}

	// Builds a snapshot with one Int4 column "a" split into the given chunks.
	fn mk_chunked_snapshot(parts: &[&[i32]]) -> Arc<Snapshot> {
		let chunks: Vec<Column> =
			parts.iter().map(|p| array_from_column_data(&ColumnBuffer::int4(p.to_vec()))).collect();
		let chunked_a = ColumnChunks::new(Type::Int4, false, chunks);
		let schema = Arc::new(vec![("a".to_string(), Type::Int4, false)]);
		let block = ColumnBlock::new(schema, vec![chunked_a]);
		let now = Clock::Real.instant();
		Arc::new(Snapshot {
			id: SnapshotId::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			source: SnapshotSource::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			namespace: "test".to_string(),
			name: "t".to_string(),
			created_at: now,
			block,
		})
	}

	#[test]
	fn reader_handles_multi_chunk_column() {
		// A single batch (batch_size=100) must read across all three chunks in order.
		let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
		let mut reader = SnapshotReader::new(snap, 100);
		assert_eq!(reader.row_count(), 9);

		let batch = reader.next().unwrap().unwrap();
		assert_eq!(batch.row_count(), 9);
		let a = batch.column("a").unwrap();
		let actual: Vec<String> = (0..9).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(actual, vec!["10", "20", "30", "40", "50", "60", "70", "80", "90"]);
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_batch_spans_chunk_boundary() {
		// Chunks [0..3), [3..5), [5..9). Batch size 4 emits batches:
		//   [0..4) crosses chunk0->chunk1, [4..8) crosses chunk1->chunk2, [8..9) tail.
		let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
		let mut reader = SnapshotReader::new(snap, 4);

		let b0 = reader.next().unwrap().unwrap();
		assert_eq!(b0.row_count(), 4);
		let a = b0.column("a").unwrap();
		let v0: Vec<String> = (0..4).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(v0, vec!["10", "20", "30", "40"]);

		let b1 = reader.next().unwrap().unwrap();
		assert_eq!(b1.row_count(), 4);
		let a = b1.column("a").unwrap();
		let v1: Vec<String> = (0..4).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(v1, vec!["50", "60", "70", "80"]);

		let b2 = reader.next().unwrap().unwrap();
		assert_eq!(b2.row_count(), 1);
		assert_eq!(b2.column("a").unwrap().data().get_value(0).to_string(), "90");
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_batch_starts_mid_chunk() {
		// One chunk of length 10, batch size 3 means batches start mid-chunk.
		let snap = mk_chunked_snapshot(&[&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]);
		let mut reader = SnapshotReader::new(snap, 3);

		let b0 = reader.next().unwrap().unwrap();
		assert_eq!(b0.row_count(), 3);
		let b1 = reader.next().unwrap().unwrap();
		assert_eq!(b1.row_count(), 3);
		let a = b1.column("a").unwrap();
		assert_eq!(a.data().get_value(0).to_string(), "4");
		assert_eq!(a.data().get_value(2).to_string(), "6");
	}

	// Extra imports used only by the predicate-pushdown tests below.
	use reifydb_type::value::Value;

	use crate::predicate::{ColRef, Predicate};

	#[test]
	fn pushdown_eq_predicate_keeps_only_matching_rows() {
		// Single chunk: id values 0..5; predicate id == 3 keeps row 3 only.
		let snap = mk_snapshot(5);
		let p = Predicate::Eq(ColRef::from("a"), Value::Int4(3));
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(3));
		assert_eq!(batch.column("a").unwrap().data().get_value(0).to_string(), "3");
		assert_eq!(batch.column("b").unwrap().data().get_value(0).to_string(), "row-3");
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_filters_across_chunk_boundary() {
		// 3 chunks: [10,20,30] | [40,50] | [60,70,80,90]. Predicate keeps anything
		// equal to 30 (chunk 0) or 80 (chunk 2). Reader processes the whole snapshot
		// in one batch (batch_size=100) so the filter spans every chunk.
		let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
		let p = Predicate::In(ColRef::from("a"), vec![Value::Int4(30), Value::Int4(80)]);
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		let a = batch.column("a").unwrap();
		assert_eq!(a.data().get_value(0).to_string(), "30");
		assert_eq!(a.data().get_value(1).to_string(), "80");
		assert_eq!(batch.row_numbers[0], RowNumber(2));
		assert_eq!(batch.row_numbers[1], RowNumber(7));
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_skips_empty_batches() {
		// 6 rows, batch size 2 → batches [0..2), [2..4), [4..6). Predicate id==4 only
		// matches in batch [4..6); the first two batches return Selection::None_ and
		// must be skipped by the iterator (consumer never sees them).
		let snap = mk_snapshot(6);
		let p = Predicate::Eq(ColRef::from("a"), Value::Int4(4));
		let mut reader = SnapshotReader::new(snap, 2).with_predicate(p);

		let batch = reader.next().expect("only matching batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(4));
		assert_eq!(batch.column("a").unwrap().data().get_value(0).to_string(), "4");
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_selection_all_passes_batch_through() {
		// Predicate matches every row in the batch → Selection::All path. Output
		// must equal the no-predicate batch.
		let snap = mk_snapshot(5);
		let p = Predicate::GtEq(ColRef::from("a"), Value::Int4(0));
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 5);
		let a = batch.column("a").unwrap();
		let vals: Vec<String> = (0..5).map(|i| a.data().get_value(i).to_string()).collect();
		assert_eq!(vals, vec!["0", "1", "2", "3", "4"]);
		assert_eq!(batch.row_numbers[0], RowNumber(0));
		assert_eq!(batch.row_numbers[4], RowNumber(4));
	}

	#[test]
	fn pushdown_is_none_over_multi_chunk_nullable() {
		// Two nullable chunks; nones at position 1 of each chunk → block rows 1, 4.
		let mut a = ColumnBuffer::int4_with_capacity(3);
		a.push::<i32>(10);
		a.push_none();
		a.push::<i32>(30);
		let mut b = ColumnBuffer::int4_with_capacity(3);
		b.push::<i32>(40);
		b.push_none();
		b.push::<i32>(60);
		let chunks = vec![array_from_column_data(&a), array_from_column_data(&b)];
		let id_col = ColumnChunks::new(Type::Int4, true, chunks);
		let schema = Arc::new(vec![("a".to_string(), Type::Int4, true)]);
		let block = ColumnBlock::new(schema, vec![id_col]);
		let now = Clock::Real.instant();
		let snap = Arc::new(Snapshot {
			id: SnapshotId::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			source: SnapshotSource::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			namespace: "test".to_string(),
			name: "t".to_string(),
			created_at: now,
			block,
		});

		let p = Predicate::IsNone(ColRef::from("a"));
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(p);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(batch.row_numbers[0], RowNumber(1));
		assert_eq!(batch.row_numbers[1], RowNumber(4));
	}
}
444}