Skip to main content

reifydb_sdk/operator/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use core::ptr;
5
6use reifydb_abi::{
7	callbacks::builder::{ColumnBufferHandle, EmitDiffKind},
8	context::context::ContextFFI,
9	data::column::ColumnTypeCode,
10};
11use reifydb_type::value::row_number::RowNumber;
12
13use crate::{error::FFIError, operator::context::OperatorContext};
14
15pub struct ColumnBuilder<'a> {
16	ctx: *mut ContextFFI,
17	handle: *mut ColumnBufferHandle,
18	type_code: ColumnTypeCode,
19	committed: bool,
20	_phantom: core::marker::PhantomData<&'a ()>,
21}
22
23#[derive(Clone, Copy)]
24pub struct CommittedColumn {
25	handle: *mut ColumnBufferHandle,
26	row_count: usize,
27}
28
29impl<'a> ColumnBuilder<'a> {
30	pub fn data_ptr(&self) -> *mut u8 {
31		unsafe {
32			let cb = (*self.ctx).callbacks.builder;
33			(cb.data_ptr)(self.handle)
34		}
35	}
36
37	pub fn offsets_ptr(&self) -> *mut u64 {
38		unsafe {
39			let cb = (*self.ctx).callbacks.builder;
40			(cb.offsets_ptr)(self.handle)
41		}
42	}
43
44	pub fn bitvec_ptr(&self) -> *mut u8 {
45		unsafe {
46			let cb = (*self.ctx).callbacks.builder;
47			(cb.bitvec_ptr)(self.handle)
48		}
49	}
50
51	pub fn grow(&self, additional: usize) -> Result<(), FFIError> {
52		let code = unsafe {
53			let cb = (*self.ctx).callbacks.builder;
54			(cb.grow)(self.handle, additional)
55		};
56		if code != 0 {
57			return Err(FFIError::Other(format!("ColumnBuilder::grow failed: {}", code)));
58		}
59		Ok(())
60	}
61
62	pub fn commit(mut self, written_count: usize) -> Result<CommittedColumn, FFIError> {
63		let code = unsafe {
64			let cb = (*self.ctx).callbacks.builder;
65			(cb.commit)(self.handle, written_count)
66		};
67		self.committed = true;
68		if code != 0 {
69			return Err(FFIError::Other(format!("ColumnBuilder::commit failed: {}", code)));
70		}
71		Ok(CommittedColumn {
72			handle: self.handle,
73			row_count: written_count,
74		})
75	}
76
77	pub fn type_code(&self) -> ColumnTypeCode {
78		self.type_code
79	}
80
81	pub fn write_bool(self, values: &[bool]) -> Result<CommittedColumn, FFIError> {
82		debug_assert_eq!(self.type_code, ColumnTypeCode::Bool, "write_bool requires a Bool ColumnBuilder");
83		let byte_count = values.len().div_ceil(8);
84		let mut packed = vec![0u8; byte_count];
85		for (i, &b) in values.iter().enumerate() {
86			if b {
87				packed[i / 8] |= 1 << (i % 8);
88			}
89		}
90		if byte_count > 0 {
91			unsafe {
92				core::ptr::copy_nonoverlapping(packed.as_ptr(), self.data_ptr(), byte_count);
93			}
94		}
95		self.commit(values.len())
96	}
97
98	pub fn write_f32(self, values: &[f32]) -> Result<CommittedColumn, FFIError> {
99		debug_assert_eq!(self.type_code, ColumnTypeCode::Float4);
100		unsafe { write_scalar(self, values) }
101	}
102
103	pub fn write_f64(self, values: &[f64]) -> Result<CommittedColumn, FFIError> {
104		debug_assert_eq!(self.type_code, ColumnTypeCode::Float8);
105		unsafe { write_scalar(self, values) }
106	}
107
108	pub fn write_i8(self, values: &[i8]) -> Result<CommittedColumn, FFIError> {
109		debug_assert_eq!(self.type_code, ColumnTypeCode::Int1);
110		unsafe { write_scalar(self, values) }
111	}
112
113	pub fn write_i16(self, values: &[i16]) -> Result<CommittedColumn, FFIError> {
114		debug_assert_eq!(self.type_code, ColumnTypeCode::Int2);
115		unsafe { write_scalar(self, values) }
116	}
117
118	pub fn write_i32(self, values: &[i32]) -> Result<CommittedColumn, FFIError> {
119		debug_assert_eq!(self.type_code, ColumnTypeCode::Int4);
120		unsafe { write_scalar(self, values) }
121	}
122
123	pub fn write_i64(self, values: &[i64]) -> Result<CommittedColumn, FFIError> {
124		debug_assert_eq!(self.type_code, ColumnTypeCode::Int8);
125		unsafe { write_scalar(self, values) }
126	}
127
128	pub fn write_i128(self, values: &[i128]) -> Result<CommittedColumn, FFIError> {
129		debug_assert_eq!(self.type_code, ColumnTypeCode::Int16);
130		unsafe { write_scalar(self, values) }
131	}
132
133	pub fn write_u8(self, values: &[u8]) -> Result<CommittedColumn, FFIError> {
134		debug_assert_eq!(self.type_code, ColumnTypeCode::Uint1);
135		unsafe { write_scalar(self, values) }
136	}
137
138	pub fn write_u16(self, values: &[u16]) -> Result<CommittedColumn, FFIError> {
139		debug_assert_eq!(self.type_code, ColumnTypeCode::Uint2);
140		unsafe { write_scalar(self, values) }
141	}
142
143	pub fn write_u32(self, values: &[u32]) -> Result<CommittedColumn, FFIError> {
144		debug_assert_eq!(self.type_code, ColumnTypeCode::Uint4);
145		unsafe { write_scalar(self, values) }
146	}
147
148	pub fn write_u64(self, values: &[u64]) -> Result<CommittedColumn, FFIError> {
149		debug_assert_eq!(self.type_code, ColumnTypeCode::Uint8);
150		unsafe { write_scalar(self, values) }
151	}
152
153	pub fn write_u128(self, values: &[u128]) -> Result<CommittedColumn, FFIError> {
154		debug_assert_eq!(self.type_code, ColumnTypeCode::Uint16);
155		unsafe { write_scalar(self, values) }
156	}
157
158	pub fn write_utf8<S: AsRef<str>>(self, values: &[S]) -> Result<CommittedColumn, FFIError> {
159		debug_assert_eq!(self.type_code, ColumnTypeCode::Utf8, "write_utf8 requires a Utf8 ColumnBuilder");
160		write_var_len(self, values.iter().map(|s| s.as_ref().as_bytes()))
161	}
162
163	pub fn write_blob<B: AsRef<[u8]>>(self, values: &[B]) -> Result<CommittedColumn, FFIError> {
164		debug_assert_eq!(self.type_code, ColumnTypeCode::Blob, "write_blob requires a Blob ColumnBuilder");
165		write_var_len(self, values.iter().map(|b| b.as_ref()))
166	}
167
168	pub fn set_defined(&self, defined: &[bool]) {
169		let bytes = defined.len().div_ceil(8);
170		if bytes == 0 {
171			return;
172		}
173		let mut packed = vec![0u8; bytes];
174		for (i, &b) in defined.iter().enumerate() {
175			if b {
176				packed[i / 8] |= 1 << (i % 8);
177			}
178		}
179		unsafe {
180			core::ptr::copy_nonoverlapping(packed.as_ptr(), self.bitvec_ptr(), bytes);
181		}
182	}
183}
184
185unsafe fn write_scalar<T: Copy>(col: ColumnBuilder<'_>, values: &[T]) -> Result<CommittedColumn, FFIError> {
186	let bytes = core::mem::size_of_val(values);
187	if bytes > 0 {
188		unsafe {
189			core::ptr::copy_nonoverlapping(values.as_ptr() as *const u8, col.data_ptr(), bytes);
190		}
191	}
192	col.commit(values.len())
193}
194
195fn write_var_len<'b, I>(col: ColumnBuilder<'_>, items: I) -> Result<CommittedColumn, FFIError>
196where
197	I: IntoIterator<Item = &'b [u8]>,
198{
199	let items: Vec<&[u8]> = items.into_iter().collect();
200	let total: usize = items.iter().map(|b| b.len()).sum();
201	let needed = total.max(items.len());
202	if needed > 0 {
203		col.grow(needed)?;
204	}
205	let mut cursor = 0usize;
206	unsafe {
207		let data = col.data_ptr();
208		let offsets = col.offsets_ptr();
209		core::ptr::write(offsets, 0u64);
210		for (i, bytes) in items.iter().enumerate() {
211			if !bytes.is_empty() {
212				core::ptr::copy_nonoverlapping(bytes.as_ptr(), data.add(cursor), bytes.len());
213			}
214			cursor += bytes.len();
215			core::ptr::write(offsets.add(i + 1), cursor as u64);
216		}
217	}
218	col.commit(items.len())
219}
220
221impl<'a> Drop for ColumnBuilder<'a> {
222	fn drop(&mut self) {
223		if !self.committed {
224			unsafe {
225				let cb = (*self.ctx).callbacks.builder;
226				(cb.release)(self.handle);
227			}
228		}
229	}
230}
231
232pub struct ColumnsBuilder<'a> {
233	ctx: *mut ContextFFI,
234	_phantom: core::marker::PhantomData<&'a mut ()>,
235}
236
237impl<'a> ColumnsBuilder<'a> {
238	pub fn new(ctx: &'a mut OperatorContext) -> Self {
239		Self {
240			ctx: ctx.ctx,
241			_phantom: core::marker::PhantomData,
242		}
243	}
244
245	/// Create a builder from a raw `*mut ContextFFI`. Used by transform /
246	/// procedure contexts that aren't `OperatorContext`. The caller is
247	/// responsible for ensuring the pointer outlives `'a`.
248	pub fn from_raw_ctx(ctx: *mut ContextFFI) -> Self {
249		Self {
250			ctx,
251			_phantom: core::marker::PhantomData,
252		}
253	}
254
255	/// Acquire a fresh column builder of the given type with at least
256	/// `capacity` elements (bytes for var-len data buffer).
257	pub fn acquire(&mut self, type_code: ColumnTypeCode, capacity: usize) -> Result<ColumnBuilder<'_>, FFIError> {
258		let handle = unsafe {
259			let cb = (*self.ctx).callbacks.builder;
260			(cb.acquire)(self.ctx, type_code, capacity)
261		};
262		if handle.is_null() {
263			return Err(FFIError::Other(format!(
264				"ColumnsBuilder::acquire failed for type {:?}",
265				type_code
266			)));
267		}
268		Ok(ColumnBuilder {
269			ctx: self.ctx,
270			handle,
271			type_code,
272			committed: false,
273			_phantom: core::marker::PhantomData,
274		})
275	}
276
277	pub fn emit_insert(
278		&mut self,
279		post: &[CommittedColumn],
280		names: &[&str],
281		row_numbers: &[RowNumber],
282	) -> Result<(), FFIError> {
283		assert_eq!(post.len(), names.len(), "emit_insert: post columns and names must have matching length");
284		let row_count = post.first().map(|c| c.row_count).unwrap_or(0);
285		assert_eq!(row_numbers.len(), row_count, "emit_insert: row_numbers length must equal post row count");
286		self.emit_internal(EmitDiffKind::Insert, &[], &[], 0, &[], post, names, row_count, row_numbers)
287	}
288
289	#[allow(clippy::too_many_arguments)]
290	pub fn emit_update(
291		&mut self,
292		pre: &[CommittedColumn],
293		pre_names: &[&str],
294		pre_row_count: usize,
295		pre_row_numbers: &[RowNumber],
296		post: &[CommittedColumn],
297		post_names: &[&str],
298		post_row_count: usize,
299		post_row_numbers: &[RowNumber],
300	) -> Result<(), FFIError> {
301		assert_eq!(pre.len(), pre_names.len(), "emit_update: pre columns/names mismatch");
302		assert_eq!(post.len(), post_names.len(), "emit_update: post columns/names mismatch");
303		assert_eq!(pre_row_numbers.len(), pre_row_count, "emit_update: pre_row_numbers length mismatch");
304		assert_eq!(post_row_numbers.len(), post_row_count, "emit_update: post_row_numbers length mismatch");
305		self.emit_internal(
306			EmitDiffKind::Update,
307			pre,
308			pre_names,
309			pre_row_count,
310			pre_row_numbers,
311			post,
312			post_names,
313			post_row_count,
314			post_row_numbers,
315		)
316	}
317
318	pub fn emit_remove(
319		&mut self,
320		pre: &[CommittedColumn],
321		names: &[&str],
322		row_numbers: &[RowNumber],
323	) -> Result<(), FFIError> {
324		assert_eq!(pre.len(), names.len(), "emit_remove: pre columns and names must have matching length");
325		let row_count = pre.first().map(|c| c.row_count).unwrap_or(0);
326		assert_eq!(row_numbers.len(), row_count, "emit_remove: row_numbers length must equal pre row count");
327		self.emit_internal(EmitDiffKind::Remove, pre, names, row_count, row_numbers, &[], &[], 0, &[])
328	}
329
330	#[allow(clippy::too_many_arguments)]
331	fn emit_internal(
332		&mut self,
333		kind: EmitDiffKind,
334		pre: &[CommittedColumn],
335		pre_names: &[&str],
336		pre_row_count: usize,
337		pre_row_numbers: &[RowNumber],
338		post: &[CommittedColumn],
339		post_names: &[&str],
340		post_row_count: usize,
341		post_row_numbers: &[RowNumber],
342	) -> Result<(), FFIError> {
343		let pre_handles: Vec<*mut ColumnBufferHandle> = pre.iter().map(|c| c.handle).collect();
344		let pre_name_ptrs: Vec<*const u8> = pre_names.iter().map(|n| n.as_ptr()).collect();
345		let pre_name_lens: Vec<usize> = pre_names.iter().map(|n| n.len()).collect();
346		let pre_row_nums: Vec<u64> = pre_row_numbers.iter().map(|r| r.0).collect();
347		let post_handles: Vec<*mut ColumnBufferHandle> = post.iter().map(|c| c.handle).collect();
348		let post_name_ptrs: Vec<*const u8> = post_names.iter().map(|n| n.as_ptr()).collect();
349		let post_name_lens: Vec<usize> = post_names.iter().map(|n| n.len()).collect();
350		let post_row_nums: Vec<u64> = post_row_numbers.iter().map(|r| r.0).collect();
351
352		let code = unsafe {
353			let cb = (*self.ctx).callbacks.builder;
354			(cb.emit_diff)(
355				self.ctx,
356				kind,
357				if pre_handles.is_empty() {
358					ptr::null()
359				} else {
360					pre_handles.as_ptr()
361				},
362				if pre_name_ptrs.is_empty() {
363					ptr::null()
364				} else {
365					pre_name_ptrs.as_ptr()
366				},
367				if pre_name_lens.is_empty() {
368					ptr::null()
369				} else {
370					pre_name_lens.as_ptr()
371				},
372				pre_handles.len(),
373				pre_row_count,
374				if pre_row_nums.is_empty() {
375					ptr::null()
376				} else {
377					pre_row_nums.as_ptr()
378				},
379				pre_row_nums.len(),
380				if post_handles.is_empty() {
381					ptr::null()
382				} else {
383					post_handles.as_ptr()
384				},
385				if post_name_ptrs.is_empty() {
386					ptr::null()
387				} else {
388					post_name_ptrs.as_ptr()
389				},
390				if post_name_lens.is_empty() {
391					ptr::null()
392				} else {
393					post_name_lens.as_ptr()
394				},
395				post_handles.len(),
396				post_row_count,
397				if post_row_nums.is_empty() {
398					ptr::null()
399				} else {
400					post_row_nums.as_ptr()
401				},
402				post_row_nums.len(),
403			)
404		};
405		if code != 0 {
406			return Err(FFIError::Other(format!("emit_diff failed: {}", code)));
407		}
408		Ok(())
409	}
410}