kyu_storage/
chunked_node_group.rs1use kyu_types::LogicalType;
7
8use crate::column_chunk::ColumnChunk;
9use crate::constants::CHUNKED_NODE_GROUP_CAPACITY;
10use crate::storage_types::{NodeGroupFormat, ResidencyState};
11
12pub struct ChunkedNodeGroup {
15 format: NodeGroupFormat,
16 residency_state: ResidencyState,
17 start_row_idx: u64,
18 capacity: u64,
19 num_rows: u64,
20 chunks: Vec<ColumnChunk>,
21}
22
23impl ChunkedNodeGroup {
24 pub fn new(data_types: &[LogicalType], start_row_idx: u64) -> Self {
26 Self::with_capacity(data_types, start_row_idx, CHUNKED_NODE_GROUP_CAPACITY)
27 }
28
29 pub fn with_capacity(data_types: &[LogicalType], start_row_idx: u64, capacity: u64) -> Self {
31 let chunks = data_types
32 .iter()
33 .map(|dt| ColumnChunk::new(dt.clone(), capacity))
34 .collect();
35 Self {
36 format: NodeGroupFormat::Regular,
37 residency_state: ResidencyState::InMemory,
38 start_row_idx,
39 capacity,
40 num_rows: 0,
41 chunks,
42 }
43 }
44
45 pub fn format(&self) -> NodeGroupFormat {
46 self.format
47 }
48
49 pub fn set_format(&mut self, format: NodeGroupFormat) {
50 self.format = format;
51 }
52
53 pub fn residency_state(&self) -> ResidencyState {
54 self.residency_state
55 }
56
57 pub fn start_row_idx(&self) -> u64 {
58 self.start_row_idx
59 }
60
61 pub fn capacity(&self) -> u64 {
62 self.capacity
63 }
64
65 pub fn num_rows(&self) -> u64 {
66 self.num_rows
67 }
68
69 pub fn num_columns(&self) -> usize {
70 self.chunks.len()
71 }
72
73 pub fn is_full(&self) -> bool {
74 self.num_rows >= self.capacity
75 }
76
77 pub fn remaining_capacity(&self) -> u64 {
78 self.capacity.saturating_sub(self.num_rows)
79 }
80
81 pub fn column(&self, idx: usize) -> &ColumnChunk {
82 &self.chunks[idx]
83 }
84
85 pub fn column_mut(&mut self, idx: usize) -> &mut ColumnChunk {
86 &mut self.chunks[idx]
87 }
88
89 pub fn append_row(&mut self, values: &[Option<&[u8]>]) -> u64 {
93 debug_assert!(!self.is_full());
94 debug_assert_eq!(values.len(), self.chunks.len());
95
96 let row = self.num_rows;
97 for (col_idx, value) in values.iter().enumerate() {
98 match value {
99 Some(bytes) => {
100 let chunk = &mut self.chunks[col_idx];
101 match chunk {
102 ColumnChunk::Fixed(c) => {
103 c.set_raw(row, bytes);
104 c.set_num_values(row + 1);
105 }
106 ColumnChunk::Bool(c) => {
107 let val = bytes[0] != 0;
108 c.set_bool(row, val);
109 c.set_num_values(row + 1);
110 }
111 ColumnChunk::String(c) => {
112 let s = std::str::from_utf8(bytes).unwrap_or("");
113 c.set_string(row, smol_str::SmolStr::new(s));
114 c.set_num_values(row + 1);
115 }
116 }
117 }
118 None => {
119 let chunk = &mut self.chunks[col_idx];
120 chunk.set_null(row, true);
121 chunk.set_num_values(row + 1);
122 }
123 }
124 }
125 self.num_rows += 1;
126 row
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created group is empty and uses the default capacity.
    #[test]
    fn new_chunked_node_group() {
        let group = ChunkedNodeGroup::new(&[LogicalType::Int64, LogicalType::String], 0);
        assert_eq!(group.num_rows(), 0);
        assert_eq!(group.capacity(), CHUNKED_NODE_GROUP_CAPACITY);
        assert_eq!(group.num_columns(), 2);
        assert!(!group.is_full());
        assert_eq!(group.remaining_capacity(), CHUNKED_NODE_GROUP_CAPACITY);
    }

    /// Appending the first row returns index 0 and bumps the row count.
    #[test]
    fn append_row_increments_count() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int32, LogicalType::Int64], 0);

        let val_i32: i32 = 42;
        let val_i64: i64 = 100;
        let row = group.append_row(&[Some(&val_i32.to_ne_bytes()), Some(&val_i64.to_ne_bytes())]);

        assert_eq!(row, 0);
        assert_eq!(group.num_rows(), 1);
    }

    /// A `None` entry marks only its own column as null.
    #[test]
    fn append_row_with_null() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int32, LogicalType::Int64], 0);

        let val_i32: i32 = 42;
        group.append_row(&[Some(&val_i32.to_ne_bytes()), None]);

        assert_eq!(group.num_rows(), 1);
        assert!(!group.column(0).is_null(0));
        assert!(group.column(1).is_null(0));
    }

    /// Consecutive appends return consecutive row indices and consume capacity.
    #[test]
    fn append_multiple_rows() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int64], 0);

        for i in 0..10u64 {
            let val = i as i64;
            let row = group.append_row(&[Some(&val.to_ne_bytes())]);
            assert_eq!(row, i);
        }
        assert_eq!(group.num_rows(), 10);
        assert_eq!(group.remaining_capacity(), CHUNKED_NODE_GROUP_CAPACITY - 10);
    }

    /// `with_capacity` honours the requested capacity and start row index.
    #[test]
    fn with_custom_capacity() {
        let group = ChunkedNodeGroup::with_capacity(&[LogicalType::Int32], 100, 16);
        assert_eq!(group.capacity(), 16);
        assert_eq!(group.start_row_idx(), 100);
    }

    /// The group reports full exactly when the row count reaches the capacity.
    #[test]
    fn is_full_at_capacity() {
        let mut group = ChunkedNodeGroup::with_capacity(&[LogicalType::Int32], 0, 2);

        let v1: i32 = 1;
        let v2: i32 = 2;
        group.append_row(&[Some(&v1.to_ne_bytes())]);
        assert!(!group.is_full());
        group.append_row(&[Some(&v2.to_ne_bytes())]);
        assert!(group.is_full());
    }

    /// Bool columns decode a non-zero byte as `true` and accept nulls.
    #[test]
    fn bool_column() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Bool], 0);

        group.append_row(&[Some(&[1u8])]);
        group.append_row(&[Some(&[0u8])]);
        group.append_row(&[None]);

        match group.column(0) {
            ColumnChunk::Bool(c) => {
                assert!(c.get_bool(0));
                assert!(!c.get_bool(1));
                assert!(c.is_null(2));
            }
            _ => panic!("expected Bool chunk"),
        }
    }

    /// Columns keep the data type they were created with.
    #[test]
    fn column_access() {
        let mut group = ChunkedNodeGroup::new(&[LogicalType::Int32, LogicalType::Double], 0);

        let v1: i32 = 42;
        // Arbitrary payload; deliberately not an approximation of a
        // well-known constant such as PI.
        let v2: f64 = 2.5;
        group.append_row(&[Some(&v1.to_ne_bytes()), Some(&v2.to_ne_bytes())]);

        assert_eq!(*group.column(0).data_type(), LogicalType::Int32);
        assert_eq!(*group.column(1).data_type(), LogicalType::Double);
    }

    /// New groups default to the regular format.
    #[test]
    fn format_default() {
        let group = ChunkedNodeGroup::new(&[LogicalType::Int32], 0);
        assert_eq!(group.format(), NodeGroupFormat::Regular);
    }

    /// New groups default to the in-memory residency state.
    #[test]
    fn residency_state_default() {
        let group = ChunkedNodeGroup::new(&[LogicalType::Int32], 0);
        assert_eq!(group.residency_state(), ResidencyState::InMemory);
    }
}