// llkv_table/stream/column.rs

1use std::sync::Arc;
2
3use arrow::array::RecordBatch;
4
5use llkv_column_map::ColumnStore;
6use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext};
7use llkv_result::Result as LlkvResult;
8use llkv_storage::pager::{MemPager, Pager};
9use llkv_types::ids::{LogicalFieldId, RowId};
10use simd_r_drive_entry_handle::EntryHandle;
11
12use croaring::Treemap;
13
/// Iterator over row IDs that can own or borrow the underlying bitmap.
pub enum RowIdIter<'a> {
    /// Owns a materialized `Vec<RowId>` and drains it via its `IntoIter`.
    Owned(std::vec::IntoIter<RowId>),
    /// Borrows a lazily-evaluated iterator that lives at least as long as `'a`.
    Borrowed(Box<dyn Iterator<Item = RowId> + 'a>),
}
19
20impl<'a> Iterator for RowIdIter<'a> {
21    type Item = RowId;
22
23    #[inline]
24    fn next(&mut self) -> Option<Self::Item> {
25        match self {
26            Self::Owned(iter) => iter.next(),
27            Self::Borrowed(iter) => iter.next(),
28        }
29    }
30}
31
32impl From<Treemap> for RowIdIter<'static> {
33    fn from(map: Treemap) -> Self {
34        // FIXME: This iter->vec-iter is not ideal but faster than
35        // manipulating the treemap on each iteration
36        Self::Owned(map.iter().collect::<Vec<_>>().into_iter())
37    }
38}
39
40impl<'a> From<&'a Treemap> for RowIdIter<'a> {
41    fn from(map: &'a Treemap) -> Self {
42        Self::Borrowed(Box::new(map.iter()))
43    }
44}
45
/// Source of row IDs that reports its cardinality and converts into a
/// [`RowIdIter`], either by value (owned) or by reference (borrowed).
pub trait RowIdStreamSource<'a> {
    /// Number of row IDs this source will yield.
    fn count(&self) -> RowId;
    /// Consume `self` and produce an iterator over its row IDs.
    fn into_iter_source(self) -> RowIdIter<'a>;
}
50
51impl RowIdStreamSource<'static> for Treemap {
52    fn count(&self) -> RowId {
53        self.cardinality()
54    }
55    fn into_iter_source(self) -> RowIdIter<'static> {
56        // FIXME: This iter->vec-iter is not ideal but faster than
57        // manipulating the treemap on each iteration
58        RowIdIter::Owned(self.iter().collect::<Vec<_>>().into_iter())
59    }
60}
61
62impl<'a> RowIdStreamSource<'a> for &'a Treemap {
63    fn count(&self) -> RowId {
64        self.cardinality()
65    }
66    fn into_iter_source(self) -> RowIdIter<'a> {
67        RowIdIter::Borrowed(Box::new(self.iter()))
68    }
69}
70
/// Streaming view over a set of row IDs for selected logical fields.
///
/// `ColumnStream` keeps a reusable gather context so repeated calls avoid
/// reparsing column descriptors or re-fetching chunk metadata. Each call to
/// [`ColumnStream::next_batch`] returns at most `STREAM_BATCH_ROWS` values,
/// backed by Arrow arrays without copying the column data.
pub struct ColumnStream<'table, 'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Column store the rows are gathered from.
    store: &'table ColumnStore<P>,
    // Reusable gather context shared across batches.
    ctx: MultiGatherContext,
    // Remaining row IDs to stream, consumed chunk by chunk.
    row_ids: RowIdIter<'a>,
    // Number of row IDs consumed from `row_ids` so far.
    position: usize,
    // Total number of row IDs this stream covers.
    total_rows: usize,
    // Maximum number of row IDs gathered per batch.
    chunk_size: usize,
    // How nulls are handled during the gather (see `GatherNullPolicy`).
    policy: GatherNullPolicy,
    // Logical fields produced by each batch.
    logical_fields: Arc<[LogicalFieldId]>,
}
90
/// Single batch produced by [`ColumnStream`].
pub struct ColumnStreamBatch {
    // Offset of this batch's first row within the overall stream.
    start: usize,
    // Row IDs gathered for this batch, in yield order.
    row_ids: Vec<RowId>,
    // Arrow record batch holding the gathered column values.
    batch: RecordBatch,
}
97
98impl<'table, 'a, P> ColumnStream<'table, 'a, P>
99where
100    P: Pager<Blob = EntryHandle> + Send + Sync,
101{
102    pub(crate) fn new(
103        store: &'table ColumnStore<P>,
104        ctx: MultiGatherContext,
105        row_ids: RowIdIter<'a>,
106        total_rows: usize,
107        chunk_size: usize,
108        policy: GatherNullPolicy,
109        logical_fields: Arc<[LogicalFieldId]>,
110    ) -> Self {
111        Self {
112            store,
113            ctx,
114            row_ids,
115            position: 0,
116            total_rows,
117            chunk_size,
118            policy,
119            logical_fields,
120        }
121    }
122
123    /// Total number of row IDs covered by this stream.
124    #[inline]
125    pub fn total_rows(&self) -> usize {
126        self.total_rows
127    }
128
129    /// Remaining number of row IDs that have not yet been yielded.
130    #[inline]
131    pub fn remaining_rows(&self) -> usize {
132        self.total_rows.saturating_sub(self.position)
133    }
134
135    /// Logical fields produced by this stream.
136    #[inline]
137    pub fn logical_fields(&self) -> &[LogicalFieldId] {
138        &self.logical_fields
139    }
140
141    /// Fetch the next chunk of rows, if any remain.
142    pub fn next_batch(&mut self) -> LlkvResult<Option<ColumnStreamBatch>> {
143        loop {
144            let mut window = Vec::with_capacity(self.chunk_size);
145            for _ in 0..self.chunk_size {
146                if let Some(rid) = self.row_ids.next() {
147                    window.push(rid);
148                } else {
149                    break;
150                }
151            }
152
153            if window.is_empty() {
154                return Ok(None);
155            }
156
157            let start = self.position;
158            self.position += window.len();
159
160            let batch = self.store.gather_rows_with_reusable_context(
161                &mut self.ctx,
162                &window,
163                self.policy,
164            )?;
165
166            if batch.num_rows() == 0 && matches!(self.policy, GatherNullPolicy::DropNulls) {
167                // All rows dropped; continue to the next chunk to avoid yielding empties.
168                continue;
169            }
170
171            return Ok(Some(ColumnStreamBatch {
172                start,
173                row_ids: window,
174                batch,
175            }));
176        }
177    }
178}
179
180impl ColumnStreamBatch {
181    #[inline]
182    pub fn row_ids(&self) -> &[RowId] {
183        &self.row_ids
184    }
185
186    #[inline]
187    pub fn row_offset(&self) -> usize {
188        self.start
189    }
190
191    #[inline]
192    pub fn len(&self) -> usize {
193        self.row_ids.len()
194    }
195
196    #[inline]
197    pub fn is_empty(&self) -> bool {
198        self.row_ids.is_empty()
199    }
200
201    #[inline]
202    pub fn batch(&self) -> &RecordBatch {
203        &self.batch
204    }
205
206    #[inline]
207    pub fn into_batch(self) -> RecordBatch {
208        self.batch
209    }
210}