// llkv_table/stream/column.rs
use std::sync::Arc;
2
3use arrow::array::RecordBatch;
4
5use llkv_column_map::ColumnStore;
6use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext};
7use llkv_result::Result as LlkvResult;
8use llkv_storage::pager::{MemPager, Pager};
9use llkv_types::ids::{LogicalFieldId, RowId};
10use simd_r_drive_entry_handle::EntryHandle;
11
12use croaring::Treemap;
13
/// Iterator over row ids that either owns a materialized list of ids or
/// borrows them from a longer-lived source (e.g. a `Treemap`).
pub enum RowIdIter<'a> {
    /// Owns its ids: a by-value iterator over a collected `Vec<RowId>`.
    Owned(std::vec::IntoIter<RowId>),
    /// Borrows its ids from a source that outlives `'a`.
    Borrowed(Box<dyn Iterator<Item = RowId> + 'a>),
}
19
20impl<'a> Iterator for RowIdIter<'a> {
21 type Item = RowId;
22
23 #[inline]
24 fn next(&mut self) -> Option<Self::Item> {
25 match self {
26 Self::Owned(iter) => iter.next(),
27 Self::Borrowed(iter) => iter.next(),
28 }
29 }
30}
31
32impl From<Treemap> for RowIdIter<'static> {
33 fn from(map: Treemap) -> Self {
34 Self::Owned(map.iter().collect::<Vec<_>>().into_iter())
37 }
38}
39
40impl<'a> From<&'a Treemap> for RowIdIter<'a> {
41 fn from(map: &'a Treemap) -> Self {
42 Self::Borrowed(Box::new(map.iter()))
43 }
44}
45
/// A source of row ids that can report its cardinality and be converted
/// into a [`RowIdIter`] for streaming.
pub trait RowIdStreamSource<'a> {
    /// Total number of row ids this source will yield.
    fn count(&self) -> RowId;
    /// Consumes the source, producing an iterator over its row ids.
    fn into_iter_source(self) -> RowIdIter<'a>;
}
50
51impl RowIdStreamSource<'static> for Treemap {
52 fn count(&self) -> RowId {
53 self.cardinality()
54 }
55 fn into_iter_source(self) -> RowIdIter<'static> {
56 RowIdIter::Owned(self.iter().collect::<Vec<_>>().into_iter())
59 }
60}
61
62impl<'a> RowIdStreamSource<'a> for &'a Treemap {
63 fn count(&self) -> RowId {
64 self.cardinality()
65 }
66 fn into_iter_source(self) -> RowIdIter<'a> {
67 RowIdIter::Borrowed(Box::new(self.iter()))
68 }
69}
70
/// Streams gathered column data for a sequence of row ids, one chunk at a
/// time, reusing a single gather context across chunks.
pub struct ColumnStream<'table, 'a, P = MemPager>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Store the columns are gathered from.
    store: &'table ColumnStore<P>,
    // Reusable gather scratch state, passed to each gather call.
    ctx: MultiGatherContext,
    // Remaining row ids to stream, consumed chunk by chunk.
    row_ids: RowIdIter<'a>,
    // Count of row ids consumed from `row_ids` so far.
    position: usize,
    // Total row ids the stream was created with (see `total_rows()`).
    total_rows: usize,
    // Maximum number of row ids gathered per batch.
    chunk_size: usize,
    // Null-handling policy forwarded to each gather call.
    policy: GatherNullPolicy,
    // Logical fields (columns) gathered into each batch.
    logical_fields: Arc<[LogicalFieldId]>,
}
90
/// One gathered chunk of a `ColumnStream`: the row ids requested, their
/// offset within the stream, and the resulting Arrow batch.
pub struct ColumnStreamBatch {
    // Stream-wide offset of the first row id in this chunk.
    start: usize,
    // Row ids requested for this chunk, in request order.
    row_ids: Vec<RowId>,
    // Gathered data; may have fewer rows than `row_ids` under null-dropping
    // policies — NOTE(review): confirm against gather semantics.
    batch: RecordBatch,
}
97
98impl<'table, 'a, P> ColumnStream<'table, 'a, P>
99where
100 P: Pager<Blob = EntryHandle> + Send + Sync,
101{
102 pub(crate) fn new(
103 store: &'table ColumnStore<P>,
104 ctx: MultiGatherContext,
105 row_ids: RowIdIter<'a>,
106 total_rows: usize,
107 chunk_size: usize,
108 policy: GatherNullPolicy,
109 logical_fields: Arc<[LogicalFieldId]>,
110 ) -> Self {
111 Self {
112 store,
113 ctx,
114 row_ids,
115 position: 0,
116 total_rows,
117 chunk_size,
118 policy,
119 logical_fields,
120 }
121 }
122
123 #[inline]
125 pub fn total_rows(&self) -> usize {
126 self.total_rows
127 }
128
129 #[inline]
131 pub fn remaining_rows(&self) -> usize {
132 self.total_rows.saturating_sub(self.position)
133 }
134
135 #[inline]
137 pub fn logical_fields(&self) -> &[LogicalFieldId] {
138 &self.logical_fields
139 }
140
141 pub fn next_batch(&mut self) -> LlkvResult<Option<ColumnStreamBatch>> {
143 loop {
144 let mut window = Vec::with_capacity(self.chunk_size);
145 for _ in 0..self.chunk_size {
146 if let Some(rid) = self.row_ids.next() {
147 window.push(rid);
148 } else {
149 break;
150 }
151 }
152
153 if window.is_empty() {
154 return Ok(None);
155 }
156
157 let start = self.position;
158 self.position += window.len();
159
160 let batch = self.store.gather_rows_with_reusable_context(
161 &mut self.ctx,
162 &window,
163 self.policy,
164 )?;
165
166 if batch.num_rows() == 0 && matches!(self.policy, GatherNullPolicy::DropNulls) {
167 continue;
169 }
170
171 return Ok(Some(ColumnStreamBatch {
172 start,
173 row_ids: window,
174 batch,
175 }));
176 }
177 }
178}
179
180impl ColumnStreamBatch {
181 #[inline]
182 pub fn row_ids(&self) -> &[RowId] {
183 &self.row_ids
184 }
185
186 #[inline]
187 pub fn row_offset(&self) -> usize {
188 self.start
189 }
190
191 #[inline]
192 pub fn len(&self) -> usize {
193 self.row_ids.len()
194 }
195
196 #[inline]
197 pub fn is_empty(&self) -> bool {
198 self.row_ids.is_empty()
199 }
200
201 #[inline]
202 pub fn batch(&self) -> &RecordBatch {
203 &self.batch
204 }
205
206 #[inline]
207 pub fn into_batch(self) -> RecordBatch {
208 self.batch
209 }
210}