Skip to main content

lance_encoding/utils/
accumulation.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! An accumulation queue accumulates arrays until we have enough data to flush.
5
6use arrow_array::ArrayRef;
7use lance_arrow::deepcopy::deep_copy_array;
8use log::{debug, trace};
9
10#[derive(Debug)]
11pub struct AccumulationQueue {
12    cache_bytes: u64,
13    keep_original_array: bool,
14    buffered_arrays: Vec<ArrayRef>,
15    current_bytes: u64,
16    // Row number of the first item in buffered_arrays, reset on flush
17    row_number: u64,
18    // Number of top level rows represented in buffered_arrays, reset on flush
19    num_rows: u64,
20    // This is only for logging / debugging purposes
21    column_index: u32,
22}
23
24impl AccumulationQueue {
25    pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
26        Self {
27            cache_bytes,
28            buffered_arrays: Vec::new(),
29            current_bytes: 0,
30            column_index,
31            keep_original_array,
32            row_number: u64::MAX,
33            num_rows: 0,
34        }
35    }
36
37    /// Adds an array to the queue, if there is enough data then the queue is flushed
38    /// and returned
39    pub fn insert(
40        &mut self,
41        array: ArrayRef,
42        row_number: u64,
43        num_rows: u64,
44    ) -> Option<(Vec<ArrayRef>, u64, u64)> {
45        if self.row_number == u64::MAX {
46            self.row_number = row_number;
47        }
48        self.num_rows += num_rows;
49        self.current_bytes += array.get_array_memory_size() as u64;
50        if self.current_bytes > self.cache_bytes {
51            debug!(
52                "Flushing column {} page of size {} bytes (unencoded)",
53                self.column_index, self.current_bytes
54            );
55            // Push into buffered_arrays without copy since we are about to flush anyways
56            self.buffered_arrays.push(array);
57            self.current_bytes = 0;
58            let row_number = self.row_number;
59            self.row_number = u64::MAX;
60            let num_rows = self.num_rows;
61            self.num_rows = 0;
62            Some((
63                std::mem::take(&mut self.buffered_arrays),
64                row_number,
65                num_rows,
66            ))
67        } else {
68            trace!(
69                "Accumulating data for column {}.  Now at {} bytes",
70                self.column_index, self.current_bytes
71            );
72            if self.keep_original_array {
73                self.buffered_arrays.push(array);
74            } else {
75                self.buffered_arrays.push(deep_copy_array(array.as_ref()))
76            }
77            None
78        }
79    }
80
81    pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
82        if self.buffered_arrays.is_empty() {
83            trace!(
84                "No final flush since no data at column {}",
85                self.column_index
86            );
87            None
88        } else {
89            trace!(
90                "Final flush of column {} which has {} bytes",
91                self.column_index, self.current_bytes
92            );
93            self.current_bytes = 0;
94            let row_number = self.row_number;
95            self.row_number = u64::MAX;
96            let num_rows = self.num_rows;
97            self.num_rows = 0;
98            Some((
99                std::mem::take(&mut self.buffered_arrays),
100                row_number,
101                num_rows,
102            ))
103        }
104    }
105}