kyu_storage/
node_group.rs1use kyu_types::LogicalType;
4
5use crate::chunked_node_group::ChunkedNodeGroup;
6use crate::constants::{CHUNKED_NODE_GROUP_CAPACITY, NODE_GROUP_SIZE};
7use crate::storage_types::NodeGroupFormat;
8
/// Index identifying a [`NodeGroup`].
///
/// A thin newtype over `u64`; the raw value is available through the public
/// tuple field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NodeGroupIdx(pub u64);
12
13pub struct NodeGroup {
16 node_group_idx: NodeGroupIdx,
17 format: NodeGroupFormat,
18 data_types: Vec<LogicalType>,
19 num_rows: u64,
20 capacity: u64,
21 chunked_groups: Vec<ChunkedNodeGroup>,
22}
23
24impl NodeGroup {
25 pub fn new(node_group_idx: NodeGroupIdx, data_types: Vec<LogicalType>) -> Self {
27 Self::with_capacity(node_group_idx, data_types, NODE_GROUP_SIZE)
28 }
29
30 pub fn with_capacity(
32 node_group_idx: NodeGroupIdx,
33 data_types: Vec<LogicalType>,
34 capacity: u64,
35 ) -> Self {
36 Self {
37 node_group_idx,
38 format: NodeGroupFormat::Regular,
39 data_types,
40 num_rows: 0,
41 capacity,
42 chunked_groups: Vec::new(),
43 }
44 }
45
46 pub fn node_group_idx(&self) -> NodeGroupIdx {
47 self.node_group_idx
48 }
49
50 pub fn format(&self) -> NodeGroupFormat {
51 self.format
52 }
53
54 pub fn set_format(&mut self, format: NodeGroupFormat) {
55 self.format = format;
56 }
57
58 pub fn data_types(&self) -> &[LogicalType] {
59 &self.data_types
60 }
61
62 pub fn num_rows(&self) -> u64 {
63 self.num_rows
64 }
65
66 pub fn capacity(&self) -> u64 {
67 self.capacity
68 }
69
70 pub fn is_full(&self) -> bool {
71 self.num_rows >= self.capacity
72 }
73
74 pub fn num_chunked_groups(&self) -> usize {
75 self.chunked_groups.len()
76 }
77
78 pub fn chunked_group(&self, idx: usize) -> &ChunkedNodeGroup {
79 &self.chunked_groups[idx]
80 }
81
82 pub fn chunked_group_mut(&mut self, idx: usize) -> &mut ChunkedNodeGroup {
83 &mut self.chunked_groups[idx]
84 }
85
86 pub fn global_row_to_chunked_group(&self, row: u64) -> (usize, u64) {
88 let chunk_capacity = self
89 .chunked_groups
90 .first()
91 .map(|g| g.capacity())
92 .unwrap_or(CHUNKED_NODE_GROUP_CAPACITY);
93 let group_idx = (row / chunk_capacity) as usize;
94 let local_row = row % chunk_capacity;
95 (group_idx, local_row)
96 }
97
98 pub fn append_row(&mut self, values: &[Option<&[u8]>]) -> u64 {
102 debug_assert!(!self.is_full());
103
104 let needs_new = self.chunked_groups.last().is_none_or(|g| g.is_full());
106
107 if needs_new {
108 let start = self.num_rows;
109 let remaining = self.capacity - self.num_rows;
110 let cap = remaining.min(CHUNKED_NODE_GROUP_CAPACITY);
111 self.chunked_groups.push(ChunkedNodeGroup::with_capacity(
112 &self.data_types,
113 start,
114 cap,
115 ));
116 }
117
118 let last = self.chunked_groups.last_mut().unwrap();
119 last.append_row(values);
120 let global_row = self.num_rows;
121 self.num_rows += 1;
122 global_row
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// Appends `count` single-column Int32 rows whose value is the row index.
    fn fill_i32(group: &mut NodeGroup, count: u64) {
        for row in 0..count {
            let value = row as i32;
            group.append_row(&[Some(&value.to_ne_bytes())]);
        }
    }

    /// A freshly built group is empty and uses the default capacity.
    #[test]
    fn new_node_group() {
        let group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int64]);
        assert_eq!(group.node_group_idx(), NodeGroupIdx(0));
        assert_eq!(group.capacity(), NODE_GROUP_SIZE);
        assert_eq!(group.num_rows(), 0);
        assert!(!group.is_full());
        assert_eq!(group.num_chunked_groups(), 0);
    }

    /// The first append allocates the first chunked group.
    #[test]
    fn append_creates_chunked_group() {
        let mut group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int32]);
        let value: i32 = 42;
        group.append_row(&[Some(&value.to_ne_bytes())]);
        assert_eq!(group.num_rows(), 1);
        assert_eq!(group.num_chunked_groups(), 1);
    }

    /// A small number of rows stays within a single chunked group.
    #[test]
    fn append_multiple_rows() {
        let mut group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int64]);
        for row in 0..100u64 {
            let value = row as i64;
            group.append_row(&[Some(&value.to_ne_bytes())]);
        }
        assert_eq!(group.num_rows(), 100);
        assert_eq!(group.num_chunked_groups(), 1);
    }

    /// Rows beyond one chunk's capacity land in a second chunked group.
    #[test]
    fn spill_to_second_chunked_group() {
        let total_rows = CHUNKED_NODE_GROUP_CAPACITY + 5;
        let mut group = NodeGroup::with_capacity(
            NodeGroupIdx(0),
            vec![LogicalType::Int32],
            CHUNKED_NODE_GROUP_CAPACITY + 10,
        );

        fill_i32(&mut group, total_rows);

        assert_eq!(group.num_rows(), total_rows);
        assert_eq!(group.num_chunked_groups(), 2);
        assert_eq!(
            group.chunked_group(0).num_rows(),
            CHUNKED_NODE_GROUP_CAPACITY
        );
        assert_eq!(group.chunked_group(1).num_rows(), 5);
    }

    /// Group-wide row indices map to the expected (chunk, local row) pairs.
    #[test]
    fn global_row_to_chunked_group_mapping() {
        let mut group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int32]);
        fill_i32(&mut group, CHUNKED_NODE_GROUP_CAPACITY + 5);

        assert_eq!(group.global_row_to_chunked_group(0), (0, 0));
        assert_eq!(
            group.global_row_to_chunked_group(CHUNKED_NODE_GROUP_CAPACITY),
            (1, 0)
        );
        assert_eq!(
            group.global_row_to_chunked_group(CHUNKED_NODE_GROUP_CAPACITY + 4),
            (1, 4)
        );
    }

    /// A `None` value is recorded as null in its column.
    #[test]
    fn with_nulls() {
        let column_types = vec![LogicalType::Int32, LogicalType::Int64];
        let mut group = NodeGroup::new(NodeGroupIdx(0), column_types);
        let value: i32 = 1;
        group.append_row(&[Some(&value.to_ne_bytes()), None]);
        assert_eq!(group.num_rows(), 1);

        let chunk = group.chunked_group(0);
        assert!(!chunk.column(0).is_null(0));
        assert!(chunk.column(1).is_null(0));
    }

    /// `with_capacity` stores the requested capacity and index.
    #[test]
    fn custom_capacity() {
        let group = NodeGroup::with_capacity(NodeGroupIdx(5), vec![LogicalType::Int32], 100);
        assert_eq!(group.capacity(), 100);
        assert_eq!(group.node_group_idx(), NodeGroupIdx(5));
    }

    /// The group reports full exactly when the last slot is taken.
    #[test]
    fn is_full_at_capacity() {
        let mut group = NodeGroup::with_capacity(NodeGroupIdx(0), vec![LogicalType::Int32], 3);
        let value: i32 = 0;
        for _ in 0..2 {
            group.append_row(&[Some(&value.to_ne_bytes())]);
        }
        assert!(!group.is_full());
        group.append_row(&[Some(&value.to_ne_bytes())]);
        assert!(group.is_full());
    }

    /// New groups start in the regular format.
    #[test]
    fn format_default() {
        let group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int32]);
        assert_eq!(group.format(), NodeGroupFormat::Regular);
    }

    /// The column types passed at construction are returned unchanged.
    #[test]
    fn data_types() {
        let expected = vec![LogicalType::Int32, LogicalType::String, LogicalType::Bool];
        let group = NodeGroup::new(NodeGroupIdx(0), expected.clone());
        assert_eq!(group.data_types(), expected.as_slice());
    }
}