1use std::sync::Arc;
5
6use reifydb_core::value::column::{
7 ColumnWithName, array::Column, buffer::ColumnBuffer, columns::Columns, mask::RowMask,
8};
9use reifydb_type::{
10 Result,
11 fragment::Fragment,
12 value::{datetime::DateTime, row_number::RowNumber},
13};
14
15use crate::{
16 compute,
17 predicate::{self, Predicate},
18 selection::Selection,
19 snapshot::{ColumnBlock, ColumnChunks, Schema, Snapshot},
20};
21
22pub struct SnapshotReader {
23 snapshot: Arc<Snapshot>,
24 batch_size: usize,
25 offset: usize,
26 row_count: usize,
27 predicate: Option<Predicate>,
28}
29
30impl SnapshotReader {
31 pub fn new(snapshot: Arc<Snapshot>, batch_size: usize) -> Self {
32 let row_count = snapshot.block.columns.first().map(|c| c.len()).unwrap_or(0);
33 Self {
34 snapshot,
35 batch_size,
36 offset: 0,
37 row_count,
38 predicate: None,
39 }
40 }
41
42 pub fn with_predicate(mut self, predicate: Predicate) -> Self {
47 self.predicate = Some(predicate);
48 self
49 }
50
51 pub fn row_count(&self) -> usize {
52 self.row_count
53 }
54
55 fn read_next_batch(&mut self) -> Result<Option<Columns>> {
56 let start = self.offset;
57 let end = (start + self.batch_size).min(self.row_count);
58 self.offset = end;
59
60 let block = &self.snapshot.block;
61 let schema = &block.schema;
62
63 let Some(predicate) = self.predicate.as_ref() else {
64 return Ok(Some(materialize_full(block, start, end)?));
65 };
66
67 let view = block.view_range(start, end)?;
68 let selection = predicate::evaluate(&view, predicate)?;
69 match selection {
70 Selection::None_ => Ok(None),
71 Selection::All => Ok(Some(materialize_view_full(schema, &view, start, end)?)),
72 Selection::Mask(mask) => Ok(Some(materialize_filtered(schema, &view, start, &mask)?)),
73 }
74 }
75}
76
77fn materialize_full(block: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
79 let len = end - start;
80 let mut columns: Vec<ColumnWithName> = Vec::with_capacity(block.schema.len());
81 for (i, (name, _ty, _nullable)) in block.schema.iter().enumerate() {
82 let data = read_range(&block.columns[i], start, end)?;
83 columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
84 }
85 let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
86 let ts = DateTime::default();
87 Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
88}
89
90fn materialize_view_full(schema: &Schema, view: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
94 let len = end - start;
95 let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
96 for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
97 let data = concat_view_chunks(&view.columns[i])?;
98 columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
99 }
100 let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
101 let ts = DateTime::default();
102 Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
103}
104
105fn materialize_filtered(schema: &Schema, view: &ColumnBlock, batch_start: usize, mask: &RowMask) -> Result<Columns> {
111 let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
112 for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
113 let data = filter_view_column(&view.columns[i], mask)?;
114 columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
115 }
116
117 let kept = mask.popcount();
118 let mut row_numbers: Vec<RowNumber> = Vec::with_capacity(kept);
119 for i in 0..mask.len() {
120 if mask.get(i) {
121 row_numbers.push(RowNumber((batch_start + i) as u64));
122 }
123 }
124 let ts = DateTime::default();
125 Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; kept], vec![ts; kept]))
126}
127
128fn filter_view_column(view_chunks: &ColumnChunks, mask: &RowMask) -> Result<ColumnBuffer> {
134 let mut chunk_offset = 0usize;
135 let mut out: Option<ColumnBuffer> = None;
136 for chunk in &view_chunks.chunks {
137 let chunk_len = chunk.len();
138 let chunk_mask = mask.slice(chunk_offset, chunk_offset + chunk_len);
139 chunk_offset += chunk_len;
140 if chunk_mask.popcount() == 0 {
141 continue;
142 }
143 let filtered: Column = compute::filter(chunk, &chunk_mask)?;
144 let buf = filtered.to_canonical()?.to_column_buffer()?;
145 match &mut out {
146 None => out = Some(buf),
147 Some(o) => o.extend(buf)?,
148 }
149 }
150 Ok(out.expect("Selection::Mask guarantees at least one row survives"))
151}
152
153fn concat_view_chunks(view_chunks: &ColumnChunks) -> Result<ColumnBuffer> {
156 let mut iter = view_chunks.chunks.iter();
157 let first =
158 iter.next().expect("concat_view_chunks called with empty chunks").to_canonical()?.to_column_buffer()?;
159 let mut out = first;
160 for chunk in iter {
161 out.extend(chunk.to_canonical()?.to_column_buffer()?)?;
162 }
163 Ok(out)
164}
165
166fn read_range(column_chunks: &ColumnChunks, start: usize, end: usize) -> Result<ColumnBuffer> {
173 let ranges = column_chunks.iter_range_chunks(start, end);
174 let mut iter = ranges.into_iter();
175 let (first_idx, first_s, first_e) = iter.next().expect("read_range called with empty range");
176 let first = column_chunks.chunks[first_idx].slice(first_s, first_e)?.to_canonical()?.to_column_buffer()?;
177 let mut out = first;
178 for (idx, s, e) in iter {
179 let buf = column_chunks.chunks[idx].slice(s, e)?.to_canonical()?.to_column_buffer()?;
180 out.extend(buf)?;
181 }
182 Ok(out)
183}
184
185impl Iterator for SnapshotReader {
186 type Item = Result<Columns>;
187
188 fn next(&mut self) -> Option<Self::Item> {
189 loop {
190 if self.offset >= self.row_count {
191 return None;
192 }
193 match self.read_next_batch() {
194 Ok(Some(c)) => return Some(Ok(c)),
195 Ok(None) => continue,
196 Err(e) => return Some(Err(e)),
197 }
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
	use reifydb_core::{
		common::CommitVersion,
		interface::catalog::id::TableId,
		value::column::array::{Column, canonical::Canonical},
	};
	use reifydb_runtime::context::clock::Clock;
	use reifydb_type::value::{Value, r#type::Type};

	use super::*;
	use crate::{
		predicate::{ColRef, Predicate},
		snapshot::{ColumnBlock, ColumnChunks, SnapshotId, SnapshotSource},
	};

	/// Chunk layout shared by the multi-chunk tests: 9 rows split 3 / 2 / 4.
	const SPLIT_ROWS: &[&[i32]] = &[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]];

	/// Converts an owned column buffer into an array `Column` via its canonical form.
	fn array_from_column_data(cd: &ColumnBuffer) -> Column {
		Column::from_canonical(Canonical::from_column_buffer(cd).unwrap())
	}

	/// Wraps `block` in a table snapshot with fixed metadata (table 1, commit 1, `test.t`).
	fn wrap_block(block: ColumnBlock) -> Arc<Snapshot> {
		Arc::new(Snapshot {
			id: SnapshotId::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			source: SnapshotSource::Table {
				table_id: TableId(1),
				commit_version: CommitVersion(1),
			},
			namespace: "test".to_string(),
			name: "t".to_string(),
			created_at: Clock::Real.instant(),
			block,
		})
	}

	/// Single-chunk snapshot with columns `a` (0..rows) and `b` ("row-0".."row-{rows-1}").
	fn mk_snapshot(rows: usize) -> Arc<Snapshot> {
		let ints = ColumnBuffer::int4((0..rows as i32).collect::<Vec<_>>());
		let labels = ColumnBuffer::utf8((0..rows).map(|i| format!("row-{i}")).collect::<Vec<_>>());
		let schema = Arc::new(vec![("a".to_string(), Type::Int4, false), ("b".to_string(), Type::Utf8, false)]);
		let columns = vec![
			ColumnChunks::single(Type::Int4, false, array_from_column_data(&ints)),
			ColumnChunks::single(Type::Utf8, false, array_from_column_data(&labels)),
		];
		wrap_block(ColumnBlock::new(schema, columns))
	}

	/// Snapshot with a single Int4 column `a`, stored as one chunk per element of `parts`.
	fn mk_chunked_snapshot(parts: &[&[i32]]) -> Arc<Snapshot> {
		let chunks = parts
			.iter()
			.map(|part| array_from_column_data(&ColumnBuffer::int4(part.to_vec())))
			.collect::<Vec<Column>>();
		let schema = Arc::new(vec![("a".to_string(), Type::Int4, false)]);
		wrap_block(ColumnBlock::new(schema, vec![ColumnChunks::new(Type::Int4, false, chunks)]))
	}

	/// Three-row Int4 buffer `[first, None, last]`.
	fn int4_with_gap(first: i32, last: i32) -> ColumnBuffer {
		let mut buf = ColumnBuffer::int4_with_capacity(3);
		buf.push::<i32>(first);
		buf.push_none();
		buf.push::<i32>(last);
		buf
	}

	/// Renders the value at `row` of column `name` as a string.
	fn cell(batch: &Columns, name: &str, row: usize) -> String {
		batch.column(name).unwrap().data().get_value(row).to_string()
	}

	/// Renders the first `rows` values of column `name` as strings.
	fn column_strings(batch: &Columns, name: &str, rows: usize) -> Vec<String> {
		(0..rows).map(|row| cell(batch, name, row)).collect()
	}

	#[test]
	fn reader_returns_none_for_empty_snapshot() {
		let mut reader = SnapshotReader::new(mk_snapshot(0), 4);
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_emits_batches_matching_batch_size() {
		let mut reader = SnapshotReader::new(mk_snapshot(5), 2);

		let first = reader.next().expect("first batch").unwrap();
		assert_eq!(first.row_count(), 2);
		assert_eq!(first.row_numbers[0], RowNumber(0));
		assert_eq!(first.row_numbers[1], RowNumber(1));
		assert_eq!(column_strings(&first, "a", 2), vec!["0", "1"]);
		assert_eq!(cell(&first, "b", 0), "row-0");

		let second = reader.next().expect("second batch").unwrap();
		assert_eq!(second.row_count(), 2);
		assert_eq!(second.row_numbers[0], RowNumber(2));

		let last = reader.next().expect("final partial batch").unwrap();
		assert_eq!(last.row_count(), 1);
		assert_eq!(last.row_numbers[0], RowNumber(4));
		assert_eq!(cell(&last, "a", 0), "4");

		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_handles_multi_chunk_column() {
		let mut reader = SnapshotReader::new(mk_chunked_snapshot(SPLIT_ROWS), 100);
		assert_eq!(reader.row_count(), 9);

		let batch = reader.next().unwrap().unwrap();
		assert_eq!(batch.row_count(), 9);
		assert_eq!(column_strings(&batch, "a", 9), vec!["10", "20", "30", "40", "50", "60", "70", "80", "90"]);
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_batch_spans_chunk_boundary() {
		let mut reader = SnapshotReader::new(mk_chunked_snapshot(SPLIT_ROWS), 4);

		let first = reader.next().unwrap().unwrap();
		assert_eq!(first.row_count(), 4);
		assert_eq!(column_strings(&first, "a", 4), vec!["10", "20", "30", "40"]);

		let second = reader.next().unwrap().unwrap();
		assert_eq!(second.row_count(), 4);
		assert_eq!(column_strings(&second, "a", 4), vec!["50", "60", "70", "80"]);

		let third = reader.next().unwrap().unwrap();
		assert_eq!(third.row_count(), 1);
		assert_eq!(cell(&third, "a", 0), "90");
		assert!(reader.next().is_none());
	}

	#[test]
	fn reader_batch_starts_mid_chunk() {
		let mut reader = SnapshotReader::new(mk_chunked_snapshot(&[&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]), 3);

		let first = reader.next().unwrap().unwrap();
		assert_eq!(first.row_count(), 3);

		let second = reader.next().unwrap().unwrap();
		assert_eq!(second.row_count(), 3);
		assert_eq!(cell(&second, "a", 0), "4");
		assert_eq!(cell(&second, "a", 2), "6");
	}

	#[test]
	fn pushdown_eq_predicate_keeps_only_matching_rows() {
		let predicate = Predicate::Eq(ColRef::from("a"), Value::Int4(3));
		let mut reader = SnapshotReader::new(mk_snapshot(5), 100).with_predicate(predicate);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(3));
		assert_eq!(cell(&batch, "a", 0), "3");
		assert_eq!(cell(&batch, "b", 0), "row-3");
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_filters_across_chunk_boundary() {
		let predicate = Predicate::In(ColRef::from("a"), vec![Value::Int4(30), Value::Int4(80)]);
		let mut reader = SnapshotReader::new(mk_chunked_snapshot(SPLIT_ROWS), 100).with_predicate(predicate);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(column_strings(&batch, "a", 2), vec!["30", "80"]);
		assert_eq!(batch.row_numbers[0], RowNumber(2));
		assert_eq!(batch.row_numbers[1], RowNumber(7));
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_skips_empty_batches() {
		let predicate = Predicate::Eq(ColRef::from("a"), Value::Int4(4));
		let mut reader = SnapshotReader::new(mk_snapshot(6), 2).with_predicate(predicate);

		let batch = reader.next().expect("only matching batch").unwrap();
		assert_eq!(batch.row_count(), 1);
		assert_eq!(batch.row_numbers[0], RowNumber(4));
		assert_eq!(cell(&batch, "a", 0), "4");
		assert!(reader.next().is_none());
	}

	#[test]
	fn pushdown_selection_all_passes_batch_through() {
		let predicate = Predicate::GtEq(ColRef::from("a"), Value::Int4(0));
		let mut reader = SnapshotReader::new(mk_snapshot(5), 100).with_predicate(predicate);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 5);
		assert_eq!(column_strings(&batch, "a", 5), vec!["0", "1", "2", "3", "4"]);
		assert_eq!(batch.row_numbers[0], RowNumber(0));
		assert_eq!(batch.row_numbers[4], RowNumber(4));
	}

	#[test]
	fn pushdown_is_none_over_multi_chunk_nullable() {
		let chunks = vec![array_from_column_data(&int4_with_gap(10, 30)), array_from_column_data(&int4_with_gap(40, 60))];
		let schema = Arc::new(vec![("a".to_string(), Type::Int4, true)]);
		let snap = wrap_block(ColumnBlock::new(schema, vec![ColumnChunks::new(Type::Int4, true, chunks)]));

		let predicate = Predicate::IsNone(ColRef::from("a"));
		let mut reader = SnapshotReader::new(snap, 100).with_predicate(predicate);

		let batch = reader.next().expect("batch").unwrap();
		assert_eq!(batch.row_count(), 2);
		assert_eq!(batch.row_numbers[0], RowNumber(1));
		assert_eq!(batch.row_numbers[1], RowNumber(4));
	}
}
470}