1use std::sync::Arc;
5
6use reifydb_core::{
7 common::CommitVersion,
8 interface::catalog::id::{SeriesId, TableId},
9 value::column::array::Column,
10};
11use reifydb_runtime::context::clock::Instant;
12use reifydb_type::{Result, value::r#type::Type};
13
14use crate::bucket::{Bucket, BucketId};
15
16#[derive(Clone)]
20pub struct ColumnChunks {
21 pub ty: Type,
22 pub nullable: bool,
23 pub chunks: Vec<Column>,
24}
25
26impl ColumnChunks {
27 pub fn new(ty: Type, nullable: bool, chunks: Vec<Column>) -> Self {
28 Self {
29 ty,
30 nullable,
31 chunks,
32 }
33 }
34
35 pub fn single(ty: Type, nullable: bool, array: Column) -> Self {
36 Self {
37 ty,
38 nullable,
39 chunks: vec![array],
40 }
41 }
42
43 pub fn len(&self) -> usize {
44 self.chunks.iter().map(|c| c.len()).sum()
45 }
46
47 pub fn is_empty(&self) -> bool {
48 self.len() == 0
49 }
50
51 pub fn chunk_count(&self) -> usize {
52 self.chunks.len()
53 }
54
55 pub fn iter_range_chunks(&self, start: usize, end: usize) -> Vec<(usize, usize, usize)> {
62 debug_assert!(start <= end, "iter_range_chunks: start {start} > end {end}");
63 let mut out = Vec::new();
64 if start == end {
65 return out;
66 }
67 let mut chunk_offset = 0usize;
68 for (idx, chunk) in self.chunks.iter().enumerate() {
69 let chunk_len = chunk.len();
70 let chunk_end = chunk_offset + chunk_len;
71 if chunk_offset >= end {
72 break;
73 }
74 if chunk_end > start && chunk_len > 0 {
75 let intra_start = start.saturating_sub(chunk_offset);
76 let intra_end = (end - chunk_offset).min(chunk_len);
77 out.push((idx, intra_start, intra_end));
78 }
79 chunk_offset = chunk_end;
80 }
81 out
82 }
83}
84
85pub type Schema = Arc<Vec<(String, Type, bool)>>;
86
87#[derive(Clone)]
91pub struct ColumnBlock {
92 pub schema: Schema,
93 pub columns: Vec<ColumnChunks>,
94}
95
96impl ColumnBlock {
97 pub fn new(schema: Schema, columns: Vec<ColumnChunks>) -> Self {
98 debug_assert_eq!(schema.len(), columns.len(), "ColumnBlock::new: schema and columns length mismatch");
99 Self {
100 schema,
101 columns,
102 }
103 }
104
105 pub fn len(&self) -> usize {
106 self.columns.first().map(|c| c.len()).unwrap_or(0)
107 }
108
109 pub fn is_empty(&self) -> bool {
110 self.len() == 0
111 }
112
113 pub fn column_by_name(&self, name: &str) -> Option<(usize, &ColumnChunks)> {
114 self.schema.iter().position(|(n, _, _)| n == name).map(|i| (i, &self.columns[i]))
115 }
116
117 pub fn view_range(&self, start: usize, end: usize) -> Result<ColumnBlock> {
124 debug_assert!(start <= end, "view_range: start {start} > end {end}");
125 let mut sliced_columns = Vec::with_capacity(self.columns.len());
126 for column in &self.columns {
127 let ranges = column.iter_range_chunks(start, end);
128 let mut sliced_chunks = Vec::with_capacity(ranges.len());
129 for (idx, s, e) in ranges {
130 sliced_chunks.push(column.chunks[idx].slice(s, e)?);
131 }
132 sliced_columns.push(ColumnChunks::new(column.ty.clone(), column.nullable, sliced_chunks));
133 }
134 Ok(ColumnBlock::new(Arc::clone(&self.schema), sliced_columns))
135 }
136}
137
138#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
143pub enum SnapshotId {
144 Table {
145 table_id: TableId,
146 commit_version: CommitVersion,
147 },
148 Series {
149 series_id: SeriesId,
150 bucket: BucketId,
151 },
152}
153
154#[derive(Clone, Debug)]
159pub enum SnapshotSource {
160 Table {
161 table_id: TableId,
162 commit_version: CommitVersion,
163 },
164 Series {
165 series_id: SeriesId,
166 bucket: Bucket,
167 sequence_counter: u64,
168 },
169}
170
171#[derive(Clone)]
175pub struct Snapshot {
176 pub id: SnapshotId,
177 pub source: SnapshotSource,
178 pub namespace: String,
179 pub name: String,
180 pub created_at: Instant,
181 pub block: ColumnBlock,
182}
183
184impl Snapshot {
185 pub fn meta(&self) -> SnapshotMeta {
186 SnapshotMeta {
187 id: self.id,
188 namespace: self.namespace.clone(),
189 name: self.name.clone(),
190 created_at: self.created_at.clone(),
191 row_count: self.block.len(),
192 }
193 }
194}
195
196#[derive(Clone, Debug)]
199pub struct SnapshotMeta {
200 pub id: SnapshotId,
201 pub namespace: String,
202 pub name: String,
203 pub created_at: Instant,
204 pub row_count: usize,
205}
206
207#[cfg(test)]
208mod tests {
209 use reifydb_core::value::column::{array::canonical::Canonical, buffer::ColumnBuffer};
210 use reifydb_type::value::Value;
211
212 use super::*;
213
214 fn chunked_int4(parts: &[&[i32]]) -> ColumnChunks {
215 let chunks = parts
216 .iter()
217 .map(|p| {
218 Column::from_canonical(
219 Canonical::from_column_buffer(&ColumnBuffer::int4(p.to_vec())).unwrap(),
220 )
221 })
222 .collect();
223 ColumnChunks::new(Type::Int4, false, chunks)
224 }
225
226 #[test]
227 fn iter_range_chunks_single_chunk_covers_range() {
228 let ch = chunked_int4(&[&[1, 2, 3, 4, 5]]);
229 assert_eq!(ch.iter_range_chunks(1, 4), vec![(0, 1, 4)]);
230 }
231
232 #[test]
233 fn iter_range_chunks_spans_two_chunks() {
234 let ch = chunked_int4(&[&[1, 2, 3], &[4, 5, 6]]);
235 assert_eq!(ch.iter_range_chunks(2, 5), vec![(0, 2, 3), (1, 0, 2)]);
236 }
237
238 #[test]
239 fn iter_range_chunks_skips_chunks_outside_range() {
240 let ch = chunked_int4(&[&[1, 2], &[3, 4], &[5, 6]]);
241 assert_eq!(ch.iter_range_chunks(2, 4), vec![(1, 0, 2)]);
242 }
243
244 #[test]
245 fn iter_range_chunks_empty_range_yields_nothing() {
246 let ch = chunked_int4(&[&[1, 2, 3]]);
247 assert!(ch.iter_range_chunks(2, 2).is_empty());
248 }
249
250 #[test]
251 fn iter_range_chunks_full_block_walks_every_chunk() {
252 let ch = chunked_int4(&[&[1, 2], &[3, 4, 5], &[6]]);
253 assert_eq!(ch.iter_range_chunks(0, 6), vec![(0, 0, 2), (1, 0, 3), (2, 0, 1)]);
254 }
255
256 fn block_with_columns(named: &[(&str, &[&[i32]])]) -> ColumnBlock {
257 let schema: Schema =
258 Arc::new(named.iter().map(|(n, _)| ((*n).to_string(), Type::Int4, false)).collect());
259 let cols = named.iter().map(|(_, parts)| chunked_int4(parts)).collect();
260 ColumnBlock::new(schema, cols)
261 }
262
263 #[test]
264 fn view_range_empty_window_yields_empty_per_column() {
265 let block = block_with_columns(&[("a", &[&[1, 2, 3]])]);
266 let view = block.view_range(2, 2).unwrap();
267 assert_eq!(view.len(), 0);
268 assert_eq!(view.columns[0].chunks.len(), 0);
269 }
270
271 #[test]
272 fn view_range_spanning_chunks_preserves_total_length() {
273 let block = block_with_columns(&[("a", &[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]])]);
274 let view = block.view_range(2, 7).unwrap();
275 assert_eq!(view.len(), 5);
276 assert_eq!(view.schema.len(), 1);
277 assert_eq!(view.columns[0].chunks.len(), 3);
279 let vals: Vec<String> =
280 (0..view.columns[0].len()).map(|i| view.columns[0].chunks_value_at(i).to_string()).collect();
281 assert_eq!(vals, vec!["30", "40", "50", "60", "70"]);
282 }
283
284 #[test]
285 fn view_range_multi_column_aligns_per_column_lengths() {
286 let block = block_with_columns(&[("a", &[&[1, 2, 3, 4, 5]]), ("b", &[&[10, 20], &[30, 40, 50]])]);
287 let view = block.view_range(1, 4).unwrap();
288 assert_eq!(view.len(), 3);
289 assert_eq!(view.columns[0].len(), 3);
290 assert_eq!(view.columns[1].len(), 3);
291 }
292
293 impl ColumnChunks {
294 fn chunks_value_at(&self, mut idx: usize) -> Value {
295 for chunk in &self.chunks {
296 if idx < chunk.len() {
297 return chunk.get_value(idx);
298 }
299 idx -= chunk.len();
300 }
301 panic!("out of range");
302 }
303 }
304}