// reifydb_column/snapshot.rs
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::CommitVersion,
8	interface::catalog::id::{SeriesId, TableId},
9	value::column::array::Column,
10};
11use reifydb_runtime::context::clock::Instant;
12use reifydb_type::{Result, value::r#type::Type};
13
14use crate::bucket::{Bucket, BucketId};
15
16// A column as a sequence of `Column` chunks, each encoded independently. v1
17// materialization produces single-chunk `ColumnChunks`s; multi-chunk support
18// is reserved for the future batched-scan path.
19#[derive(Clone)]
20pub struct ColumnChunks {
21	pub ty: Type,
22	pub nullable: bool,
23	pub chunks: Vec<Column>,
24}
25
26impl ColumnChunks {
27	pub fn new(ty: Type, nullable: bool, chunks: Vec<Column>) -> Self {
28		Self {
29			ty,
30			nullable,
31			chunks,
32		}
33	}
34
35	pub fn single(ty: Type, nullable: bool, array: Column) -> Self {
36		Self {
37			ty,
38			nullable,
39			chunks: vec![array],
40		}
41	}
42
43	pub fn len(&self) -> usize {
44		self.chunks.iter().map(|c| c.len()).sum()
45	}
46
47	pub fn is_empty(&self) -> bool {
48		self.len() == 0
49	}
50
51	pub fn chunk_count(&self) -> usize {
52		self.chunks.len()
53	}
54
55	// Map a row range [start, end) onto the chunks that intersect it. Each entry is
56	// `(chunk_idx, intra_start, intra_end)` where `intra_*` are offsets within
57	// `chunks[chunk_idx]`. Empty chunks contribute nothing; empty ranges yield no
58	// entries. The chunk slice goes through `Column::slice`, so encoding-specific
59	// slice paths stay live - canonicalization only happens at the projection
60	// boundary in the caller.
61	pub fn iter_range_chunks(&self, start: usize, end: usize) -> Vec<(usize, usize, usize)> {
62		debug_assert!(start <= end, "iter_range_chunks: start {start} > end {end}");
63		let mut out = Vec::new();
64		if start == end {
65			return out;
66		}
67		let mut chunk_offset = 0usize;
68		for (idx, chunk) in self.chunks.iter().enumerate() {
69			let chunk_len = chunk.len();
70			let chunk_end = chunk_offset + chunk_len;
71			if chunk_offset >= end {
72				break;
73			}
74			if chunk_end > start && chunk_len > 0 {
75				let intra_start = start.saturating_sub(chunk_offset);
76				let intra_end = (end - chunk_offset).min(chunk_len);
77				out.push((idx, intra_start, intra_end));
78			}
79			chunk_offset = chunk_end;
80		}
81		out
82	}
83}
84
85pub type Schema = Arc<Vec<(String, Type, bool)>>;
86
87// The column container used by a `Snapshot` - a schema plus one
88// `ColumnChunks` per user column. The schema's tuple entries are
89// `(name, ty, nullable)` in positional order.
90#[derive(Clone)]
91pub struct ColumnBlock {
92	pub schema: Schema,
93	pub columns: Vec<ColumnChunks>,
94}
95
96impl ColumnBlock {
97	pub fn new(schema: Schema, columns: Vec<ColumnChunks>) -> Self {
98		debug_assert_eq!(schema.len(), columns.len(), "ColumnBlock::new: schema and columns length mismatch");
99		Self {
100			schema,
101			columns,
102		}
103	}
104
105	pub fn len(&self) -> usize {
106		self.columns.first().map(|c| c.len()).unwrap_or(0)
107	}
108
109	pub fn is_empty(&self) -> bool {
110		self.len() == 0
111	}
112
113	pub fn column_by_name(&self, name: &str) -> Option<(usize, &ColumnChunks)> {
114		self.schema.iter().position(|(n, _, _)| n == name).map(|i| (i, &self.columns[i]))
115	}
116
117	// Build a lightweight view of rows `[start, end)` by slicing each column's
118	// chunks via `Column::slice`. Schema is shared (Arc-bumped). For canonical
119	// encodings the slice is an Arc-bump on the underlying buffer; compressed
120	// encodings retain their compressed form, which is what makes batch-scoped
121	// predicate eval cheap. Used by `SnapshotReader` to evaluate a predicate
122	// against just the rows of the current batch.
123	pub fn view_range(&self, start: usize, end: usize) -> Result<ColumnBlock> {
124		debug_assert!(start <= end, "view_range: start {start} > end {end}");
125		let mut sliced_columns = Vec::with_capacity(self.columns.len());
126		for column in &self.columns {
127			let ranges = column.iter_range_chunks(start, end);
128			let mut sliced_chunks = Vec::with_capacity(ranges.len());
129			for (idx, s, e) in ranges {
130				sliced_chunks.push(column.chunks[idx].slice(s, e)?);
131			}
132			sliced_columns.push(ColumnChunks::new(column.ty.clone(), column.nullable, sliced_chunks));
133		}
134		Ok(ColumnBlock::new(Arc::clone(&self.schema), sliced_columns))
135	}
136}
137
138// Registry key. Disjoint keyspaces per shape: table snapshots are keyed by
139// `(table_id, commit_version)`, series by `(series_id, bucket)`. Bucket
140// replacement reuses the same `(series_id, bucket)` key, so late-arrival
141// re-materialization overwrites atomically via `DashMap::insert`.
142#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
143pub enum SnapshotId {
144	Table {
145		table_id: TableId,
146		commit_version: CommitVersion,
147	},
148	Series {
149		series_id: SeriesId,
150		bucket: BucketId,
151	},
152}
153
154// Provenance - what the snapshot was built from. The `Series` variant carries
155// the full `Bucket` (not just `BucketId`) and the `sequence_counter` observed
156// at materialization time. Readers use these to decide whether a snapshot is
157// still current vs. stale.
158#[derive(Clone, Debug)]
159pub enum SnapshotSource {
160	Table {
161		table_id: TableId,
162		commit_version: CommitVersion,
163	},
164	Series {
165		series_id: SeriesId,
166		bucket: Bucket,
167		sequence_counter: u64,
168	},
169}
170
171// A materialized columnar snapshot. Same `ColumnBlock` container for both
172// shapes - table and series snapshots differ only in their provenance and
173// keying.
174#[derive(Clone)]
175pub struct Snapshot {
176	pub id: SnapshotId,
177	pub source: SnapshotSource,
178	pub namespace: String,
179	pub name: String,
180	pub created_at: Instant,
181	pub block: ColumnBlock,
182}
183
184impl Snapshot {
185	pub fn meta(&self) -> SnapshotMeta {
186		SnapshotMeta {
187			id: self.id,
188			namespace: self.namespace.clone(),
189			name: self.name.clone(),
190			created_at: self.created_at.clone(),
191			row_count: self.block.len(),
192		}
193	}
194}
195
196// Lightweight listing record - the shape callers get from
197// `SnapshotRegistry::list()` without cloning the backing `ColumnBlock`.
198#[derive(Clone, Debug)]
199pub struct SnapshotMeta {
200	pub id: SnapshotId,
201	pub namespace: String,
202	pub name: String,
203	pub created_at: Instant,
204	pub row_count: usize,
205}
206
#[cfg(test)]
mod tests {
	use reifydb_core::value::column::{array::canonical::Canonical, buffer::ColumnBuffer};
	use reifydb_type::value::Value;

	use super::*;

	// Test-only accessor: the logical value at global row `idx`, resolved by
	// walking chunks in order and rebasing the index per chunk.
	impl ColumnChunks {
		fn chunks_value_at(&self, mut idx: usize) -> Value {
			for chunk in &self.chunks {
				if idx < chunk.len() {
					return chunk.get_value(idx);
				}
				idx -= chunk.len();
			}
			panic!("out of range");
		}
	}

	// Build a non-nullable Int4 `ColumnChunks` with one chunk per slice.
	fn chunked_int4(parts: &[&[i32]]) -> ColumnChunks {
		let chunks = parts
			.iter()
			.map(|part| {
				Column::from_canonical(
					Canonical::from_column_buffer(&ColumnBuffer::int4(part.to_vec())).unwrap(),
				)
			})
			.collect();
		ColumnChunks::new(Type::Int4, false, chunks)
	}

	// Build a `ColumnBlock` of named Int4 columns, each from chunked parts.
	fn block_with_columns(named: &[(&str, &[&[i32]])]) -> ColumnBlock {
		let schema: Schema =
			Arc::new(named.iter().map(|(name, _)| ((*name).to_string(), Type::Int4, false)).collect());
		let columns = named.iter().map(|(_, parts)| chunked_int4(parts)).collect();
		ColumnBlock::new(schema, columns)
	}

	#[test]
	fn iter_range_chunks_single_chunk_covers_range() {
		let column = chunked_int4(&[&[1, 2, 3, 4, 5]]);
		assert_eq!(column.iter_range_chunks(1, 4), vec![(0, 1, 4)]);
	}

	#[test]
	fn iter_range_chunks_spans_two_chunks() {
		let column = chunked_int4(&[&[1, 2, 3], &[4, 5, 6]]);
		assert_eq!(column.iter_range_chunks(2, 5), vec![(0, 2, 3), (1, 0, 2)]);
	}

	#[test]
	fn iter_range_chunks_skips_chunks_outside_range() {
		let column = chunked_int4(&[&[1, 2], &[3, 4], &[5, 6]]);
		assert_eq!(column.iter_range_chunks(2, 4), vec![(1, 0, 2)]);
	}

	#[test]
	fn iter_range_chunks_empty_range_yields_nothing() {
		let column = chunked_int4(&[&[1, 2, 3]]);
		assert!(column.iter_range_chunks(2, 2).is_empty());
	}

	#[test]
	fn iter_range_chunks_full_block_walks_every_chunk() {
		let column = chunked_int4(&[&[1, 2], &[3, 4, 5], &[6]]);
		assert_eq!(column.iter_range_chunks(0, 6), vec![(0, 0, 2), (1, 0, 3), (2, 0, 1)]);
	}

	#[test]
	fn view_range_empty_window_yields_empty_per_column() {
		let block = block_with_columns(&[("a", &[&[1, 2, 3]])]);
		let view = block.view_range(2, 2).unwrap();
		assert_eq!(view.len(), 0);
		assert_eq!(view.columns[0].chunks.len(), 0);
	}

	#[test]
	fn view_range_spanning_chunks_preserves_total_length() {
		let block = block_with_columns(&[("a", &[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]])]);
		let view = block.view_range(2, 7).unwrap();
		assert_eq!(view.len(), 5);
		assert_eq!(view.schema.len(), 1);
		// Three chunks contribute: chunk0[2..3] + chunk1[0..2] + chunk2[0..2].
		assert_eq!(view.columns[0].chunks.len(), 3);
		let rendered: Vec<String> =
			(0..view.columns[0].len()).map(|i| view.columns[0].chunks_value_at(i).to_string()).collect();
		assert_eq!(rendered, vec!["30", "40", "50", "60", "70"]);
	}

	#[test]
	fn view_range_multi_column_aligns_per_column_lengths() {
		let block = block_with_columns(&[("a", &[&[1, 2, 3, 4, 5]]), ("b", &[&[10, 20], &[30, 40, 50]])]);
		let view = block.view_range(1, 4).unwrap();
		assert_eq!(view.len(), 3);
		assert_eq!(view.columns[0].len(), 3);
		assert_eq!(view.columns[1].len(), 3);
	}
}
304}