1use std::sync::Arc;
5
6use reifydb_core::{
7 common::CommitVersion,
8 interface::catalog::id::{SeriesId, TableId},
9 value::column::data::Column,
10};
11use reifydb_runtime::context::clock::Instant;
12use reifydb_type::{Result, value::r#type::Type};
13
14use crate::bucket::{Bucket, BucketId};
15
/// One logical column stored as an ordered list of [`Column`] chunks.
///
/// The column's total length is the sum of the lengths of its chunks.
#[derive(Clone)]
pub struct ColumnChunks {
    /// Value type carried alongside the chunks.
    pub ty: Type,
    /// Nullability flag carried alongside the chunks.
    pub nullable: bool,
    /// The chunks themselves; row offsets are computed by walking them in order.
    pub chunks: Vec<Column>,
}
22
23impl ColumnChunks {
24 pub fn new(ty: Type, nullable: bool, chunks: Vec<Column>) -> Self {
25 Self {
26 ty,
27 nullable,
28 chunks,
29 }
30 }
31
32 pub fn single(ty: Type, nullable: bool, array: Column) -> Self {
33 Self {
34 ty,
35 nullable,
36 chunks: vec![array],
37 }
38 }
39
40 pub fn len(&self) -> usize {
41 self.chunks.iter().map(|c| c.len()).sum()
42 }
43
44 pub fn is_empty(&self) -> bool {
45 self.len() == 0
46 }
47
48 pub fn chunk_count(&self) -> usize {
49 self.chunks.len()
50 }
51
52 pub fn iter_range_chunks(&self, start: usize, end: usize) -> Vec<(usize, usize, usize)> {
53 debug_assert!(start <= end, "iter_range_chunks: start {start} > end {end}");
54 let mut out = Vec::new();
55 if start == end {
56 return out;
57 }
58 let mut chunk_offset = 0usize;
59 for (idx, chunk) in self.chunks.iter().enumerate() {
60 let chunk_len = chunk.len();
61 let chunk_end = chunk_offset + chunk_len;
62 if chunk_offset >= end {
63 break;
64 }
65 if chunk_end > start && chunk_len > 0 {
66 let intra_start = start.saturating_sub(chunk_offset);
67 let intra_end = (end - chunk_offset).min(chunk_len);
68 out.push((idx, intra_start, intra_end));
69 }
70 chunk_offset = chunk_end;
71 }
72 out
73 }
74}
75
/// Shared, immutable list of column descriptors for a [`ColumnBlock`].
///
/// Each entry is `(column name, value type, flag)`; lookups by name match on the
/// first element. NOTE(review): the `bool` presumably mirrors
/// `ColumnChunks::nullable` — confirm against the code that builds schemas.
pub type Schema = Arc<Vec<(String, Type, bool)>>;
77
/// A set of chunked columns together with the schema that names them.
///
/// `schema[i]` describes `columns[i]`; `ColumnBlock::new` checks in debug builds
/// that both have the same number of entries.
#[derive(Clone)]
pub struct ColumnBlock {
    /// Column descriptors, shared by reference count.
    pub schema: Schema,
    /// Column data, one chunk list per schema entry.
    pub columns: Vec<ColumnChunks>,
}
83
84impl ColumnBlock {
85 pub fn new(schema: Schema, columns: Vec<ColumnChunks>) -> Self {
86 debug_assert_eq!(schema.len(), columns.len(), "ColumnBlock::new: schema and columns length mismatch");
87 Self {
88 schema,
89 columns,
90 }
91 }
92
93 pub fn len(&self) -> usize {
94 self.columns.first().map(|c| c.len()).unwrap_or(0)
95 }
96
97 pub fn is_empty(&self) -> bool {
98 self.len() == 0
99 }
100
101 pub fn column_by_name(&self, name: &str) -> Option<(usize, &ColumnChunks)> {
102 self.schema.iter().position(|(n, _, _)| n == name).map(|i| (i, &self.columns[i]))
103 }
104
105 pub fn view_range(&self, start: usize, end: usize) -> Result<ColumnBlock> {
106 debug_assert!(start <= end, "view_range: start {start} > end {end}");
107 let mut sliced_columns = Vec::with_capacity(self.columns.len());
108 for column in &self.columns {
109 let ranges = column.iter_range_chunks(start, end);
110 let mut sliced_chunks = Vec::with_capacity(ranges.len());
111 for (idx, s, e) in ranges {
112 sliced_chunks.push(column.chunks[idx].slice(s, e)?);
113 }
114 sliced_columns.push(ColumnChunks::new(column.ty.clone(), column.nullable, sliced_chunks));
115 }
116 Ok(ColumnBlock::new(Arc::clone(&self.schema), sliced_columns))
117 }
118}
119
/// Copyable, hashable key identifying a snapshot.
///
/// A snapshot is keyed either by a table and commit version, or by a series and
/// bucket id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SnapshotId {
    /// Key for a snapshot of a table.
    Table {
        /// Table the snapshot belongs to.
        table_id: TableId,
        /// Commit version that is part of the key.
        commit_version: CommitVersion,
    },
    /// Key for a snapshot of a series.
    Series {
        /// Series the snapshot belongs to.
        series_id: SeriesId,
        /// Id of the bucket that is part of the key.
        bucket: BucketId,
    },
}
131
/// Describes what a snapshot was taken from.
///
/// Mirrors the variants of [`SnapshotId`], but the series variant carries the full
/// [`Bucket`] (not just its id) plus a sequence counter.
#[derive(Clone, Debug)]
pub enum SnapshotSource {
    /// The snapshot originates from a table.
    Table {
        /// Source table.
        table_id: TableId,
        /// Commit version recorded for the source.
        commit_version: CommitVersion,
    },
    /// The snapshot originates from a series.
    Series {
        /// Source series.
        series_id: SeriesId,
        /// Bucket of the series the snapshot covers.
        bucket: Bucket,
        /// NOTE(review): presumably the series' sequence counter at snapshot time — confirm with the producer.
        sequence_counter: u64,
    },
}
144
/// A snapshot: identifying metadata plus the column data it holds.
#[derive(Clone)]
pub struct Snapshot {
    /// Key identifying this snapshot.
    pub id: SnapshotId,
    /// Description of where the snapshot's data came from.
    pub source: SnapshotSource,
    /// NOTE(review): presumably the namespace of the source table/series — confirm.
    pub namespace: String,
    /// NOTE(review): presumably the name of the source table/series — confirm.
    pub name: String,
    /// Instant recorded as the snapshot's creation time.
    pub created_at: Instant,
    /// The snapshot's column data.
    pub block: ColumnBlock,
}
154
155impl Snapshot {
156 pub fn meta(&self) -> SnapshotMeta {
157 SnapshotMeta {
158 id: self.id,
159 namespace: self.namespace.clone(),
160 name: self.name.clone(),
161 created_at: self.created_at.clone(),
162 row_count: self.block.len(),
163 }
164 }
165}
166
/// Summary of a [`Snapshot`] that carries no column data.
///
/// Built by `Snapshot::meta`, which copies the identifying fields and stores the
/// block's row count.
#[derive(Clone, Debug)]
pub struct SnapshotMeta {
    /// Key of the snapshot being described.
    pub id: SnapshotId,
    /// Namespace copied from the snapshot.
    pub namespace: String,
    /// Name copied from the snapshot.
    pub name: String,
    /// Creation instant copied from the snapshot.
    pub created_at: Instant,
    /// Row count of the snapshot's block at the time the summary was built.
    pub row_count: usize,
}
175
#[cfg(test)]
mod tests {
    use reifydb_core::value::column::{buffer::ColumnBuffer, data::canonical::Canonical};
    use reifydb_type::value::Value;

