lance_encoding/utils/
accumulation.rs1use arrow_array::ArrayRef;
7use lance_arrow::deepcopy::deep_copy_array;
8use log::{debug, trace};
9
10#[derive(Debug)]
11pub struct AccumulationQueue {
12 cache_bytes: u64,
13 keep_original_array: bool,
14 buffered_arrays: Vec<ArrayRef>,
15 current_bytes: u64,
16 row_number: u64,
18 num_rows: u64,
20 column_index: u32,
22}
23
24impl AccumulationQueue {
25 pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
26 Self {
27 cache_bytes,
28 buffered_arrays: Vec::new(),
29 current_bytes: 0,
30 column_index,
31 keep_original_array,
32 row_number: u64::MAX,
33 num_rows: 0,
34 }
35 }
36
37 pub fn insert(
40 &mut self,
41 array: ArrayRef,
42 row_number: u64,
43 num_rows: u64,
44 ) -> Option<(Vec<ArrayRef>, u64, u64)> {
45 if self.row_number == u64::MAX {
46 self.row_number = row_number;
47 }
48 self.num_rows += num_rows;
49 self.current_bytes += array.get_array_memory_size() as u64;
50 if self.current_bytes > self.cache_bytes {
51 debug!(
52 "Flushing column {} page of size {} bytes (unencoded)",
53 self.column_index, self.current_bytes
54 );
55 self.buffered_arrays.push(array);
57 self.current_bytes = 0;
58 let row_number = self.row_number;
59 self.row_number = u64::MAX;
60 let num_rows = self.num_rows;
61 self.num_rows = 0;
62 Some((
63 std::mem::take(&mut self.buffered_arrays),
64 row_number,
65 num_rows,
66 ))
67 } else {
68 trace!(
69 "Accumulating data for column {}. Now at {} bytes",
70 self.column_index,
71 self.current_bytes
72 );
73 if self.keep_original_array {
74 self.buffered_arrays.push(array);
75 } else {
76 self.buffered_arrays.push(deep_copy_array(array.as_ref()))
77 }
78 None
79 }
80 }
81
82 pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
83 if self.buffered_arrays.is_empty() {
84 trace!(
85 "No final flush since no data at column {}",
86 self.column_index
87 );
88 None
89 } else {
90 trace!(
91 "Final flush of column {} which has {} bytes",
92 self.column_index,
93 self.current_bytes
94 );
95 self.current_bytes = 0;
96 let row_number = self.row_number;
97 self.row_number = u64::MAX;
98 let num_rows = self.num_rows;
99 self.num_rows = 0;
100 Some((
101 std::mem::take(&mut self.buffered_arrays),
102 row_number,
103 num_rows,
104 ))
105 }
106 }
107}