lance_encoding/utils/
accumulation.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! An accumulation queue accumulates arrays until we have enough data to flush.
5
6use arrow_array::ArrayRef;
7use lance_arrow::deepcopy::deep_copy_array;
8use log::{debug, trace};
9
10#[derive(Debug)]
11pub struct AccumulationQueue {
12    cache_bytes: u64,
13    keep_original_array: bool,
14    buffered_arrays: Vec<ArrayRef>,
15    current_bytes: u64,
16    // Row number of the first item in buffered_arrays, reset on flush
17    row_number: u64,
18    // Number of top level rows represented in buffered_arrays, reset on flush
19    num_rows: u64,
20    // This is only for logging / debugging purposes
21    column_index: u32,
22}
23
24impl AccumulationQueue {
25    pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
26        Self {
27            cache_bytes,
28            buffered_arrays: Vec::new(),
29            current_bytes: 0,
30            column_index,
31            keep_original_array,
32            row_number: u64::MAX,
33            num_rows: 0,
34        }
35    }
36
37    /// Adds an array to the queue, if there is enough data then the queue is flushed
38    /// and returned
39    pub fn insert(
40        &mut self,
41        array: ArrayRef,
42        row_number: u64,
43        num_rows: u64,
44    ) -> Option<(Vec<ArrayRef>, u64, u64)> {
45        if self.row_number == u64::MAX {
46            self.row_number = row_number;
47        }
48        self.num_rows += num_rows;
49        self.current_bytes += array.get_array_memory_size() as u64;
50        if self.current_bytes > self.cache_bytes {
51            debug!(
52                "Flushing column {} page of size {} bytes (unencoded)",
53                self.column_index, self.current_bytes
54            );
55            // Push into buffered_arrays without copy since we are about to flush anyways
56            self.buffered_arrays.push(array);
57            self.current_bytes = 0;
58            let row_number = self.row_number;
59            self.row_number = u64::MAX;
60            let num_rows = self.num_rows;
61            self.num_rows = 0;
62            Some((
63                std::mem::take(&mut self.buffered_arrays),
64                row_number,
65                num_rows,
66            ))
67        } else {
68            trace!(
69                "Accumulating data for column {}.  Now at {} bytes",
70                self.column_index,
71                self.current_bytes
72            );
73            if self.keep_original_array {
74                self.buffered_arrays.push(array);
75            } else {
76                self.buffered_arrays.push(deep_copy_array(array.as_ref()))
77            }
78            None
79        }
80    }
81
82    pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
83        if self.buffered_arrays.is_empty() {
84            trace!(
85                "No final flush since no data at column {}",
86                self.column_index
87            );
88            None
89        } else {
90            trace!(
91                "Final flush of column {} which has {} bytes",
92                self.column_index,
93                self.current_bytes
94            );
95            self.current_bytes = 0;
96            let row_number = self.row_number;
97            self.row_number = u64::MAX;
98            let num_rows = self.num_rows;
99            self.num_rows = 0;
100            Some((
101                std::mem::take(&mut self.buffered_arrays),
102                row_number,
103                num_rows,
104            ))
105        }
106    }
107}