1use std::sync::Arc;
5
6use reifydb_core::value::column::{
7 ColumnWithName, buffer::ColumnBuffer, columns::Columns, data::Column, mask::RowMask,
8};
9use reifydb_type::{
10 Result,
11 fragment::Fragment,
12 value::{datetime::DateTime, row_number::RowNumber},
13};
14
15use crate::{
16 compute,
17 predicate::{self, Predicate},
18 selection::Selection,
19 snapshot::{ColumnBlock, ColumnChunks, Schema, Snapshot},
20};
21
22pub struct SnapshotReader {
23 snapshot: Arc<Snapshot>,
24 batch_size: usize,
25 offset: usize,
26 row_count: usize,
27 predicate: Option<Predicate>,
28}
29
30impl SnapshotReader {
31 pub fn new(snapshot: Arc<Snapshot>, batch_size: usize) -> Self {
32 let row_count = snapshot.block.columns.first().map(|c| c.len()).unwrap_or(0);
33 Self {
34 snapshot,
35 batch_size,
36 offset: 0,
37 row_count,
38 predicate: None,
39 }
40 }
41
42 pub fn with_predicate(mut self, predicate: Predicate) -> Self {
43 self.predicate = Some(predicate);
44 self
45 }
46
47 pub fn row_count(&self) -> usize {
48 self.row_count
49 }
50
51 fn read_next_batch(&mut self) -> Result<Option<Columns>> {
52 let start = self.offset;
53 let end = (start + self.batch_size).min(self.row_count);
54 self.offset = end;
55
56 let block = &self.snapshot.block;
57 let schema = &block.schema;
58
59 let Some(predicate) = self.predicate.as_ref() else {
60 return Ok(Some(materialize_full(block, start, end)?));
61 };
62
63 let view = block.view_range(start, end)?;
64 let selection = predicate::evaluate(&view, predicate)?;
65 match selection {
66 Selection::None_ => Ok(None),
67 Selection::All => Ok(Some(materialize_view_full(schema, &view, start, end)?)),
68 Selection::Mask(mask) => Ok(Some(materialize_filtered(schema, &view, start, &mask)?)),
69 }
70 }
71}
72
73fn materialize_full(block: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
74 let len = end - start;
75 let mut columns: Vec<ColumnWithName> = Vec::with_capacity(block.schema.len());
76 for (i, (name, _ty, _nullable)) in block.schema.iter().enumerate() {
77 let data = read_range(&block.columns[i], start, end)?;
78 columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
79 }
80 let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
81 let ts = DateTime::default();
82 Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
83}
84
85fn materialize_view_full(schema: &Schema, view: &ColumnBlock, start: usize, end: usize) -> Result<Columns> {
86 let len = end - start;
87 let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
88 for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
89 let data = concat_view_chunks(&view.columns[i])?;
90 columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
91 }
92 let row_numbers: Vec<RowNumber> = (start..end).map(|i| RowNumber(i as u64)).collect();
93 let ts = DateTime::default();
94 Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; len], vec![ts; len]))
95}
96
97fn materialize_filtered(schema: &Schema, view: &ColumnBlock, batch_start: usize, mask: &RowMask) -> Result<Columns> {
98 let mut columns: Vec<ColumnWithName> = Vec::with_capacity(schema.len());
99 for (i, (name, _ty, _nullable)) in schema.iter().enumerate() {
100 let data = filter_view_column(&view.columns[i], mask)?;
101 columns.push(ColumnWithName::new(Fragment::internal(name.clone()), data));
102 }
103
104 let kept = mask.popcount();
105 let mut row_numbers: Vec<RowNumber> = Vec::with_capacity(kept);
106 for i in 0..mask.len() {
107 if mask.get(i) {
108 row_numbers.push(RowNumber((batch_start + i) as u64));
109 }
110 }
111 let ts = DateTime::default();
112 Ok(Columns::with_system_columns(columns, row_numbers, vec![ts; kept], vec![ts; kept]))
113}
114
115fn filter_view_column(view_chunks: &ColumnChunks, mask: &RowMask) -> Result<ColumnBuffer> {
116 let mut chunk_offset = 0usize;
117 let mut out: Option<ColumnBuffer> = None;
118 for chunk in &view_chunks.chunks {
119 let chunk_len = chunk.len();
120 let chunk_mask = mask.slice(chunk_offset, chunk_offset + chunk_len);
121 chunk_offset += chunk_len;
122 if chunk_mask.popcount() == 0 {
123 continue;
124 }
125 let filtered: Column = compute::filter(chunk, &chunk_mask)?;
126 let buf = filtered.to_canonical()?.to_column_buffer()?;
127 match &mut out {
128 None => out = Some(buf),
129 Some(o) => o.extend(buf)?,
130 }
131 }
132 Ok(out.expect("Selection::Mask guarantees at least one row survives"))
133}
134
135fn concat_view_chunks(view_chunks: &ColumnChunks) -> Result<ColumnBuffer> {
136 let mut iter = view_chunks.chunks.iter();
137 let first =
138 iter.next().expect("concat_view_chunks called with empty chunks").to_canonical()?.to_column_buffer()?;
139 let mut out = first;
140 for chunk in iter {
141 out.extend(chunk.to_canonical()?.to_column_buffer()?)?;
142 }
143 Ok(out)
144}
145
146fn read_range(column_chunks: &ColumnChunks, start: usize, end: usize) -> Result<ColumnBuffer> {
147 let ranges = column_chunks.iter_range_chunks(start, end);
148 let mut iter = ranges.into_iter();
149 let (first_idx, first_s, first_e) = iter.next().expect("read_range called with empty range");
150 let first = column_chunks.chunks[first_idx].slice(first_s, first_e)?.to_canonical()?.to_column_buffer()?;
151 let mut out = first;
152 for (idx, s, e) in iter {
153 let buf = column_chunks.chunks[idx].slice(s, e)?.to_canonical()?.to_column_buffer()?;
154 out.extend(buf)?;
155 }
156 Ok(out)
157}
158
159impl Iterator for SnapshotReader {
160 type Item = Result<Columns>;
161
162 fn next(&mut self) -> Option<Self::Item> {
163 loop {
164 if self.offset >= self.row_count {
165 return None;
166 }
167 match self.read_next_batch() {
168 Ok(Some(c)) => return Some(Ok(c)),
169 Ok(None) => continue,
170 Err(e) => return Some(Err(e)),
171 }
172 }
173 }
174}
175
#[cfg(test)]
mod tests {
    use reifydb_core::{
        common::CommitVersion,
        interface::catalog::id::TableId,
        value::column::data::{Column, canonical::Canonical},
    };
    use reifydb_runtime::context::clock::Clock;
    use reifydb_type::value::{Value, r#type::Type};

    use super::*;
    use crate::{
        predicate::{ColRef, Predicate},
        snapshot::{ColumnBlock, ColumnChunks, SnapshotId, SnapshotSource},
    };

    /// Converts a `ColumnBuffer` into the `Column` form stored inside chunks.
    fn array_from_column_data(cd: &ColumnBuffer) -> Column {
        Column::from_canonical(Canonical::from_column_buffer(cd).unwrap())
    }

    /// Wraps `block` in a table snapshot with fixed identifiers.
    fn snapshot_of(block: ColumnBlock) -> Arc<Snapshot> {
        Arc::new(Snapshot {
            id: SnapshotId::Table {
                table_id: TableId(1),
                commit_version: CommitVersion(1),
            },
            source: SnapshotSource::Table {
                table_id: TableId(1),
                commit_version: CommitVersion(1),
            },
            namespace: "test".to_string(),
            name: "t".to_string(),
            created_at: Clock::Real.instant(),
            block,
        })
    }

    /// Snapshot with `rows` rows: `a` = 0..rows (Int4), `b` = "row-{i}" (Utf8),
    /// each stored as a single chunk.
    fn mk_snapshot(rows: usize) -> Arc<Snapshot> {
        let ints = ColumnBuffer::int4((0..rows as i32).collect::<Vec<_>>());
        let labels = ColumnBuffer::utf8((0..rows).map(|i| format!("row-{i}")).collect::<Vec<_>>());

        let schema = Arc::new(vec![("a".to_string(), Type::Int4, false), ("b".to_string(), Type::Utf8, false)]);
        let columns = vec![
            ColumnChunks::single(Type::Int4, false, array_from_column_data(&ints)),
            ColumnChunks::single(Type::Utf8, false, array_from_column_data(&labels)),
        ];
        snapshot_of(ColumnBlock::new(schema, columns))
    }

    /// Snapshot with one Int4 column `a` made of one chunk per entry of `parts`.
    fn mk_chunked_snapshot(parts: &[&[i32]]) -> Arc<Snapshot> {
        let chunks: Vec<Column> =
            parts.iter().map(|p| array_from_column_data(&ColumnBuffer::int4(p.to_vec()))).collect();
        let schema = Arc::new(vec![("a".to_string(), Type::Int4, false)]);
        let columns = vec![ColumnChunks::new(Type::Int4, false, chunks)];
        snapshot_of(ColumnBlock::new(schema, columns))
    }

    /// Renders the first `n` values of column `name` as strings.
    fn strings(batch: &Columns, name: &str, n: usize) -> Vec<String> {
        let col = batch.column(name).unwrap();
        (0..n).map(|i| col.data().get_value(i).to_string()).collect()
    }

    #[test]
    fn reader_returns_none_for_empty_snapshot() {
        let mut reader = SnapshotReader::new(mk_snapshot(0), 4);
        assert!(reader.next().is_none());
    }

    #[test]
    fn reader_emits_batches_matching_batch_size() {
        let mut reader = SnapshotReader::new(mk_snapshot(5), 2);

        let first = reader.next().expect("first batch").unwrap();
        assert_eq!(first.row_count(), 2);
        assert_eq!(first.row_numbers[0], RowNumber(0));
        assert_eq!(first.row_numbers[1], RowNumber(1));
        assert_eq!(strings(&first, "a", 2), vec!["0", "1"]);
        assert_eq!(strings(&first, "b", 1), vec!["row-0"]);

        let second = reader.next().expect("second batch").unwrap();
        assert_eq!(second.row_count(), 2);
        assert_eq!(second.row_numbers[0], RowNumber(2));

        let last = reader.next().expect("final partial batch").unwrap();
        assert_eq!(last.row_count(), 1);
        assert_eq!(last.row_numbers[0], RowNumber(4));
        assert_eq!(strings(&last, "a", 1), vec!["4"]);

        assert!(reader.next().is_none());
    }

    #[test]
    fn reader_handles_multi_chunk_column() {
        let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
        // A batch size larger than the snapshot yields everything in one batch.
        let mut reader = SnapshotReader::new(snap, 100);
        assert_eq!(reader.row_count(), 9);

        let batch = reader.next().unwrap().unwrap();
        assert_eq!(batch.row_count(), 9);
        assert_eq!(strings(&batch, "a", 9), vec!["10", "20", "30", "40", "50", "60", "70", "80", "90"]);
        assert!(reader.next().is_none());
    }

    #[test]
    fn reader_batch_spans_chunk_boundary() {
        let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
        // Chunk lengths are 3, 2 and 4, so batches of 4 straddle chunk edges.
        let mut reader = SnapshotReader::new(snap, 4);

        let first = reader.next().unwrap().unwrap();
        assert_eq!(first.row_count(), 4);
        assert_eq!(strings(&first, "a", 4), vec!["10", "20", "30", "40"]);

        let second = reader.next().unwrap().unwrap();
        assert_eq!(second.row_count(), 4);
        assert_eq!(strings(&second, "a", 4), vec!["50", "60", "70", "80"]);

        let third = reader.next().unwrap().unwrap();
        assert_eq!(third.row_count(), 1);
        assert_eq!(strings(&third, "a", 1), vec!["90"]);
        assert!(reader.next().is_none());
    }

    #[test]
    fn reader_batch_starts_mid_chunk() {
        let snap = mk_chunked_snapshot(&[&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]);
        // One chunk of 10 rows read in batches of 3: the second batch starts inside the chunk.
        let mut reader = SnapshotReader::new(snap, 3);

        let first = reader.next().unwrap().unwrap();
        assert_eq!(first.row_count(), 3);

        let second = reader.next().unwrap().unwrap();
        assert_eq!(second.row_count(), 3);
        let a = second.column("a").unwrap();
        assert_eq!(a.data().get_value(0).to_string(), "4");
        assert_eq!(a.data().get_value(2).to_string(), "6");
    }

    #[test]
    fn pushdown_eq_predicate_keeps_only_matching_rows() {
        let filter = Predicate::Eq(ColRef::from("a"), Value::Int4(3));
        let mut reader = SnapshotReader::new(mk_snapshot(5), 100).with_predicate(filter);

        let batch = reader.next().expect("batch").unwrap();
        assert_eq!(batch.row_count(), 1);
        assert_eq!(batch.row_numbers[0], RowNumber(3));
        assert_eq!(strings(&batch, "a", 1), vec!["3"]);
        assert_eq!(strings(&batch, "b", 1), vec!["row-3"]);
        assert!(reader.next().is_none());
    }

    #[test]
    fn pushdown_filters_across_chunk_boundary() {
        let snap = mk_chunked_snapshot(&[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]]);
        // 30 sits in the first chunk, 80 in the third; the middle chunk keeps nothing.
        let filter = Predicate::In(ColRef::from("a"), vec![Value::Int4(30), Value::Int4(80)]);
        let mut reader = SnapshotReader::new(snap, 100).with_predicate(filter);

        let batch = reader.next().expect("batch").unwrap();
        assert_eq!(batch.row_count(), 2);
        assert_eq!(strings(&batch, "a", 2), vec!["30", "80"]);
        assert_eq!(batch.row_numbers[0], RowNumber(2));
        assert_eq!(batch.row_numbers[1], RowNumber(7));
        assert!(reader.next().is_none());
    }

