lance_encoding/utils/
accumulation.rs1use arrow_array::ArrayRef;
7use lance_arrow::deepcopy::deep_copy_array;
8use log::{debug, trace};
9
10#[derive(Debug)]
11pub struct AccumulationQueue {
12 cache_bytes: u64,
13 keep_original_array: bool,
14 buffered_arrays: Vec<ArrayRef>,
15 current_bytes: u64,
16 row_number: u64,
18 num_rows: u64,
20 column_index: u32,
22}
23
24impl AccumulationQueue {
25 pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
26 Self {
27 cache_bytes,
28 buffered_arrays: Vec::new(),
29 current_bytes: 0,
30 column_index,
31 keep_original_array,
32 row_number: u64::MAX,
33 num_rows: 0,
34 }
35 }
36
37 pub fn insert(
40 &mut self,
41 array: ArrayRef,
42 row_number: u64,
43 num_rows: u64,
44 ) -> Option<(Vec<ArrayRef>, u64, u64)> {
45 if self.row_number == u64::MAX {
46 self.row_number = row_number;
47 }
48 self.num_rows += num_rows;
49 self.current_bytes += array.get_array_memory_size() as u64;
50 if self.current_bytes > self.cache_bytes {
51 debug!(
52 "Flushing column {} page of size {} bytes (unencoded)",
53 self.column_index, self.current_bytes
54 );
55 self.buffered_arrays.push(array);
57 self.current_bytes = 0;
58 let row_number = self.row_number;
59 self.row_number = u64::MAX;
60 let num_rows = self.num_rows;
61 self.num_rows = 0;
62 Some((
63 std::mem::take(&mut self.buffered_arrays),
64 row_number,
65 num_rows,
66 ))
67 } else {
68 trace!(
69 "Accumulating data for column {}. Now at {} bytes",
70 self.column_index, self.current_bytes
71 );
72 if self.keep_original_array {
73 self.buffered_arrays.push(array);
74 } else {
75 self.buffered_arrays.push(deep_copy_array(array.as_ref()))
76 }
77 None
78 }
79 }
80
81 pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
82 if self.buffered_arrays.is_empty() {
83 trace!(
84 "No final flush since no data at column {}",
85 self.column_index
86 );
87 None
88 } else {
89 trace!(
90 "Final flush of column {} which has {} bytes",
91 self.column_index, self.current_bytes
92 );
93 self.current_bytes = 0;
94 let row_number = self.row_number;
95 self.row_number = u64::MAX;
96 let num_rows = self.num_rows;
97 self.num_rows = 0;
98 Some((
99 std::mem::take(&mut self.buffered_arrays),
100 row_number,
101 num_rows,
102 ))
103 }
104 }
105}