Skip to main content

clickhouse_native_client/
block.rs

1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2use crate::{
3    column::{
4        Column,
5        ColumnRef,
6    },
7    types::Type,
8    Error,
9    Result,
10};
11use std::sync::Arc;
12
/// Block metadata used by ClickHouse for distributed query processing.
///
/// The [`Default`] value describes an ordinary block: it is not an overflow
/// block and it has no bucket assigned (`bucket_num == -1`).
#[derive(Debug, Clone)]
pub struct BlockInfo {
    /// Whether this block is an overflow block (1 = yes, 0 = no).
    pub is_overflows: u8,
    /// Bucket number for distributed GROUP BY (-1 if not applicable).
    pub bucket_num: i32,
}

impl Default for BlockInfo {
    /// Builds the metadata of a block with no special role.
    ///
    /// `Default` is implemented by hand because a derived implementation
    /// would zero `bucket_num`, and `0` is not the documented
    /// "not applicable" value (`-1`).
    fn default() -> Self {
        Self { is_overflows: 0, bucket_num: -1 }
    }
}
21
22/// A block is a collection of named columns with the same number of rows
23#[derive(Clone)]
24pub struct Block {
25    columns: Vec<ColumnItem>,
26    rows: usize,
27    info: BlockInfo,
28}
29
30#[derive(Clone)]
31struct ColumnItem {
32    name: String,
33    column: ColumnRef,
34}
35
36impl Block {
37    /// Create a new empty block
38    pub fn new() -> Self {
39        Self { columns: Vec::new(), rows: 0, info: BlockInfo::default() }
40    }
41
42    /// Create a block with reserved capacity
43    pub fn with_capacity(cols: usize, rows: usize) -> Self {
44        Self {
45            columns: Vec::with_capacity(cols),
46            rows,
47            info: BlockInfo::default(),
48        }
49    }
50
51    /// Append a named column to the block
52    pub fn append_column(
53        &mut self,
54        name: impl Into<String>,
55        column: ColumnRef,
56    ) -> Result<()> {
57        let name = name.into();
58
59        if self.columns.is_empty() {
60            self.rows = column.size();
61        } else if column.size() != self.rows {
62            return Err(Error::Validation(format!(
63                "All columns in block must have same count of rows. Name: '{}', expected rows: {}, got: {}",
64                name,
65                self.rows,
66                column.size()
67            )));
68        }
69
70        self.columns.push(ColumnItem { name, column });
71        Ok(())
72    }
73
74    /// Get the number of columns in the block
75    pub fn column_count(&self) -> usize {
76        self.columns.len()
77    }
78
79    /// Get the number of rows in the block
80    pub fn row_count(&self) -> usize {
81        self.rows
82    }
83
84    /// Get column by index
85    pub fn column(&self, index: usize) -> Option<ColumnRef> {
86        self.columns.get(index).map(|item| item.column.clone())
87    }
88
89    /// Get mutable access to column by index
90    /// Returns None if index is out of bounds
91    /// Panics if the column has multiple references
92    pub fn column_mut(
93        &mut self,
94        index: usize,
95    ) -> Option<&mut (dyn Column + '_)> {
96        let item = self.columns.get_mut(index)?;
97        Some(Arc::get_mut(&mut item.column)
98            .expect("Cannot get mutable access to shared column - column has multiple references"))
99    }
100
101    /// Get column name by index
102    pub fn column_name(&self, index: usize) -> Option<&str> {
103        self.columns.get(index).map(|item| item.name.as_str())
104    }
105
106    /// Get column by name
107    pub fn column_by_name(&self, name: &str) -> Option<ColumnRef> {
108        self.columns
109            .iter()
110            .find(|item| item.name == name)
111            .map(|item| item.column.clone())
112    }
113
114    /// Get mutable access to column by name
115    /// Returns None if column with given name is not found
116    /// Panics if the column has multiple references
117    pub fn column_by_name_mut(
118        &mut self,
119        name: &str,
120    ) -> Option<&mut (dyn Column + '_)> {
121        let item = self.columns.iter_mut().find(|item| item.name == name)?;
122        Some(Arc::get_mut(&mut item.column)
123            .expect("Cannot get mutable access to shared column - column has multiple references"))
124    }
125
126    /// Get block info
127    pub fn info(&self) -> &BlockInfo {
128        &self.info
129    }
130
131    /// Set block info
132    pub fn set_info(&mut self, info: BlockInfo) {
133        self.info = info;
134    }
135
136    /// Clear all data from all columns
137    pub fn clear(&mut self) {
138        // Clear by removing all columns from the block
139        self.columns.clear();
140        self.rows = 0;
141    }
142
143    /// Reserve capacity in all columns
144    pub fn reserve(&mut self, new_cap: usize) {
145        for item in &mut self.columns {
146            // Must panic if can't reserve to prevent inconsistent state
147            Arc::get_mut(&mut item.column)
148                .expect("Cannot reserve on shared column - column has multiple references")
149                .reserve(new_cap);
150        }
151    }
152
153    /// Refresh and validate row count
154    pub fn refresh_row_count(&mut self) -> Result<usize> {
155        if self.columns.is_empty() {
156            self.rows = 0;
157            return Ok(0);
158        }
159
160        let first_rows = self.columns[0].column.size();
161
162        for item in &self.columns {
163            let col_rows = item.column.size();
164            if col_rows != first_rows {
165                return Err(Error::Validation(format!(
166                    "All columns in block must have same count of rows. Name: '{}', expected: {}, got: {}",
167                    item.name, first_rows, col_rows
168                )));
169            }
170        }
171
172        self.rows = first_rows;
173        Ok(first_rows)
174    }
175
176    /// Iterate over columns
177    pub fn iter(&self) -> BlockIterator<'_> {
178        BlockIterator { block: self, index: 0 }
179    }
180
181    /// Check if block is empty
182    pub fn is_empty(&self) -> bool {
183        self.rows == 0 || self.columns.is_empty()
184    }
185}
186
187impl Default for Block {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193/// Iterator over block columns
194pub struct BlockIterator<'a> {
195    block: &'a Block,
196    index: usize,
197}
198
199impl<'a> Iterator for BlockIterator<'a> {
200    type Item = (&'a str, &'a Type, ColumnRef);
201
202    fn next(&mut self) -> Option<Self::Item> {
203        if self.index < self.block.columns.len() {
204            let item = &self.block.columns[self.index];
205            self.index += 1;
206            Some((&item.name, item.column.column_type(), item.column.clone()))
207        } else {
208            None
209        }
210    }
211}
212
213impl<'a> IntoIterator for &'a Block {
214    type Item = (&'a str, &'a Type, ColumnRef);
215    type IntoIter = BlockIterator<'a>;
216
217    fn into_iter(self) -> Self::IntoIter {
218        self.iter()
219    }
220}
221
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::column::numeric::ColumnUInt64;
    use std::sync::Arc;

    /// Builds a freshly allocated, unshared `UInt64` column holding `values`.
    fn uint64_column(values: &[u64]) -> ColumnRef {
        let mut column = ColumnUInt64::new();
        for &value in values {
            column.append(value);
        }
        Arc::new(column)
    }

    #[test]
    fn test_block_creation() {
        let block = Block::new();

        assert!(block.is_empty());
        assert_eq!(block.row_count(), 0);
        assert_eq!(block.column_count(), 0);
    }

    #[test]
    fn test_block_append_column() {
        let mut block = Block::new();

        block.append_column("id", uint64_column(&[1, 2, 3])).unwrap();

        assert_eq!(block.column_count(), 1);
        assert_eq!(block.row_count(), 3);
        assert!(!block.is_empty());
    }

    #[test]
    fn test_block_multiple_columns() {
        let mut block = Block::new();

        block.append_column("id", uint64_column(&[1, 2])).unwrap();
        block.append_column("value", uint64_column(&[100, 200])).unwrap();

        assert_eq!(block.column_count(), 2);
        assert_eq!(block.row_count(), 2);
    }

    #[test]
    fn test_block_mismatched_rows() {
        let mut block = Block::new();
        block.append_column("id", uint64_column(&[1, 2])).unwrap();

        // "value" carries one row more than "id" and must be rejected.
        let result =
            block.append_column("value", uint64_column(&[100, 200, 300]));

        assert!(result.is_err());
    }

    #[test]
    fn test_block_get_column() {
        let mut block = Block::new();
        block.append_column("test", uint64_column(&[42])).unwrap();

        let column = block.column(0).unwrap();
        assert_eq!(column.size(), 1);

        assert!(block.column(1).is_none());
    }

    #[test]
    fn test_block_get_column_by_name() {
        let mut block = Block::new();
        block.append_column("my_column", uint64_column(&[42])).unwrap();

        let column = block.column_by_name("my_column").unwrap();
        assert_eq!(column.size(), 1);

        assert!(block.column_by_name("nonexistent").is_none());
    }

    #[test]
    fn test_block_column_name() {
        let mut block = Block::new();
        block.append_column("test_name", uint64_column(&[1])).unwrap();

        assert_eq!(block.column_name(0), Some("test_name"));
        assert_eq!(block.column_name(1), None);
    }

    #[test]
    fn test_block_iterator() {
        let mut block = Block::new();
        block.append_column("first", uint64_column(&[1])).unwrap();
        block.append_column("second", uint64_column(&[2])).unwrap();

        let mut names: Vec<&str> = Vec::new();
        for (name, _type, _column) in block.iter() {
            names.push(name);
        }

        assert_eq!(names, vec!["first", "second"]);
    }

    #[test]
    fn test_block_info() {
        let mut block = Block::new();

        block.set_info(BlockInfo { is_overflows: 1, bucket_num: 42 });

        let stored = block.info();
        assert_eq!(stored.is_overflows, 1);
        assert_eq!(stored.bucket_num, 42);
    }

    #[test]
    fn test_block_column_mut_exclusive_access() {
        let mut block = Block::new();
        block.append_column("test", uint64_column(&[42])).unwrap();

        // The block owns the only handle, so mutable access is granted and
        // can be used to append another value.
        block
            .column_mut(0)
            .expect("Should get mutable access")
            .as_any_mut()
            .downcast_mut::<ColumnUInt64>()
            .unwrap()
            .append(100);

        // The appended value is visible through a fresh handle.
        let column = block.column(0).unwrap();
        let values = column.as_any().downcast_ref::<ColumnUInt64>().unwrap();
        assert_eq!(values.len(), 2);
        assert_eq!(values.at(0), 42);
        assert_eq!(values.at(1), 100);
    }

    #[test]
    #[should_panic(expected = "Cannot get mutable access to shared column")]
    fn test_block_column_mut_panics_on_shared() {
        let mut block = Block::new();
        block.append_column("test", uint64_column(&[42])).unwrap();

        // Keep a second handle alive so the column is shared.
        let _second_handle = block.column(0).unwrap();

        // Exclusive access is impossible now, so this call panics.
        let _ = block.column_mut(0);
    }

    #[test]
    fn test_block_column_mut_out_of_bounds() {
        let mut block = Block::new();

        // An empty block has no valid index at all.
        for &index in &[0usize, 100] {
            assert!(block.column_mut(index).is_none());
        }
    }

    #[test]
    fn test_block_column_by_name_mut_exclusive_access() {
        let mut block = Block::new();
        block.append_column("my_column", uint64_column(&[42])).unwrap();

        // The block owns the only handle, so mutable access is granted and
        // can be used to append another value.
        block
            .column_by_name_mut("my_column")
            .expect("Should get mutable access")
            .as_any_mut()
            .downcast_mut::<ColumnUInt64>()
            .unwrap()
            .append(100);

        // The appended value is visible through a fresh handle.
        let column = block.column_by_name("my_column").unwrap();
        let values = column.as_any().downcast_ref::<ColumnUInt64>().unwrap();
        assert_eq!(values.len(), 2);
        assert_eq!(values.at(0), 42);
        assert_eq!(values.at(1), 100);
    }

    #[test]
    #[should_panic(expected = "Cannot get mutable access to shared column")]
    fn test_block_column_by_name_mut_panics_on_shared() {
        let mut block = Block::new();
        block.append_column("my_column", uint64_column(&[42])).unwrap();

        // Keep a second handle alive so the column is shared.
        let _second_handle = block.column_by_name("my_column").unwrap();

        // Exclusive access is impossible now, so this call panics.
        let _ = block.column_by_name_mut("my_column");
    }

    #[test]
    fn test_block_column_by_name_mut_not_found() {
        let mut block = Block::new();
        block.append_column("my_column", uint64_column(&[42])).unwrap();

        // An unknown name yields `None` rather than a panic.
        assert!(block.column_by_name_mut("nonexistent").is_none());
    }
}