    #[test]
    fn pushdown_skips_empty_batches() {
        // With batches of 2 over rows 0..6, only the batch covering rows 4..6 has a match.
        let filter = Predicate::Eq(ColRef::from("a"), Value::Int4(4));
        let mut reader = SnapshotReader::new(mk_snapshot(6), 2).with_predicate(filter);

        let batch = reader.next().expect("only matching batch").unwrap();
        assert_eq!(batch.row_count(), 1);
        assert_eq!(batch.row_numbers[0], RowNumber(4));
        assert_eq!(strings(&batch, "a", 1), vec!["4"]);
        assert!(reader.next().is_none());
    }

    #[test]
    fn pushdown_selection_all_passes_batch_through() {
        // Every value of `a` is >= 0, so the whole batch is kept.
        let filter = Predicate::GtEq(ColRef::from("a"), Value::Int4(0));
        let mut reader = SnapshotReader::new(mk_snapshot(5), 100).with_predicate(filter);

        let batch = reader.next().expect("batch").unwrap();
        assert_eq!(batch.row_count(), 5);
        assert_eq!(strings(&batch, "a", 5), vec!["0", "1", "2", "3", "4"]);
        assert_eq!(batch.row_numbers[0], RowNumber(0));
        assert_eq!(batch.row_numbers[4], RowNumber(4));
    }

    #[test]
    fn pushdown_is_none_over_multi_chunk_nullable() {
        // Builds a three-row chunk `[lo, none, hi]`.
        let nullable_chunk = |lo: i32, hi: i32| {
            let mut buf = ColumnBuffer::int4_with_capacity(3);
            buf.push::<i32>(lo);
            buf.push_none();
            buf.push::<i32>(hi);
            array_from_column_data(&buf)
        };
        let chunks = vec![nullable_chunk(10, 30), nullable_chunk(40, 60)];

        let schema = Arc::new(vec![("a".to_string(), Type::Int4, true)]);
        let columns = vec![ColumnChunks::new(Type::Int4, true, chunks)];
        let snap = snapshot_of(ColumnBlock::new(schema, columns));

        let filter = Predicate::IsNone(ColRef::from("a"));
        let mut reader = SnapshotReader::new(snap, 100).with_predicate(filter);

        // The missing values are at absolute rows 1 and 4.
        let batch = reader.next().expect("batch").unwrap();
        assert_eq!(batch.row_count(), 2);
        assert_eq!(batch.row_numbers[0], RowNumber(1));
        assert_eq!(batch.row_numbers[1], RowNumber(4));
    }
}