Skip to main content

kyu_storage/
chunked_node_group.rs

1//! ChunkedNodeGroup: groups column chunks for a batch of rows.
2//!
3//! Each ChunkedNodeGroup holds up to `CHUNKED_NODE_GROUP_CAPACITY` (2,048) rows
4//! across all columns.
5
6use kyu_types::LogicalType;
7
8use crate::column_chunk::ColumnChunk;
9use crate::constants::CHUNKED_NODE_GROUP_CAPACITY;
10use crate::storage_types::{NodeGroupFormat, ResidencyState};
11
12/// A group of column chunks, one per column, holding up to
13/// `CHUNKED_NODE_GROUP_CAPACITY` rows.
14pub struct ChunkedNodeGroup {
15    format: NodeGroupFormat,
16    residency_state: ResidencyState,
17    start_row_idx: u64,
18    capacity: u64,
19    num_rows: u64,
20    chunks: Vec<ColumnChunk>,
21}
22
23impl ChunkedNodeGroup {
24    /// Create a new ChunkedNodeGroup with the given column types.
25    pub fn new(data_types: &[LogicalType], start_row_idx: u64) -> Self {
26        Self::with_capacity(data_types, start_row_idx, CHUNKED_NODE_GROUP_CAPACITY)
27    }
28
29    /// Create a ChunkedNodeGroup with a custom capacity.
30    pub fn with_capacity(data_types: &[LogicalType], start_row_idx: u64, capacity: u64) -> Self {
31        let chunks = data_types
32            .iter()
33            .map(|dt| ColumnChunk::new(dt.clone(), capacity))
34            .collect();
35        Self {
36            format: NodeGroupFormat::Regular,
37            residency_state: ResidencyState::InMemory,
38            start_row_idx,
39            capacity,
40            num_rows: 0,
41            chunks,
42        }
43    }
44
45    pub fn format(&self) -> NodeGroupFormat {
46        self.format
47    }
48
49    pub fn set_format(&mut self, format: NodeGroupFormat) {
50        self.format = format;
51    }
52
53    pub fn residency_state(&self) -> ResidencyState {
54        self.residency_state
55    }
56
57    pub fn start_row_idx(&self) -> u64 {
58        self.start_row_idx
59    }
60
61    pub fn capacity(&self) -> u64 {
62        self.capacity
63    }
64
65    pub fn num_rows(&self) -> u64 {
66        self.num_rows
67    }
68
69    pub fn num_columns(&self) -> usize {
70        self.chunks.len()
71    }
72
73    pub fn is_full(&self) -> bool {
74        self.num_rows >= self.capacity
75    }
76
77    pub fn remaining_capacity(&self) -> u64 {
78        self.capacity.saturating_sub(self.num_rows)
79    }
80
81    pub fn column(&self, idx: usize) -> &ColumnChunk {
82        &self.chunks[idx]
83    }
84
85    pub fn column_mut(&mut self, idx: usize) -> &mut ColumnChunk {
86        &mut self.chunks[idx]
87    }
88
89    /// Append a row of raw byte values across all columns.
90    /// `values[i]` is `Some(bytes)` for a non-null value, `None` for null.
91    /// Returns the local row index within this chunked group.
92    pub fn append_row(&mut self, values: &[Option<&[u8]>]) -> u64 {
93        debug_assert!(!self.is_full());
94        debug_assert_eq!(values.len(), self.chunks.len());
95
96        let row = self.num_rows;
97        for (col_idx, value) in values.iter().enumerate() {
98            match value {
99                Some(bytes) => {
100                    let chunk = &mut self.chunks[col_idx];
101                    match chunk {
102                        ColumnChunk::Fixed(c) => {
103                            c.set_raw(row, bytes);
104                            c.set_num_values(row + 1);
105                        }
106                        ColumnChunk::Bool(c) => {
107                            let val = bytes[0] != 0;
108                            c.set_bool(row, val);
109                            c.set_num_values(row + 1);
110                        }
111                        ColumnChunk::String(c) => {
112                            let s = std::str::from_utf8(bytes).unwrap_or("");
113                            c.set_string(row, smol_str::SmolStr::new(s));
114                            c.set_num_values(row + 1);
115                        }
116                    }
117                }
118                None => {
119                    let chunk = &mut self.chunks[col_idx];
120                    chunk.set_null(row, true);
121                    chunk.set_num_values(row + 1);
122                }
123            }
124        }
125        self.num_rows += 1;
126        row
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built group is empty and uses the default capacity.
    #[test]
    fn new_chunked_node_group() {
        let group = ChunkedNodeGroup::new(&[LogicalType::Int64, LogicalType::String], 0);

        assert_eq!(group.num_columns(), 2);
        assert_eq!(group.num_rows(), 0);
        assert_eq!(group.capacity(), CHUNKED_NODE_GROUP_CAPACITY);
        assert_eq!(group.remaining_capacity(), CHUNKED_NODE_GROUP_CAPACITY);
        assert!(!group.is_full());
    }

    /// The first appended row gets local index 0 and bumps the row count.
    #[test]
    fn append_row_increments_count() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int32, LogicalType::Int64], 0);
        let narrow = 42i32.to_ne_bytes();
        let wide = 100i64.to_ne_bytes();

        let row = group.append_row(&[Some(&narrow[..]), Some(&wide[..])]);

        assert_eq!(row, 0);
        assert_eq!(group.num_rows(), 1);
    }

    /// A `None` value marks only its own column as null.
    #[test]
    fn append_row_with_null() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int32, LogicalType::Int64], 0);
        let narrow = 42i32.to_ne_bytes();

        group.append_row(&[Some(&narrow[..]), None]);

        assert_eq!(group.num_rows(), 1);
        assert!(!group.column(0).is_null(0));
        assert!(group.column(1).is_null(0));
    }

    /// Row count and remaining capacity track repeated appends.
    #[test]
    fn append_multiple_rows() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int64], 0);

        for val in 0..10i64 {
            let bytes = val.to_ne_bytes();
            group.append_row(&[Some(&bytes[..])]);
        }

        assert_eq!(group.num_rows(), 10);
        assert_eq!(group.remaining_capacity(), CHUNKED_NODE_GROUP_CAPACITY - 10);
    }

    /// `with_capacity` records both the custom capacity and the start row.
    #[test]
    fn with_custom_capacity() {
        let group = ChunkedNodeGroup::with_capacity(&[LogicalType::Int32], 100, 16);

        assert_eq!(group.start_row_idx(), 100);
        assert_eq!(group.capacity(), 16);
    }

    /// The group reports full exactly when the last slot has been used.
    #[test]
    fn is_full_at_capacity() {
        let mut group = ChunkedNodeGroup::with_capacity(&[LogicalType::Int32], 0, 2);
        let first = 1i32.to_ne_bytes();
        let second = 2i32.to_ne_bytes();

        group.append_row(&[Some(&first[..])]);
        assert!(!group.is_full());

        group.append_row(&[Some(&second[..])]);
        assert!(group.is_full());
    }

    /// Boolean columns decode the first byte and keep track of nulls.
    #[test]
    fn bool_column() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Bool], 0);

        group.append_row(&[Some(&[1u8])]);
        group.append_row(&[Some(&[0u8])]);
        group.append_row(&[None]);

        if let ColumnChunk::Bool(bools) = group.column(0) {
            assert!(bools.get_bool(0));
            assert!(!bools.get_bool(1));
            assert!(bools.is_null(2));
        } else {
            panic!("expected Bool chunk");
        }
    }

    /// Each column chunk reports the logical type it was created with.
    #[test]
    fn column_access() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int32, LogicalType::Double], 0);
        let int_bytes = 42i32.to_ne_bytes();
        let float_bytes = 3.14f64.to_ne_bytes();

        group.append_row(&[Some(&int_bytes[..]), Some(&float_bytes[..])]);

        assert_eq!(*group.column(0).data_type(), LogicalType::Int32);
        assert_eq!(*group.column(1).data_type(), LogicalType::Double);
    }

    /// New groups start out in the regular format.
    #[test]
    fn format_default() {
        let group = ChunkedNodeGroup::new(&[LogicalType::Int32], 0);

        assert_eq!(group.format(), NodeGroupFormat::Regular);
    }

    /// New groups start out resident in memory.
    #[test]
    fn residency_state_default() {
        let group = ChunkedNodeGroup::new(&[LogicalType::Int32], 0);

        assert_eq!(group.residency_state(), ResidencyState::InMemory);
    }
}