llkv_table/stream/
column.rs1use std::sync::Arc;
2
3use arrow::array::RecordBatch;
4
5use crate::types::RowId;
6use llkv_column_map::store::{GatherNullPolicy, MultiGatherContext};
7use llkv_column_map::{ColumnStore, types::LogicalFieldId};
8use llkv_result::Result as LlkvResult;
9use llkv_storage::pager::{MemPager, Pager};
10use simd_r_drive_entry_handle::EntryHandle;
11
12pub struct ColumnStream<'table, P = MemPager>
19where
20 P: Pager<Blob = EntryHandle> + Send + Sync,
21{
22 store: &'table ColumnStore<P>,
23 ctx: MultiGatherContext,
24 row_ids: Vec<RowId>,
25 position: usize,
26 chunk_size: usize,
27 policy: GatherNullPolicy,
28 logical_fields: Arc<[LogicalFieldId]>,
29}
30
31pub struct ColumnStreamBatch<'stream> {
33 start: usize,
34 row_ids: &'stream [RowId],
35 batch: RecordBatch,
36}
37
38impl<'table, P> ColumnStream<'table, P>
39where
40 P: Pager<Blob = EntryHandle> + Send + Sync,
41{
42 pub(crate) fn new(
43 store: &'table ColumnStore<P>,
44 ctx: MultiGatherContext,
45 row_ids: Vec<RowId>,
46 chunk_size: usize,
47 policy: GatherNullPolicy,
48 logical_fields: Arc<[LogicalFieldId]>,
49 ) -> Self {
50 Self {
51 store,
52 ctx,
53 row_ids,
54 position: 0,
55 chunk_size,
56 policy,
57 logical_fields,
58 }
59 }
60
61 #[inline]
63 pub fn total_rows(&self) -> usize {
64 self.row_ids.len()
65 }
66
67 #[inline]
69 pub fn remaining_rows(&self) -> usize {
70 self.row_ids.len().saturating_sub(self.position)
71 }
72
73 #[inline]
75 pub fn logical_fields(&self) -> &[LogicalFieldId] {
76 &self.logical_fields
77 }
78
79 pub fn next_batch(&mut self) -> LlkvResult<Option<ColumnStreamBatch<'_>>> {
81 while self.position < self.row_ids.len() {
82 let start = self.position;
83 let end = (start + self.chunk_size).min(self.row_ids.len());
84 let window = &self.row_ids[start..end];
85
86 let batch =
87 self.store
88 .gather_rows_with_reusable_context(&mut self.ctx, window, self.policy)?;
89
90 self.position = end;
91
92 if batch.num_rows() == 0 && matches!(self.policy, GatherNullPolicy::DropNulls) {
93 continue;
95 }
96
97 return Ok(Some(ColumnStreamBatch {
98 start,
99 row_ids: window,
100 batch,
101 }));
102 }
103
104 Ok(None)
105 }
106}
107
108impl<'stream> ColumnStreamBatch<'stream> {
109 #[inline]
110 pub fn row_ids(&self) -> &'stream [RowId] {
111 self.row_ids
112 }
113
114 #[inline]
115 pub fn row_offset(&self) -> usize {
116 self.start
117 }
118
119 #[inline]
120 pub fn len(&self) -> usize {
121 self.row_ids.len()
122 }
123
124 #[inline]
125 pub fn is_empty(&self) -> bool {
126 self.row_ids.is_empty()
127 }
128
129 #[inline]
130 pub fn batch(&self) -> &RecordBatch {
131 &self.batch
132 }
133
134 #[inline]
135 pub fn into_batch(self) -> RecordBatch {
136 self.batch
137 }
138}