1use core::ptr;
5
6use reifydb_abi::{
7 callbacks::builder::{ColumnBufferHandle, EmitDiffKind},
8 context::context::ContextFFI,
9 data::column::ColumnTypeCode,
10};
11use reifydb_type::value::row_number::RowNumber;
12
13use crate::{error::FFIError, operator::context::OperatorContext};
14
/// RAII wrapper around a host-owned column buffer that is filled through the
/// FFI callback table reachable from [`ContextFFI`].
///
/// If the builder is dropped without `commit` having been called, the host
/// buffer is released via the `release` callback (see the `Drop` impl).
pub struct ColumnBuilder<'a> {
	// Raw host context; every callback invocation dereferences this pointer.
	ctx: *mut ContextFFI,
	// Opaque host-side buffer handle this builder writes into.
	handle: *mut ColumnBufferHandle,
	// Element type of the column; the typed `write_*` helpers assert against it.
	type_code: ColumnTypeCode,
	// Set by `commit` so the Drop impl does not release the handle afterwards.
	committed: bool,
	// Ties the builder to the borrow it was created from, keeping `ctx` alive.
	_phantom: core::marker::PhantomData<&'a ()>,
}
22
/// A column whose contents have been committed to the host.
///
/// `Clone`/`Copy` duplicate only the raw handle, so all copies refer to the
/// same host-side buffer. The handle is no longer released by `ColumnBuilder`
/// once committed; presumably the host reclaims it when the column is used in
/// an `emit_*` call — verify against the host implementation.
#[derive(Clone, Copy)]
pub struct CommittedColumn {
	// Host-side buffer handle (shared by all copies of this value).
	handle: *mut ColumnBufferHandle,
	// Number of rows written into the column at commit time.
	row_count: usize,
}
28
29impl<'a> ColumnBuilder<'a> {
30 pub fn data_ptr(&self) -> *mut u8 {
31 unsafe {
32 let cb = (*self.ctx).callbacks.builder;
33 (cb.data_ptr)(self.handle)
34 }
35 }
36
37 pub fn offsets_ptr(&self) -> *mut u64 {
38 unsafe {
39 let cb = (*self.ctx).callbacks.builder;
40 (cb.offsets_ptr)(self.handle)
41 }
42 }
43
44 pub fn bitvec_ptr(&self) -> *mut u8 {
45 unsafe {
46 let cb = (*self.ctx).callbacks.builder;
47 (cb.bitvec_ptr)(self.handle)
48 }
49 }
50
51 pub fn grow(&self, additional: usize) -> Result<(), FFIError> {
52 let code = unsafe {
53 let cb = (*self.ctx).callbacks.builder;
54 (cb.grow)(self.handle, additional)
55 };
56 if code != 0 {
57 return Err(FFIError::Other(format!("ColumnBuilder::grow failed: {}", code)));
58 }
59 Ok(())
60 }
61
62 pub fn commit(mut self, written_count: usize) -> Result<CommittedColumn, FFIError> {
63 let code = unsafe {
64 let cb = (*self.ctx).callbacks.builder;
65 (cb.commit)(self.handle, written_count)
66 };
67 self.committed = true;
68 if code != 0 {
69 return Err(FFIError::Other(format!("ColumnBuilder::commit failed: {}", code)));
70 }
71 Ok(CommittedColumn {
72 handle: self.handle,
73 row_count: written_count,
74 })
75 }
76
77 pub fn type_code(&self) -> ColumnTypeCode {
78 self.type_code
79 }
80
81 pub fn write_bool(self, values: &[bool]) -> Result<CommittedColumn, FFIError> {
82 debug_assert_eq!(self.type_code, ColumnTypeCode::Bool, "write_bool requires a Bool ColumnBuilder");
83 let byte_count = values.len().div_ceil(8);
84 let mut packed = vec![0u8; byte_count];
85 for (i, &b) in values.iter().enumerate() {
86 if b {
87 packed[i / 8] |= 1 << (i % 8);
88 }
89 }
90 if byte_count > 0 {
91 unsafe {
92 core::ptr::copy_nonoverlapping(packed.as_ptr(), self.data_ptr(), byte_count);
93 }
94 }
95 self.commit(values.len())
96 }
97
98 pub fn write_f32(self, values: &[f32]) -> Result<CommittedColumn, FFIError> {
99 debug_assert_eq!(self.type_code, ColumnTypeCode::Float4);
100 unsafe { write_scalar(self, values) }
101 }
102
103 pub fn write_f64(self, values: &[f64]) -> Result<CommittedColumn, FFIError> {
104 debug_assert_eq!(self.type_code, ColumnTypeCode::Float8);
105 unsafe { write_scalar(self, values) }
106 }
107
108 pub fn write_i8(self, values: &[i8]) -> Result<CommittedColumn, FFIError> {
109 debug_assert_eq!(self.type_code, ColumnTypeCode::Int1);
110 unsafe { write_scalar(self, values) }
111 }
112
113 pub fn write_i16(self, values: &[i16]) -> Result<CommittedColumn, FFIError> {
114 debug_assert_eq!(self.type_code, ColumnTypeCode::Int2);
115 unsafe { write_scalar(self, values) }
116 }
117
118 pub fn write_i32(self, values: &[i32]) -> Result<CommittedColumn, FFIError> {
119 debug_assert_eq!(self.type_code, ColumnTypeCode::Int4);
120 unsafe { write_scalar(self, values) }
121 }
122
123 pub fn write_i64(self, values: &[i64]) -> Result<CommittedColumn, FFIError> {
124 debug_assert_eq!(self.type_code, ColumnTypeCode::Int8);
125 unsafe { write_scalar(self, values) }
126 }
127
128 pub fn write_i128(self, values: &[i128]) -> Result<CommittedColumn, FFIError> {
129 debug_assert_eq!(self.type_code, ColumnTypeCode::Int16);
130 unsafe { write_scalar(self, values) }
131 }
132
133 pub fn write_u8(self, values: &[u8]) -> Result<CommittedColumn, FFIError> {
134 debug_assert_eq!(self.type_code, ColumnTypeCode::Uint1);
135 unsafe { write_scalar(self, values) }
136 }
137
138 pub fn write_u16(self, values: &[u16]) -> Result<CommittedColumn, FFIError> {
139 debug_assert_eq!(self.type_code, ColumnTypeCode::Uint2);
140 unsafe { write_scalar(self, values) }
141 }
142
143 pub fn write_u32(self, values: &[u32]) -> Result<CommittedColumn, FFIError> {
144 debug_assert_eq!(self.type_code, ColumnTypeCode::Uint4);
145 unsafe { write_scalar(self, values) }
146 }
147
148 pub fn write_u64(self, values: &[u64]) -> Result<CommittedColumn, FFIError> {
149 debug_assert_eq!(self.type_code, ColumnTypeCode::Uint8);
150 unsafe { write_scalar(self, values) }
151 }
152
153 pub fn write_u128(self, values: &[u128]) -> Result<CommittedColumn, FFIError> {
154 debug_assert_eq!(self.type_code, ColumnTypeCode::Uint16);
155 unsafe { write_scalar(self, values) }
156 }
157
158 pub fn write_utf8<S: AsRef<str>>(self, values: &[S]) -> Result<CommittedColumn, FFIError> {
159 debug_assert_eq!(self.type_code, ColumnTypeCode::Utf8, "write_utf8 requires a Utf8 ColumnBuilder");
160 write_var_len(self, values.iter().map(|s| s.as_ref().as_bytes()))
161 }
162
163 pub fn write_blob<B: AsRef<[u8]>>(self, values: &[B]) -> Result<CommittedColumn, FFIError> {
164 debug_assert_eq!(self.type_code, ColumnTypeCode::Blob, "write_blob requires a Blob ColumnBuilder");
165 write_var_len(self, values.iter().map(|b| b.as_ref()))
166 }
167
168 pub fn set_defined(&self, defined: &[bool]) {
169 let bytes = defined.len().div_ceil(8);
170 if bytes == 0 {
171 return;
172 }
173 let mut packed = vec![0u8; bytes];
174 for (i, &b) in defined.iter().enumerate() {
175 if b {
176 packed[i / 8] |= 1 << (i % 8);
177 }
178 }
179 unsafe {
180 core::ptr::copy_nonoverlapping(packed.as_ptr(), self.bitvec_ptr(), bytes);
181 }
182 }
183}
184
185unsafe fn write_scalar<T: Copy>(col: ColumnBuilder<'_>, values: &[T]) -> Result<CommittedColumn, FFIError> {
186 let bytes = core::mem::size_of_val(values);
187 if bytes > 0 {
188 unsafe {
189 core::ptr::copy_nonoverlapping(values.as_ptr() as *const u8, col.data_ptr(), bytes);
190 }
191 }
192 col.commit(values.len())
193}
194
195fn write_var_len<'b, I>(col: ColumnBuilder<'_>, items: I) -> Result<CommittedColumn, FFIError>
196where
197 I: IntoIterator<Item = &'b [u8]>,
198{
199 let items: Vec<&[u8]> = items.into_iter().collect();
200 let total: usize = items.iter().map(|b| b.len()).sum();
201 let needed = total.max(items.len());
202 if needed > 0 {
203 col.grow(needed)?;
204 }
205 let mut cursor = 0usize;
206 unsafe {
207 let data = col.data_ptr();
208 let offsets = col.offsets_ptr();
209 core::ptr::write(offsets, 0u64);
210 for (i, bytes) in items.iter().enumerate() {
211 if !bytes.is_empty() {
212 core::ptr::copy_nonoverlapping(bytes.as_ptr(), data.add(cursor), bytes.len());
213 }
214 cursor += bytes.len();
215 core::ptr::write(offsets.add(i + 1), cursor as u64);
216 }
217 }
218 col.commit(items.len())
219}
220
221impl<'a> Drop for ColumnBuilder<'a> {
222 fn drop(&mut self) {
223 if !self.committed {
224 unsafe {
225 let cb = (*self.ctx).callbacks.builder;
226 (cb.release)(self.handle);
227 }
228 }
229 }
230}
231
/// Factory for [`ColumnBuilder`]s and entry point for emitting diffs back to
/// the host through the FFI callback table.
pub struct ColumnsBuilder<'a> {
	// Raw host context used for every callback invocation.
	ctx: *mut ContextFFI,
	// Ties this builder to the mutable borrow it was created from.
	_phantom: core::marker::PhantomData<&'a mut ()>,
}
236
237impl<'a> ColumnsBuilder<'a> {
238 pub fn new(ctx: &'a mut OperatorContext) -> Self {
239 Self {
240 ctx: ctx.ctx,
241 _phantom: core::marker::PhantomData,
242 }
243 }
244
245 pub fn from_raw_ctx(ctx: *mut ContextFFI) -> Self {
249 Self {
250 ctx,
251 _phantom: core::marker::PhantomData,
252 }
253 }
254
255 pub fn acquire(&mut self, type_code: ColumnTypeCode, capacity: usize) -> Result<ColumnBuilder<'_>, FFIError> {
258 let handle = unsafe {
259 let cb = (*self.ctx).callbacks.builder;
260 (cb.acquire)(self.ctx, type_code, capacity)
261 };
262 if handle.is_null() {
263 return Err(FFIError::Other(format!(
264 "ColumnsBuilder::acquire failed for type {:?}",
265 type_code
266 )));
267 }
268 Ok(ColumnBuilder {
269 ctx: self.ctx,
270 handle,
271 type_code,
272 committed: false,
273 _phantom: core::marker::PhantomData,
274 })
275 }
276
277 pub fn emit_insert(
278 &mut self,
279 post: &[CommittedColumn],
280 names: &[&str],
281 row_numbers: &[RowNumber],
282 ) -> Result<(), FFIError> {
283 assert_eq!(post.len(), names.len(), "emit_insert: post columns and names must have matching length");
284 let row_count = post.first().map(|c| c.row_count).unwrap_or(0);
285 assert_eq!(row_numbers.len(), row_count, "emit_insert: row_numbers length must equal post row count");
286 self.emit_internal(EmitDiffKind::Insert, &[], &[], 0, &[], post, names, row_count, row_numbers)
287 }
288
289 #[allow(clippy::too_many_arguments)]
290 pub fn emit_update(
291 &mut self,
292 pre: &[CommittedColumn],
293 pre_names: &[&str],
294 pre_row_count: usize,
295 pre_row_numbers: &[RowNumber],
296 post: &[CommittedColumn],
297 post_names: &[&str],
298 post_row_count: usize,
299 post_row_numbers: &[RowNumber],
300 ) -> Result<(), FFIError> {
301 assert_eq!(pre.len(), pre_names.len(), "emit_update: pre columns/names mismatch");
302 assert_eq!(post.len(), post_names.len(), "emit_update: post columns/names mismatch");
303 assert_eq!(pre_row_numbers.len(), pre_row_count, "emit_update: pre_row_numbers length mismatch");
304 assert_eq!(post_row_numbers.len(), post_row_count, "emit_update: post_row_numbers length mismatch");
305 self.emit_internal(
306 EmitDiffKind::Update,
307 pre,
308 pre_names,
309 pre_row_count,
310 pre_row_numbers,
311 post,
312 post_names,
313 post_row_count,
314 post_row_numbers,
315 )
316 }
317
318 pub fn emit_remove(
319 &mut self,
320 pre: &[CommittedColumn],
321 names: &[&str],
322 row_numbers: &[RowNumber],
323 ) -> Result<(), FFIError> {
324 assert_eq!(pre.len(), names.len(), "emit_remove: pre columns and names must have matching length");
325 let row_count = pre.first().map(|c| c.row_count).unwrap_or(0);
326 assert_eq!(row_numbers.len(), row_count, "emit_remove: row_numbers length must equal pre row count");
327 self.emit_internal(EmitDiffKind::Remove, pre, names, row_count, row_numbers, &[], &[], 0, &[])
328 }
329
330 #[allow(clippy::too_many_arguments)]
331 fn emit_internal(
332 &mut self,
333 kind: EmitDiffKind,
334 pre: &[CommittedColumn],
335 pre_names: &[&str],
336 pre_row_count: usize,
337 pre_row_numbers: &[RowNumber],
338 post: &[CommittedColumn],
339 post_names: &[&str],
340 post_row_count: usize,
341 post_row_numbers: &[RowNumber],
342 ) -> Result<(), FFIError> {
343 let pre_handles: Vec<*mut ColumnBufferHandle> = pre.iter().map(|c| c.handle).collect();
344 let pre_name_ptrs: Vec<*const u8> = pre_names.iter().map(|n| n.as_ptr()).collect();
345 let pre_name_lens: Vec<usize> = pre_names.iter().map(|n| n.len()).collect();
346 let pre_row_nums: Vec<u64> = pre_row_numbers.iter().map(|r| r.0).collect();
347 let post_handles: Vec<*mut ColumnBufferHandle> = post.iter().map(|c| c.handle).collect();
348 let post_name_ptrs: Vec<*const u8> = post_names.iter().map(|n| n.as_ptr()).collect();
349 let post_name_lens: Vec<usize> = post_names.iter().map(|n| n.len()).collect();
350 let post_row_nums: Vec<u64> = post_row_numbers.iter().map(|r| r.0).collect();
351
352 let code = unsafe {
353 let cb = (*self.ctx).callbacks.builder;
354 (cb.emit_diff)(
355 self.ctx,
356 kind,
357 if pre_handles.is_empty() {
358 ptr::null()
359 } else {
360 pre_handles.as_ptr()
361 },
362 if pre_name_ptrs.is_empty() {
363 ptr::null()
364 } else {
365 pre_name_ptrs.as_ptr()
366 },
367 if pre_name_lens.is_empty() {
368 ptr::null()
369 } else {
370 pre_name_lens.as_ptr()
371 },
372 pre_handles.len(),
373 pre_row_count,
374 if pre_row_nums.is_empty() {
375 ptr::null()
376 } else {
377 pre_row_nums.as_ptr()
378 },
379 pre_row_nums.len(),
380 if post_handles.is_empty() {
381 ptr::null()
382 } else {
383 post_handles.as_ptr()
384 },
385 if post_name_ptrs.is_empty() {
386 ptr::null()
387 } else {
388 post_name_ptrs.as_ptr()
389 },
390 if post_name_lens.is_empty() {
391 ptr::null()
392 } else {
393 post_name_lens.as_ptr()
394 },
395 post_handles.len(),
396 post_row_count,
397 if post_row_nums.is_empty() {
398 ptr::null()
399 } else {
400 post_row_nums.as_ptr()
401 },
402 post_row_nums.len(),
403 )
404 };
405 if code != 0 {
406 return Err(FFIError::Other(format!("emit_diff failed: {}", code)));
407 }
408 Ok(())
409 }
410}