    use super::*;

    /// Builds a non-nullable `Int4` column with one chunk per slice in `parts`.
    fn chunked_int4(parts: &[&[i32]]) -> ColumnChunks {
        let mut chunks = Vec::with_capacity(parts.len());
        for part in parts {
            let buffer = ColumnBuffer::int4(part.to_vec());
            let canonical = Canonical::from_column_buffer(&buffer).unwrap();
            chunks.push(Column::from_canonical(canonical));
        }
        ColumnChunks::new(Type::Int4, false, chunks)
    }

    #[test]
    fn iter_range_chunks_single_chunk_covers_range() {
        let column = chunked_int4(&[&[1, 2, 3, 4, 5]]);
        let ranges = column.iter_range_chunks(1, 4);
        assert_eq!(ranges, vec![(0, 1, 4)]);
    }

    #[test]
    fn iter_range_chunks_spans_two_chunks() {
        let column = chunked_int4(&[&[1, 2, 3], &[4, 5, 6]]);
        let ranges = column.iter_range_chunks(2, 5);
        assert_eq!(ranges, vec![(0, 2, 3), (1, 0, 2)]);
    }

    #[test]
    fn iter_range_chunks_skips_chunks_outside_range() {
        let column = chunked_int4(&[&[1, 2], &[3, 4], &[5, 6]]);
        let ranges = column.iter_range_chunks(2, 4);
        assert_eq!(ranges, vec![(1, 0, 2)]);
    }

