Skip to main content

kyu_storage/
node_group.rs

//! NodeGroup: manages up to NODE_GROUP_SIZE (131,072) rows as a collection of ChunkedNodeGroups.
2
3use kyu_types::LogicalType;
4
5use crate::chunked_node_group::ChunkedNodeGroup;
6use crate::constants::{CHUNKED_NODE_GROUP_CAPACITY, NODE_GROUP_SIZE};
7use crate::storage_types::NodeGroupFormat;
8
/// Strongly typed wrapper around the `u64` index of a node group.
///
/// Wrapping the raw integer keeps node group indices from being mixed up
/// with other `u64` quantities such as row counts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NodeGroupIdx(pub u64);
12
13/// A node group holds up to `NODE_GROUP_SIZE` (131,072) rows,
14/// organized as a collection of `ChunkedNodeGroup`s (each 2,048 rows).
15pub struct NodeGroup {
16    node_group_idx: NodeGroupIdx,
17    format: NodeGroupFormat,
18    data_types: Vec<LogicalType>,
19    num_rows: u64,
20    capacity: u64,
21    chunked_groups: Vec<ChunkedNodeGroup>,
22}
23
24impl NodeGroup {
25    /// Create a new node group with default capacity (NODE_GROUP_SIZE).
26    pub fn new(node_group_idx: NodeGroupIdx, data_types: Vec<LogicalType>) -> Self {
27        Self::with_capacity(node_group_idx, data_types, NODE_GROUP_SIZE)
28    }
29
30    /// Create a node group with a custom capacity.
31    pub fn with_capacity(
32        node_group_idx: NodeGroupIdx,
33        data_types: Vec<LogicalType>,
34        capacity: u64,
35    ) -> Self {
36        Self {
37            node_group_idx,
38            format: NodeGroupFormat::Regular,
39            data_types,
40            num_rows: 0,
41            capacity,
42            chunked_groups: Vec::new(),
43        }
44    }
45
46    pub fn node_group_idx(&self) -> NodeGroupIdx {
47        self.node_group_idx
48    }
49
50    pub fn format(&self) -> NodeGroupFormat {
51        self.format
52    }
53
54    pub fn set_format(&mut self, format: NodeGroupFormat) {
55        self.format = format;
56    }
57
58    pub fn data_types(&self) -> &[LogicalType] {
59        &self.data_types
60    }
61
62    pub fn num_rows(&self) -> u64 {
63        self.num_rows
64    }
65
66    pub fn capacity(&self) -> u64 {
67        self.capacity
68    }
69
70    pub fn is_full(&self) -> bool {
71        self.num_rows >= self.capacity
72    }
73
74    pub fn num_chunked_groups(&self) -> usize {
75        self.chunked_groups.len()
76    }
77
78    pub fn chunked_group(&self, idx: usize) -> &ChunkedNodeGroup {
79        &self.chunked_groups[idx]
80    }
81
82    pub fn chunked_group_mut(&mut self, idx: usize) -> &mut ChunkedNodeGroup {
83        &mut self.chunked_groups[idx]
84    }
85
86    /// Map a global row index to (chunked_group_idx, local_row_within_group).
87    pub fn global_row_to_chunked_group(&self, row: u64) -> (usize, u64) {
88        let chunk_capacity = self
89            .chunked_groups
90            .first()
91            .map(|g| g.capacity())
92            .unwrap_or(CHUNKED_NODE_GROUP_CAPACITY);
93        let group_idx = (row / chunk_capacity) as usize;
94        let local_row = row % chunk_capacity;
95        (group_idx, local_row)
96    }
97
98    /// Append a row of raw byte values across all columns.
99    /// Automatically creates new ChunkedNodeGroups as needed.
100    /// Returns the global row index within the node group.
101    pub fn append_row(&mut self, values: &[Option<&[u8]>]) -> u64 {
102        debug_assert!(!self.is_full());
103
104        // Create first chunked group or a new one if current is full.
105        let needs_new = self.chunked_groups.last().is_none_or(|g| g.is_full());
106
107        if needs_new {
108            let start = self.num_rows;
109            let remaining = self.capacity - self.num_rows;
110            let cap = remaining.min(CHUNKED_NODE_GROUP_CAPACITY);
111            self.chunked_groups.push(ChunkedNodeGroup::with_capacity(
112                &self.data_types,
113                start,
114                cap,
115            ));
116        }
117
118        let last = self.chunked_groups.last_mut().unwrap();
119        last.append_row(values);
120        let global_row = self.num_rows;
121        self.num_rows += 1;
122        global_row
123    }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// Appends a single-column row holding `value` as native-endian bytes.
    fn push_i32(group: &mut NodeGroup, value: i32) {
        let bytes = value.to_ne_bytes();
        group.append_row(&[Some(&bytes[..])]);
    }

    /// A freshly created group is empty and uses the default capacity.
    #[test]
    fn new_node_group() {
        let group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int64]);
        assert_eq!(group.node_group_idx(), NodeGroupIdx(0));
        assert_eq!(group.capacity(), NODE_GROUP_SIZE);
        assert_eq!(group.num_rows(), 0);
        assert!(!group.is_full());
        assert_eq!(group.num_chunked_groups(), 0);
    }

    /// The first append lazily creates the first chunked group.
    #[test]
    fn append_creates_chunked_group() {
        let mut group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int32]);
        push_i32(&mut group, 42);
        assert_eq!(group.num_rows(), 1);
        assert_eq!(group.num_chunked_groups(), 1);
    }

    /// A small number of rows stays inside a single chunked group.
    #[test]
    fn append_multiple_rows() {
        let mut group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int64]);
        for value in 0..100i64 {
            let bytes = value.to_ne_bytes();
            group.append_row(&[Some(&bytes[..])]);
        }
        assert_eq!(group.num_rows(), 100);
        assert_eq!(group.num_chunked_groups(), 1);
    }

    /// Rows beyond one chunk's capacity land in a second chunked group.
    #[test]
    fn spill_to_second_chunked_group() {
        let mut group = NodeGroup::with_capacity(
            NodeGroupIdx(0),
            vec![LogicalType::Int32],
            CHUNKED_NODE_GROUP_CAPACITY + 10,
        );

        let total_rows = CHUNKED_NODE_GROUP_CAPACITY + 5;
        for row in 0..total_rows {
            push_i32(&mut group, row as i32);
        }

        assert_eq!(group.num_rows(), CHUNKED_NODE_GROUP_CAPACITY + 5);
        assert_eq!(group.num_chunked_groups(), 2);
        assert_eq!(
            group.chunked_group(0).num_rows(),
            CHUNKED_NODE_GROUP_CAPACITY
        );
        assert_eq!(group.chunked_group(1).num_rows(), 5);
    }

    /// Global row indices map to the expected (chunk, local row) pairs.
    #[test]
    fn global_row_to_chunked_group_mapping() {
        let mut group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int32]);
        let total_rows = CHUNKED_NODE_GROUP_CAPACITY + 5;
        for row in 0..total_rows {
            push_i32(&mut group, row as i32);
        }

        let (first_chunk, first_local) = group.global_row_to_chunked_group(0);
        assert_eq!(first_chunk, 0);
        assert_eq!(first_local, 0);

        let (boundary_chunk, boundary_local) =
            group.global_row_to_chunked_group(CHUNKED_NODE_GROUP_CAPACITY);
        assert_eq!(boundary_chunk, 1);
        assert_eq!(boundary_local, 0);

        let (last_chunk, last_local) =
            group.global_row_to_chunked_group(CHUNKED_NODE_GROUP_CAPACITY + 4);
        assert_eq!(last_chunk, 1);
        assert_eq!(last_local, 4);
    }

    /// A `None` value is recorded as null in its column only.
    #[test]
    fn with_nulls() {
        let column_types = vec![LogicalType::Int32, LogicalType::Int64];
        let mut group = NodeGroup::new(NodeGroupIdx(0), column_types);
        let bytes = 1i32.to_ne_bytes();
        group.append_row(&[Some(&bytes[..]), None]);
        assert_eq!(group.num_rows(), 1);

        let chunk = group.chunked_group(0);
        assert!(!chunk.column(0).is_null(0));
        assert!(chunk.column(1).is_null(0));
    }

    /// `with_capacity` stores the requested capacity and index.
    #[test]
    fn custom_capacity() {
        let group = NodeGroup::with_capacity(NodeGroupIdx(5), vec![LogicalType::Int32], 100);
        assert_eq!(group.capacity(), 100);
        assert_eq!(group.node_group_idx(), NodeGroupIdx(5));
    }

    /// The group reports full exactly when the last slot has been used.
    #[test]
    fn is_full_at_capacity() {
        let mut group = NodeGroup::with_capacity(NodeGroupIdx(0), vec![LogicalType::Int32], 3);
        push_i32(&mut group, 0);
        push_i32(&mut group, 0);
        assert!(!group.is_full());
        push_i32(&mut group, 0);
        assert!(group.is_full());
    }

    /// New groups default to the regular format.
    #[test]
    fn format_default() {
        let group = NodeGroup::new(NodeGroupIdx(0), vec![LogicalType::Int32]);
        assert_eq!(group.format(), NodeGroupFormat::Regular);
    }

    /// The column types passed at construction are returned unchanged.
    #[test]
    fn data_types() {
        let expected = vec![LogicalType::Int32, LogicalType::String, LogicalType::Bool];
        let group = NodeGroup::new(NodeGroupIdx(0), expected.clone());
        assert_eq!(group.data_types(), expected.as_slice());
    }
}