Skip to main content

reifydb_column/
snapshot.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::CommitVersion,
8	interface::catalog::id::{SeriesId, TableId},
9	value::column::data::Column,
10};
11use reifydb_runtime::context::clock::Instant;
12use reifydb_type::{Result, value::r#type::Type};
13
14use crate::bucket::{Bucket, BucketId};
15
16#[derive(Clone)]
17pub struct ColumnChunks {
18	pub ty: Type,
19	pub nullable: bool,
20	pub chunks: Vec<Column>,
21}
22
23impl ColumnChunks {
24	pub fn new(ty: Type, nullable: bool, chunks: Vec<Column>) -> Self {
25		Self {
26			ty,
27			nullable,
28			chunks,
29		}
30	}
31
32	pub fn single(ty: Type, nullable: bool, array: Column) -> Self {
33		Self {
34			ty,
35			nullable,
36			chunks: vec![array],
37		}
38	}
39
40	pub fn len(&self) -> usize {
41		self.chunks.iter().map(|c| c.len()).sum()
42	}
43
44	pub fn is_empty(&self) -> bool {
45		self.len() == 0
46	}
47
48	pub fn chunk_count(&self) -> usize {
49		self.chunks.len()
50	}
51
52	pub fn iter_range_chunks(&self, start: usize, end: usize) -> Vec<(usize, usize, usize)> {
53		debug_assert!(start <= end, "iter_range_chunks: start {start} > end {end}");
54		let mut out = Vec::new();
55		if start == end {
56			return out;
57		}
58		let mut chunk_offset = 0usize;
59		for (idx, chunk) in self.chunks.iter().enumerate() {
60			let chunk_len = chunk.len();
61			let chunk_end = chunk_offset + chunk_len;
62			if chunk_offset >= end {
63				break;
64			}
65			if chunk_end > start && chunk_len > 0 {
66				let intra_start = start.saturating_sub(chunk_offset);
67				let intra_end = (end - chunk_offset).min(chunk_len);
68				out.push((idx, intra_start, intra_end));
69			}
70			chunk_offset = chunk_end;
71		}
72		out
73	}
74}
75
76pub type Schema = Arc<Vec<(String, Type, bool)>>;
77
78#[derive(Clone)]
79pub struct ColumnBlock {
80	pub schema: Schema,
81	pub columns: Vec<ColumnChunks>,
82}
83
84impl ColumnBlock {
85	pub fn new(schema: Schema, columns: Vec<ColumnChunks>) -> Self {
86		debug_assert_eq!(schema.len(), columns.len(), "ColumnBlock::new: schema and columns length mismatch");
87		Self {
88			schema,
89			columns,
90		}
91	}
92
93	pub fn len(&self) -> usize {
94		self.columns.first().map(|c| c.len()).unwrap_or(0)
95	}
96
97	pub fn is_empty(&self) -> bool {
98		self.len() == 0
99	}
100
101	pub fn column_by_name(&self, name: &str) -> Option<(usize, &ColumnChunks)> {
102		self.schema.iter().position(|(n, _, _)| n == name).map(|i| (i, &self.columns[i]))
103	}
104
105	pub fn view_range(&self, start: usize, end: usize) -> Result<ColumnBlock> {
106		debug_assert!(start <= end, "view_range: start {start} > end {end}");
107		let mut sliced_columns = Vec::with_capacity(self.columns.len());
108		for column in &self.columns {
109			let ranges = column.iter_range_chunks(start, end);
110			let mut sliced_chunks = Vec::with_capacity(ranges.len());
111			for (idx, s, e) in ranges {
112				sliced_chunks.push(column.chunks[idx].slice(s, e)?);
113			}
114			sliced_columns.push(ColumnChunks::new(column.ty.clone(), column.nullable, sliced_chunks));
115		}
116		Ok(ColumnBlock::new(Arc::clone(&self.schema), sliced_columns))
117	}
118}
119
120#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
121pub enum SnapshotId {
122	Table {
123		table_id: TableId,
124		commit_version: CommitVersion,
125	},
126	Series {
127		series_id: SeriesId,
128		bucket: BucketId,
129	},
130}
131
132#[derive(Clone, Debug)]
133pub enum SnapshotSource {
134	Table {
135		table_id: TableId,
136		commit_version: CommitVersion,
137	},
138	Series {
139		series_id: SeriesId,
140		bucket: Bucket,
141		sequence_counter: u64,
142	},
143}
144
145#[derive(Clone)]
146pub struct Snapshot {
147	pub id: SnapshotId,
148	pub source: SnapshotSource,
149	pub namespace: String,
150	pub name: String,
151	pub created_at: Instant,
152	pub block: ColumnBlock,
153}
154
155impl Snapshot {
156	pub fn meta(&self) -> SnapshotMeta {
157		SnapshotMeta {
158			id: self.id,
159			namespace: self.namespace.clone(),
160			name: self.name.clone(),
161			created_at: self.created_at.clone(),
162			row_count: self.block.len(),
163		}
164	}
165}
166
167#[derive(Clone, Debug)]
168pub struct SnapshotMeta {
169	pub id: SnapshotId,
170	pub namespace: String,
171	pub name: String,
172	pub created_at: Instant,
173	pub row_count: usize,
174}
175
#[cfg(test)]
mod tests {
	use reifydb_core::value::column::{buffer::ColumnBuffer, data::canonical::Canonical};
	use reifydb_type::value::Value;

	use super::*;

	/// Builds a non-nullable `Int4` column with one chunk per slice in `parts`.
	fn chunked_int4(parts: &[&[i32]]) -> ColumnChunks {
		let mut chunks = Vec::with_capacity(parts.len());
		for part in parts {
			let canonical = Canonical::from_column_buffer(&ColumnBuffer::int4(part.to_vec())).unwrap();
			chunks.push(Column::from_canonical(canonical));
		}
		ColumnChunks::new(Type::Int4, false, chunks)
	}

	/// Builds a block of non-nullable `Int4` columns from `(name, chunks)` pairs.
	fn block_with_columns(named: &[(&str, &[&[i32]])]) -> ColumnBlock {
		let mut fields = Vec::with_capacity(named.len());
		let mut columns = Vec::with_capacity(named.len());
		for (name, parts) in named {
			fields.push(((*name).to_string(), Type::Int4, false));
			columns.push(chunked_int4(parts));
		}
		ColumnBlock::new(Arc::new(fields), columns)
	}

	/// Reads the value at global row `idx`, walking the chunks in order.
	///
	/// Panics when `idx` is past the last row.
	fn value_at(column: &ColumnChunks, idx: usize) -> Value {
		let mut remaining = idx;
		for chunk in &column.chunks {
			let chunk_len = chunk.len();
			if remaining < chunk_len {
				return chunk.get_value(remaining);
			}
			remaining -= chunk_len;
		}
		panic!("out of range");
	}

	#[test]
	fn iter_range_chunks_single_chunk_covers_range() {
		let column = chunked_int4(&[&[1, 2, 3, 4, 5]]);
		let ranges = column.iter_range_chunks(1, 4);
		assert_eq!(ranges, vec![(0, 1, 4)]);
	}

	#[test]
	fn iter_range_chunks_spans_two_chunks() {
		let column = chunked_int4(&[&[1, 2, 3], &[4, 5, 6]]);
		let ranges = column.iter_range_chunks(2, 5);
		assert_eq!(ranges, vec![(0, 2, 3), (1, 0, 2)]);
	}

	#[test]
	fn iter_range_chunks_skips_chunks_outside_range() {
		let column = chunked_int4(&[&[1, 2], &[3, 4], &[5, 6]]);
		let ranges = column.iter_range_chunks(2, 4);
		assert_eq!(ranges, vec![(1, 0, 2)]);
	}

	#[test]
	fn iter_range_chunks_empty_range_yields_nothing() {
		let column = chunked_int4(&[&[1, 2, 3]]);
		let ranges = column.iter_range_chunks(2, 2);
		assert!(ranges.is_empty());
	}

	#[test]
	fn iter_range_chunks_full_block_walks_every_chunk() {
		let column = chunked_int4(&[&[1, 2], &[3, 4, 5], &[6]]);
		let ranges = column.iter_range_chunks(0, 6);
		assert_eq!(ranges, vec![(0, 0, 2), (1, 0, 3), (2, 0, 1)]);
	}

	#[test]
	fn view_range_empty_window_yields_empty_per_column() {
		let block = block_with_columns(&[("a", &[&[1, 2, 3]])]);
		let view = block.view_range(2, 2).unwrap();
		assert_eq!(view.len(), 0);
		assert_eq!(view.columns[0].chunks.len(), 0);
	}

	#[test]
	fn view_range_spanning_chunks_preserves_total_length() {
		let block = block_with_columns(&[("a", &[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]])]);
		let view = block.view_range(2, 7).unwrap();
		assert_eq!(view.len(), 5);
		assert_eq!(view.schema.len(), 1);
		let column = &view.columns[0];
		// Three chunks contribute: chunk0[2..3] + chunk1[0..2] + chunk2[0..2].
		assert_eq!(column.chunks.len(), 3);
		let mut vals: Vec<String> = Vec::with_capacity(column.len());
		for row in 0..column.len() {
			vals.push(value_at(column, row).to_string());
		}
		assert_eq!(vals, vec!["30", "40", "50", "60", "70"]);
	}

	#[test]
	fn view_range_multi_column_aligns_per_column_lengths() {
		let block = block_with_columns(&[("a", &[&[1, 2, 3, 4, 5]]), ("b", &[&[10, 20], &[30, 40, 50]])]);
		let view = block.view_range(1, 4).unwrap();
		assert_eq!(view.len(), 3);
		for column in &view.columns[..2] {
			assert_eq!(column.len(), 3);
		}
	}
}