llkv_table/stream/column.rs

use std::sync::Arc;

use arrow::array::RecordBatch;

use crate::types::RowId;
use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext};
use llkv_column_map::{ColumnStore, types::LogicalFieldId};
use llkv_result::Result as LlkvResult;
use llkv_storage::pager::{MemPager, Pager};
use simd_r_drive_entry_handle::EntryHandle;
/// Streaming view over a set of row IDs for selected logical fields.
///
/// `ColumnStream` keeps a reusable gather context so repeated calls avoid
/// reparsing column descriptors or re-fetching chunk metadata. Each call to
/// [`ColumnStream::next_batch`] yields at most `chunk_size` rows, backed by
/// Arrow arrays without copying the column data.
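///
/// # Examples
///
/// A minimal sketch of the drain loop. Construction goes through a
/// table-level API (the [`ColumnStream::new`] constructor is crate-internal)
/// and the import path below is assumed, so the example is not compiled.
///
/// ```ignore
/// use arrow::array::RecordBatch;
/// use llkv_result::Result as LlkvResult;
/// use llkv_storage::pager::Pager;
/// // Path assumed for illustration; adjust to the crate's re-exports.
/// use llkv_table::stream::ColumnStream;
/// use simd_r_drive_entry_handle::EntryHandle;
///
/// fn drain_all<P>(mut stream: ColumnStream<'_, P>) -> LlkvResult<Vec<RecordBatch>>
/// where
///     P: Pager<Blob = EntryHandle> + Send + Sync,
/// {
///     let mut out = Vec::new();
///     while let Some(chunk) = stream.next_batch()? {
///         // Each chunk borrows the stream; converting it into an owned
///         // `RecordBatch` releases the borrow before the next call.
///         out.push(chunk.into_batch());
///     }
///     Ok(out)
/// }
/// ```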
pub struct ColumnStream<'table, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    store: &'table ColumnStore<P>,
    ctx: MultiGatherContext,
    row_ids: Vec<RowId>,
    position: usize,
    chunk_size: usize,
    policy: GatherNullPolicy,
    logical_fields: Arc<[LogicalFieldId]>,
}

/// Single batch produced by [`ColumnStream`].
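///
/// A hedged sketch of inspecting one chunk; `stream` stands in for an active
/// [`ColumnStream`] whose setup is not shown, so the example is not compiled.
///
/// ```ignore
/// if let Some(chunk) = stream.next_batch()? {
///     for (i, row_id) in chunk.row_ids().iter().enumerate() {
///         // Positions within the chunk map back to absolute stream offsets.
///         let absolute = chunk.row_offset() + i;
///         println!("stream row {absolute}: row id {row_id}");
///     }
/// }
/// ```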
pub struct ColumnStreamBatch<'stream> {
    start: usize,
    row_ids: &'stream [RowId],
    batch: RecordBatch,
}

impl<'table, P> ColumnStream<'table, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub(crate) fn new(
        store: &'table ColumnStore<P>,
        ctx: MultiGatherContext,
        row_ids: Vec<RowId>,
        chunk_size: usize,
        policy: GatherNullPolicy,
        logical_fields: Arc<[LogicalFieldId]>,
    ) -> Self {
        Self {
            store,
            ctx,
            row_ids,
            position: 0,
            chunk_size,
            policy,
            logical_fields,
        }
    }

    /// Total number of row IDs covered by this stream.
    #[inline]
    pub fn total_rows(&self) -> usize {
        self.row_ids.len()
    }

    /// Remaining number of row IDs that have not yet been yielded.
    #[inline]
    pub fn remaining_rows(&self) -> usize {
        self.row_ids.len().saturating_sub(self.position)
    }

    /// Logical fields produced by this stream.
    #[inline]
    pub fn logical_fields(&self) -> &[LogicalFieldId] {
        &self.logical_fields
    }

    /// Fetch the next chunk of rows, if any remain.
    ///
    /// With [`GatherNullPolicy::DropNulls`], chunks whose rows were all
    /// dropped are skipped rather than yielded as empty batches.
    pub fn next_batch(&mut self) -> LlkvResult<Option<ColumnStreamBatch<'_>>> {
        while self.position < self.row_ids.len() {
            let start = self.position;
            let end = (start + self.chunk_size).min(self.row_ids.len());
            let window = &self.row_ids[start..end];

            let batch =
                self.store
                    .gather_rows_with_reusable_context(&mut self.ctx, window, self.policy)?;

            self.position = end;

            if batch.num_rows() == 0 && matches!(self.policy, GatherNullPolicy::DropNulls) {
                // All rows dropped; continue to the next chunk to avoid yielding empties.
                continue;
            }

            return Ok(Some(ColumnStreamBatch {
                start,
                row_ids: window,
                batch,
            }));
        }

        Ok(None)
    }
}

impl<'stream> ColumnStreamBatch<'stream> {
    /// Row IDs covered by this chunk's window.
    ///
    /// Under [`GatherNullPolicy::DropNulls`] the gathered batch may contain
    /// fewer rows than this window.
    #[inline]
    pub fn row_ids(&self) -> &'stream [RowId] {
        self.row_ids
    }

    /// Offset of this chunk's first row ID within the full stream.
    #[inline]
    pub fn row_offset(&self) -> usize {
        self.start
    }

    /// Number of row IDs in this chunk's window.
    #[inline]
    pub fn len(&self) -> usize {
        self.row_ids.len()
    }

    /// Returns `true` if the window contains no row IDs.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.row_ids.is_empty()
    }

    /// Gathered Arrow data for this chunk.
    #[inline]
    pub fn batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// Consume the chunk, returning the owned [`RecordBatch`].
    #[inline]
    pub fn into_batch(self) -> RecordBatch {
        self.batch
    }
}