    #[test]
    fn iter_range_chunks_empty_range_yields_nothing() {
        let column = chunked_int4(&[&[1, 2, 3]]);
        let ranges = column.iter_range_chunks(2, 2);
        assert!(ranges.is_empty());
    }

    #[test]
    fn iter_range_chunks_full_block_walks_every_chunk() {
        let column = chunked_int4(&[&[1, 2], &[3, 4, 5], &[6]]);
        let ranges = column.iter_range_chunks(0, 6);
        assert_eq!(ranges, vec![(0, 0, 2), (1, 0, 3), (2, 0, 1)]);
    }

    /// Builds a block of non-nullable `Int4` columns from `(name, chunks)` pairs.
    fn block_with_columns(named: &[(&str, &[&[i32]])]) -> ColumnBlock {
        let mut fields = Vec::with_capacity(named.len());
        let mut cols = Vec::with_capacity(named.len());
        for (name, parts) in named {
            fields.push(((*name).to_string(), Type::Int4, false));
            cols.push(chunked_int4(parts));
        }
        let schema: Schema = Arc::new(fields);
        ColumnBlock::new(schema, cols)
    }

    #[test]
    fn view_range_empty_window_yields_empty_per_column() {
        let source = block_with_columns(&[("a", &[&[1, 2, 3]])]);
        let window = source.view_range(2, 2).unwrap();
        assert_eq!(window.len(), 0);
        assert_eq!(window.columns[0].chunks.len(), 0);
    }

    #[test]
    fn view_range_spanning_chunks_preserves_total_length() {
        let source = block_with_columns(&[("a", &[&[10, 20, 30], &[40, 50], &[60, 70, 80, 90]])]);
        let window = source.view_range(2, 7).unwrap();
        assert_eq!(window.len(), 5);
        assert_eq!(window.schema.len(), 1);

        let column = &window.columns[0];
        assert_eq!(column.chunks.len(), 3);

        // Render every row across the chunk boundaries to check order and content.
        let rendered: Vec<String> = (0..column.len()).map(|row| column.chunks_value_at(row).to_string()).collect();
        assert_eq!(rendered, vec!["30", "40", "50", "60", "70"]);
    }

    #[test]
    fn view_range_multi_column_aligns_per_column_lengths() {
        let source = block_with_columns(&[("a", &[&[1, 2, 3, 4, 5]]), ("b", &[&[10, 20], &[30, 40, 50]])]);
        let window = source.view_range(1, 4).unwrap();
        assert_eq!(window.len(), 3);
        for column in &window.columns[..2] {
            assert_eq!(column.len(), 3);
        }
    }

    impl ColumnChunks {
        /// Test-only helper: value at global row `idx`, found by walking the chunks in order.
        fn chunks_value_at(&self, mut idx: usize) -> Value {
            for chunk in &self.chunks {
                let chunk_len = chunk.len();
                if idx >= chunk_len {
                    // The row lives in a later chunk; rebase the index past this one.
                    idx -= chunk_len;
                    continue;
                }
                return chunk.get_value(idx);
            }
            panic!("out of range");
        }
    }
}