1use std::{cell::Cell, collections::HashMap, fmt, mem, ptr, slice, str, sync::Mutex};
5
6use postcard::from_bytes as postcard_decode;
7use reifydb_abi::{
8 callbacks::builder::{ColumnBufferHandle, EmitDiffKind},
9 context::context::ContextFFI,
10 data::column::ColumnTypeCode,
11};
12use reifydb_core::{
13 interface::change::{Diff, Diffs},
14 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
15};
16use reifydb_type::{
17 fragment::Fragment,
18 util::{bitvec::BitVec, cowvec::CowVec},
19 value::{
20 Value,
21 constraint::{bytes::MaxBytes, precision::Precision, scale::Scale},
22 container::{
23 any::AnyContainer, blob::BlobContainer, bool::BoolContainer, dictionary::DictionaryContainer,
24 identity_id::IdentityIdContainer, number::NumberContainer, temporal::TemporalContainer,
25 utf8::Utf8Container, uuid::UuidContainer,
26 },
27 date::Date,
28 datetime::DateTime,
29 decimal::Decimal,
30 dictionary::DictionaryEntryId,
31 duration::Duration,
32 identity::IdentityId,
33 int::Int,
34 is::IsNumber,
35 row_number::RowNumber,
36 time::Time,
37 uint::Uint,
38 uuid::{Uuid4, Uuid7},
39 },
40};
41use serde::de::DeserializeOwned;
42
/// Registry of column buffers and emitted diffs backing the `test_*` builder callbacks.
///
/// All state lives behind a single [`Mutex`]; the callbacks locate the registry through the
/// thread-local that [`with_registry`] installs.
pub struct TestBuilderRegistry {
	inner: Mutex<RegistryInner>,
}
46
/// Mutable registry state guarded by the mutex in [`TestBuilderRegistry`].
struct RegistryInner {
	/// Buffers keyed by the id that is packed into their handle.
	slots: HashMap<u64, Slot>,
	/// Diffs recorded by `test_emit_diff` until `drain_diffs` takes them.
	accumulator: Vec<EmittedDiff>,
	/// Id handed out by the next `test_acquire` call; starts at 1.
	next_id: u64,
}
52
/// State of a buffer slot in the registry.
enum Slot {
	/// Acquired and not yet committed; raw pointers into it may be handed out.
	Active(Active),
	/// Finalised by `test_commit`; consumed when `test_emit_diff` assembles columns.
	Committed(Committed),
}
57
/// A buffer that has been acquired but not yet committed.
pub struct Active {
	/// Column type requested at acquisition; selects how the bytes are finalised.
	pub type_code: ColumnTypeCode,
	/// Raw payload bytes. The length stays 0 until commit; writes go through the pointer
	/// returned by `test_data_ptr`.
	pub data: Vec<u8>,
	/// Offsets array, present only for variable-length type codes; seeded with a single `0`.
	pub offsets: Option<Vec<u64>>,
	/// Validity bitmap bytes, allocated lazily on the first `test_bitvec_ptr` call.
	pub bitvec: Option<Vec<u8>>,
	/// Compared against the generation packed into a handle before the slot is used.
	pub generation: u64,
}
65
/// A buffer that `test_commit` has turned into a typed column.
pub struct Committed {
	/// The finalised column data.
	pub buffer: ColumnBuffer,
	/// The `written_count` that was passed to `test_commit`.
	pub row_count: usize,
}
70
/// One diff recorded by `test_emit_diff`.
pub struct EmittedDiff {
	/// Kind of change reported by the caller.
	pub kind: EmitDiffKind,
	/// Columns assembled from the "pre" handles; `None` when the pre column count was 0.
	pub pre: Option<Columns>,
	/// Columns assembled from the "post" handles; `None` when the post column count was 0.
	pub post: Option<Columns>,
}
76
impl Default for TestBuilderRegistry {
	/// Equivalent to [`TestBuilderRegistry::new`].
	fn default() -> Self {
		Self::new()
	}
}
82
83impl TestBuilderRegistry {
84 pub fn new() -> Self {
85 Self {
86 inner: Mutex::new(RegistryInner {
87 slots: HashMap::new(),
88 accumulator: Vec::new(),
89 next_id: 1,
90 }),
91 }
92 }
93
94 pub fn drain_diffs(&self) -> Vec<EmittedDiff> {
95 let mut inner = self.inner.lock().unwrap();
96 inner.slots.clear();
97 mem::take(&mut inner.accumulator)
98 }
99}
100
101#[derive(Clone, Copy)]
102struct Handle {
103 id: u64,
104 generation: u64,
105}
106
107impl Handle {
108 fn encode(self) -> *mut ColumnBufferHandle {
109 assert!(self.id != 0 && self.id < (1 << 48));
110 assert!(self.generation < (1 << 16));
111 (self.id | (self.generation << 48)) as *mut ColumnBufferHandle
112 }
113
114 fn decode(ptr: *mut ColumnBufferHandle) -> Self {
115 let packed = ptr as u64;
116 Self {
117 id: packed & ((1 << 48) - 1),
118 generation: packed >> 48,
119 }
120 }
121}
122
thread_local! {
	/// Registry the `test_*` callbacks on this thread operate on; initially `None`.
	///
	/// The `'static` lifetime is not real: `with_registry` stores a lifetime-extended borrow
	/// here and puts the previous value back once its closure has returned.
	static REGISTRY: Cell<Option<&'static TestBuilderRegistry>> = const { Cell::new(None) };
}
126
127pub fn with_registry<R>(registry: &TestBuilderRegistry, f: impl FnOnce() -> R) -> R {
128 let extended: &'static TestBuilderRegistry = unsafe { mem::transmute(registry) };
131 let prev = REGISTRY.with(|cell| cell.replace(Some(extended)));
132 let r = f();
133 REGISTRY.with(|cell| cell.set(prev));
134 r
135}
136
137fn current() -> Option<&'static TestBuilderRegistry> {
138 REGISTRY.with(|cell| cell.get())
139}
140
141fn elem_size_for(type_code: ColumnTypeCode) -> usize {
142 match type_code {
143 ColumnTypeCode::Bool => 1,
144 ColumnTypeCode::Float4 | ColumnTypeCode::Int4 | ColumnTypeCode::Uint4 | ColumnTypeCode::Date => 4,
145 ColumnTypeCode::Int1 | ColumnTypeCode::Uint1 => 1,
146 ColumnTypeCode::Int2 | ColumnTypeCode::Uint2 => 2,
147 ColumnTypeCode::Float8
148 | ColumnTypeCode::Int8
149 | ColumnTypeCode::Uint8
150 | ColumnTypeCode::DateTime
151 | ColumnTypeCode::Time => 8,
152 ColumnTypeCode::Int16 | ColumnTypeCode::Uint16 => 16,
153 ColumnTypeCode::Duration
154 | ColumnTypeCode::IdentityId
155 | ColumnTypeCode::Uuid4
156 | ColumnTypeCode::Uuid7
157 | ColumnTypeCode::DictionaryId => 16,
158 ColumnTypeCode::Utf8 | ColumnTypeCode::Blob => 1,
159 ColumnTypeCode::Int | ColumnTypeCode::Uint | ColumnTypeCode::Decimal | ColumnTypeCode::Any => 1,
160 ColumnTypeCode::Undefined => 1,
161 }
162}
163
164fn is_var_len(type_code: ColumnTypeCode) -> bool {
165 matches!(
166 type_code,
167 ColumnTypeCode::Utf8
168 | ColumnTypeCode::Blob
169 | ColumnTypeCode::Int | ColumnTypeCode::Uint
170 | ColumnTypeCode::Decimal
171 | ColumnTypeCode::Any | ColumnTypeCode::DictionaryId
172 )
173}
174
175pub(crate) unsafe extern "C" fn test_acquire(
176 _ctx: *mut ContextFFI,
177 type_code: ColumnTypeCode,
178 capacity: usize,
179) -> *mut ColumnBufferHandle {
180 let Some(registry) = current() else {
181 return ptr::null_mut();
182 };
183 let mut inner = registry.inner.lock().unwrap();
184 let id = inner.next_id;
185 inner.next_id = inner.next_id.checked_add(1).unwrap_or(1);
186
187 let elem = elem_size_for(type_code);
188 let active = Active {
189 type_code,
190 data: Vec::with_capacity(capacity.saturating_mul(elem)),
191 offsets: if is_var_len(type_code) {
192 let mut o = Vec::with_capacity(capacity + 1);
193 o.push(0);
194 Some(o)
195 } else {
196 None
197 },
198 bitvec: None,
199 generation: 1,
200 };
201 inner.slots.insert(id, Slot::Active(active));
202 Handle {
203 id,
204 generation: 1,
205 }
206 .encode()
207}
208
209pub(crate) unsafe extern "C" fn test_data_ptr(handle: *mut ColumnBufferHandle) -> *mut u8 {
210 let Some(registry) = current() else {
211 return ptr::null_mut();
212 };
213 let h = Handle::decode(handle);
214 let mut inner = registry.inner.lock().unwrap();
215 match inner.slots.get_mut(&h.id) {
216 Some(Slot::Active(a)) if a.generation == h.generation => a.data.as_mut_ptr(),
217 _ => ptr::null_mut(),
218 }
219}
220
221pub(crate) unsafe extern "C" fn test_offsets_ptr(handle: *mut ColumnBufferHandle) -> *mut u64 {
222 let Some(registry) = current() else {
223 return ptr::null_mut();
224 };
225 let h = Handle::decode(handle);
226 let mut inner = registry.inner.lock().unwrap();
227 match inner.slots.get_mut(&h.id) {
228 Some(Slot::Active(a)) if a.generation == h.generation => match &mut a.offsets {
229 Some(o) => o.as_mut_ptr(),
230 None => ptr::null_mut(),
231 },
232 _ => ptr::null_mut(),
233 }
234}
235
236pub(crate) unsafe extern "C" fn test_bitvec_ptr(handle: *mut ColumnBufferHandle) -> *mut u8 {
237 let Some(registry) = current() else {
238 return ptr::null_mut();
239 };
240 let h = Handle::decode(handle);
241 let mut inner = registry.inner.lock().unwrap();
242 match inner.slots.get_mut(&h.id) {
243 Some(Slot::Active(a)) if a.generation == h.generation => {
244 if a.bitvec.is_none() {
245 let cap = a.data.capacity() / elem_size_for(a.type_code).max(1);
246 a.bitvec = Some(vec![0u8; cap.div_ceil(8)]);
247 }
248 a.bitvec.as_mut().unwrap().as_mut_ptr()
249 }
250 _ => ptr::null_mut(),
251 }
252}
253
254pub(crate) unsafe extern "C" fn test_grow(handle: *mut ColumnBufferHandle, additional: usize) -> i32 {
255 let Some(registry) = current() else {
256 return -1;
257 };
258 let h = Handle::decode(handle);
259 let mut inner = registry.inner.lock().unwrap();
260 match inner.slots.get_mut(&h.id) {
261 Some(Slot::Active(a)) if a.generation == h.generation => {
262 let elem = elem_size_for(a.type_code);
263 a.data.reserve(additional.saturating_mul(elem));
264 if let Some(o) = a.offsets.as_mut() {
265 o.reserve(additional);
266 }
267 0
268 }
269 _ => -1,
270 }
271}
272
273pub(crate) unsafe extern "C" fn test_commit(handle: *mut ColumnBufferHandle, written_count: usize) -> i32 {
274 let Some(registry) = current() else {
275 return -1;
276 };
277 let h = Handle::decode(handle);
278 let mut inner = registry.inner.lock().unwrap();
279 let slot = match inner.slots.remove(&h.id) {
280 Some(s) => s,
281 None => return -1,
282 };
283 let mut active = match slot {
284 Slot::Active(a) if a.generation == h.generation => a,
285 other => {
286 inner.slots.insert(h.id, other);
287 return -1;
288 }
289 };
290
291 let elem = elem_size_for(active.type_code);
294 if let Some(offsets) = active.offsets.as_mut() {
297 let offsets_len = written_count + 1;
298 if offsets_len > offsets.capacity() {
299 return -1;
300 }
301 unsafe {
302 offsets.set_len(offsets_len);
303 }
304 }
305 let data_byte_len = if is_var_len(active.type_code) {
306 match active.offsets.as_ref() {
309 Some(o) if !o.is_empty() => *o.last().unwrap() as usize,
310 _ => 0,
311 }
312 } else {
313 written_count.saturating_mul(elem)
314 };
315 if data_byte_len > active.data.capacity() {
316 return -1;
318 }
319 unsafe {
320 active.data.set_len(data_byte_len);
321 }
322 if let Some(bitvec) = active.bitvec.as_mut() {
323 let needed = written_count.div_ceil(8);
324 if needed > bitvec.capacity() {
325 return -1;
326 }
327 unsafe {
328 bitvec.set_len(needed);
329 }
330 }
331
332 let buffer = match finalize_buffer(active.type_code, active.data, active.offsets, active.bitvec, written_count)
333 {
334 Some(b) => b,
335 None => return -1,
336 };
337 inner.slots.insert(
338 h.id,
339 Slot::Committed(Committed {
340 buffer,
341 row_count: written_count,
342 }),
343 );
344 0
345}
346
347pub(crate) unsafe extern "C" fn test_release(handle: *mut ColumnBufferHandle) {
348 let Some(registry) = current() else {
349 return;
350 };
351 let h = Handle::decode(handle);
352 let mut inner = registry.inner.lock().unwrap();
353 inner.slots.remove(&h.id);
354}
355
356pub(crate) unsafe extern "C" fn test_emit_diff(
357 _ctx: *mut ContextFFI,
358 kind: EmitDiffKind,
359 pre_handles_ptr: *const *mut ColumnBufferHandle,
360 pre_name_ptrs: *const *const u8,
361 pre_name_lens: *const usize,
362 pre_count: usize,
363 pre_row_count: usize,
364 pre_row_numbers_ptr: *const u64,
365 pre_row_numbers_len: usize,
366 post_handles_ptr: *const *mut ColumnBufferHandle,
367 post_name_ptrs: *const *const u8,
368 post_name_lens: *const usize,
369 post_count: usize,
370 post_row_count: usize,
371 post_row_numbers_ptr: *const u64,
372 post_row_numbers_len: usize,
373) -> i32 {
374 let Some(registry) = current() else {
375 return -1;
376 };
377 let mut inner = registry.inner.lock().unwrap();
378 let now = DateTime::default();
379
380 let pre = if pre_count > 0 {
381 let ptrs = ColumnsPtrs {
382 handles: pre_handles_ptr,
383 names: pre_name_ptrs,
384 name_lens: pre_name_lens,
385 count: pre_count,
386 };
387 match assemble(&mut inner, ptrs, pre_row_count, pre_row_numbers_ptr, pre_row_numbers_len, now) {
388 Ok(c) => Some(c),
389 Err(code) => return code,
390 }
391 } else {
392 None
393 };
394 let post = if post_count > 0 {
395 let ptrs = ColumnsPtrs {
396 handles: post_handles_ptr,
397 names: post_name_ptrs,
398 name_lens: post_name_lens,
399 count: post_count,
400 };
401 match assemble(&mut inner, ptrs, post_row_count, post_row_numbers_ptr, post_row_numbers_len, now) {
402 Ok(c) => Some(c),
403 Err(code) => return code,
404 }
405 } else {
406 None
407 };
408
409 inner.accumulator.push(EmittedDiff {
410 kind,
411 pre,
412 post,
413 });
414 0
415}
416
/// Raw description of one side of a diff: three parallel arrays of `count` entries each.
struct ColumnsPtrs {
	/// Handles of the committed buffers, one per column.
	handles: *const *mut ColumnBufferHandle,
	/// Start of each column name's bytes; an entry may be null.
	names: *const *const u8,
	/// Byte length of each column name.
	name_lens: *const usize,
	/// Number of columns, i.e. the length of each of the three arrays.
	count: usize,
}
423
424fn assemble(
425 inner: &mut RegistryInner,
426 ptrs: ColumnsPtrs,
427 row_count: usize,
428 row_numbers_ptr: *const u64,
429 row_numbers_len: usize,
430 now: DateTime,
431) -> Result<Columns, i32> {
432 if ptrs.handles.is_null() || ptrs.names.is_null() || ptrs.name_lens.is_null() {
433 return Err(-1);
434 }
435 if row_numbers_len != row_count {
436 return Err(-1);
437 }
438 if row_count > 0 && row_numbers_ptr.is_null() {
439 return Err(-1);
440 }
441 let count = ptrs.count;
442 let handles = unsafe { slice::from_raw_parts(ptrs.handles, count) };
443 let names = unsafe { slice::from_raw_parts(ptrs.names, count) };
444 let lens = unsafe { slice::from_raw_parts(ptrs.name_lens, count) };
445
446 let mut cols: Vec<ColumnWithName> = Vec::with_capacity(count);
447 for i in 0..count {
448 let h = Handle::decode(handles[i]);
449 let slot = inner.slots.remove(&h.id).ok_or(-1)?;
450 let committed = match slot {
451 Slot::Committed(c) => c,
452 Slot::Active(a) => {
453 inner.slots.insert(h.id, Slot::Active(a));
454 return Err(-1);
455 }
456 };
457 let name = if names[i].is_null() || lens[i] == 0 {
458 ""
459 } else {
460 let s = unsafe { slice::from_raw_parts(names[i], lens[i]) };
461 str::from_utf8(s).unwrap_or("")
462 };
463 cols.push(ColumnWithName::new(Fragment::internal(name), committed.buffer));
464 }
465 let row_numbers: Vec<RowNumber> = if row_count == 0 {
466 Vec::new()
467 } else {
468 let raw = unsafe { slice::from_raw_parts(row_numbers_ptr, row_count) };
469 raw.iter().copied().map(RowNumber).collect()
470 };
471 let timestamps: Vec<DateTime> = vec![now; row_count];
472 Ok(Columns::with_system_columns(cols, row_numbers, timestamps.clone(), timestamps))
473}
474
475pub(crate) fn finalize_buffer(
477 type_code: ColumnTypeCode,
478 mut data: Vec<u8>,
479 offsets: Option<Vec<u64>>,
480 bitvec: Option<Vec<u8>>,
481 written_count: usize,
482) -> Option<ColumnBuffer> {
483 let make_option_wrapped = |inner: ColumnBuffer| match bitvec {
484 Some(bytes) => {
485 let bv = BitVec::from_raw(bytes, written_count);
486 ColumnBuffer::Option {
487 inner: Box::new(inner),
488 bitvec: bv,
489 }
490 }
491 None => inner,
492 };
493
494 let inner = match type_code {
495 ColumnTypeCode::Bool => {
496 let bv = BitVec::from_raw(data, written_count);
497 ColumnBuffer::Bool(BoolContainer::from_parts(bv))
498 }
499 ColumnTypeCode::Float4 => to_numeric::<f32>(&data, written_count, ColumnBuffer::Float4)?,
500 ColumnTypeCode::Float8 => to_numeric::<f64>(&data, written_count, ColumnBuffer::Float8)?,
501 ColumnTypeCode::Int1 => to_numeric::<i8>(&data, written_count, ColumnBuffer::Int1)?,
502 ColumnTypeCode::Int2 => to_numeric::<i16>(&data, written_count, ColumnBuffer::Int2)?,
503 ColumnTypeCode::Int4 => to_numeric::<i32>(&data, written_count, ColumnBuffer::Int4)?,
504 ColumnTypeCode::Int8 => to_numeric::<i64>(&data, written_count, ColumnBuffer::Int8)?,
505 ColumnTypeCode::Int16 => to_numeric::<i128>(&data, written_count, ColumnBuffer::Int16)?,
506 ColumnTypeCode::Uint1 => to_numeric::<u8>(&data, written_count, ColumnBuffer::Uint1)?,
507 ColumnTypeCode::Uint2 => to_numeric::<u16>(&data, written_count, ColumnBuffer::Uint2)?,
508 ColumnTypeCode::Uint4 => to_numeric::<u32>(&data, written_count, ColumnBuffer::Uint4)?,
509 ColumnTypeCode::Uint8 => to_numeric::<u64>(&data, written_count, ColumnBuffer::Uint8)?,
510 ColumnTypeCode::Uint16 => to_numeric::<u128>(&data, written_count, ColumnBuffer::Uint16)?,
511 ColumnTypeCode::Date => {
512 let v = bytes_to_vec::<Date>(&data, written_count)?;
513 ColumnBuffer::Date(TemporalContainer::from_parts(CowVec::new(v)))
514 }
515 ColumnTypeCode::DateTime => {
516 let v = bytes_to_vec::<DateTime>(&data, written_count)?;
517 ColumnBuffer::DateTime(TemporalContainer::from_parts(CowVec::new(v)))
518 }
519 ColumnTypeCode::Time => {
520 let v = bytes_to_vec::<Time>(&data, written_count)?;
521 ColumnBuffer::Time(TemporalContainer::from_parts(CowVec::new(v)))
522 }
523 ColumnTypeCode::Duration => {
524 let v = bytes_to_vec::<Duration>(&data, written_count)?;
525 ColumnBuffer::Duration(TemporalContainer::from_parts(CowVec::new(v)))
526 }
527 ColumnTypeCode::IdentityId => {
528 let v = bytes_to_vec::<IdentityId>(&data, written_count)?;
529 ColumnBuffer::IdentityId(IdentityIdContainer::from_parts(CowVec::new(v)))
530 }
531 ColumnTypeCode::Uuid4 => {
532 let v = bytes_to_vec::<Uuid4>(&data, written_count)?;
533 ColumnBuffer::Uuid4(UuidContainer::from_parts(CowVec::new(v)))
534 }
535 ColumnTypeCode::Uuid7 => {
536 let v = bytes_to_vec::<Uuid7>(&data, written_count)?;
537 ColumnBuffer::Uuid7(UuidContainer::from_parts(CowVec::new(v)))
538 }
539 ColumnTypeCode::Utf8 => {
540 let offsets = offsets.unwrap_or_else(|| vec![0u64]);
541 let payload_len = *offsets.last().unwrap_or(&0) as usize;
542 data.truncate(payload_len);
543 ColumnBuffer::Utf8 {
544 container: Utf8Container::from_bytes_offsets(data, offsets),
545 max_bytes: MaxBytes::MAX,
546 }
547 }
548 ColumnTypeCode::Blob => {
549 let offsets = offsets.unwrap_or_else(|| vec![0u64]);
550 let payload_len = *offsets.last().unwrap_or(&0) as usize;
551 data.truncate(payload_len);
552 ColumnBuffer::Blob {
553 container: BlobContainer::from_bytes_offsets(data, offsets),
554 max_bytes: MaxBytes::MAX,
555 }
556 }
557 ColumnTypeCode::Int => {
558 let v = postcard_per_element::<Int>(&data, &offsets, written_count)?;
559 ColumnBuffer::Int {
560 container: NumberContainer::from_vec(v),
561 max_bytes: MaxBytes::MAX,
562 }
563 }
564 ColumnTypeCode::Uint => {
565 let v = postcard_per_element::<Uint>(&data, &offsets, written_count)?;
566 ColumnBuffer::Uint {
567 container: NumberContainer::from_vec(v),
568 max_bytes: MaxBytes::MAX,
569 }
570 }
571 ColumnTypeCode::Decimal => {
572 let v = postcard_per_element::<Decimal>(&data, &offsets, written_count)?;
573 ColumnBuffer::Decimal {
574 container: NumberContainer::from_vec(v),
575 precision: Precision::MAX,
576 scale: Scale::MIN,
577 }
578 }
579 ColumnTypeCode::Any => {
580 let values: Vec<Value> = postcard_per_element::<Value>(&data, &offsets, written_count)?;
581 let boxed: Vec<Box<Value>> = values.into_iter().map(Box::new).collect();
582 ColumnBuffer::Any(AnyContainer::from_vec(boxed))
583 }
584 ColumnTypeCode::DictionaryId => {
585 let entries: Vec<DictionaryEntryId> =
586 postcard_per_element::<DictionaryEntryId>(&data, &offsets, written_count)?;
587 ColumnBuffer::DictionaryId(DictionaryContainer::from_vec(entries))
588 }
589 _ => return None,
590 };
591 Some(make_option_wrapped(inner))
592}
593
594fn postcard_per_element<T: DeserializeOwned>(data: &[u8], offsets: &Option<Vec<u64>>, count: usize) -> Option<Vec<T>> {
595 let offsets = offsets.as_ref()?;
596 if offsets.len() < count + 1 {
597 return None;
598 }
599 let mut out: Vec<T> = Vec::with_capacity(count);
600 for i in 0..count {
601 let start = offsets[i] as usize;
602 let end = offsets[i + 1] as usize;
603 if end > data.len() || start > end {
604 return None;
605 }
606 let value: T = postcard_decode(&data[start..end]).ok()?;
607 out.push(value);
608 }
609 Some(out)
610}
611
/// Copies the first `count * size_of::<T>()` bytes of `data` into a freshly allocated
/// `Vec<T>` of `count` elements.
///
/// Returns `None` when the byte count overflows or `data` is shorter than required. `data`
/// does not have to be aligned for `T`.
///
/// NOTE(review): `T: Copy` does not by itself guarantee that every bit pattern is a valid
/// `T`; callers are assumed to use this only with plain-data types. Confirm at the call sites.
fn bytes_to_vec<T: Copy>(data: &[u8], count: usize) -> Option<Vec<T>> {
	let needed = count.checked_mul(mem::size_of::<T>())?;
	if data.len() < needed {
		return None;
	}
	let mut v: Vec<T> = Vec::with_capacity(count);
	// SAFETY: `data` holds at least `needed` readable bytes (checked above) and `v` owns
	// room for `count` values of `T`, i.e. `needed` writable bytes; the regions cannot overlap
	// because `v` was just allocated. The copy is done byte-wise because a byte slice carries
	// no alignment guarantee for `T` (an empty one may even be a dangling 1-aligned pointer),
	// which a `T`-typed copy would require.
	unsafe {
		ptr::copy_nonoverlapping(data.as_ptr(), v.as_mut_ptr().cast::<u8>(), needed);
		v.set_len(count);
	}
	Some(v)
}
624
625fn to_numeric<T: Copy + IsNumber + fmt::Debug + Default>(
626 data: &[u8],
627 count: usize,
628 wrap: fn(NumberContainer<T>) -> ColumnBuffer,
629) -> Option<ColumnBuffer> {
630 let v = bytes_to_vec::<T>(data, count)?;
631 Some(wrap(NumberContainer::from_parts(CowVec::new(v))))
632}
633
634pub fn into_diffs(emitted: Vec<EmittedDiff>) -> Diffs {
636 emitted.into_iter()
637 .map(|d| match d.kind {
638 EmitDiffKind::Insert => Diff::insert(d.post.unwrap_or_else(Columns::empty)),
639 EmitDiffKind::Update => Diff::update(
640 d.pre.unwrap_or_else(Columns::empty),
641 d.post.unwrap_or_else(Columns::empty),
642 ),
643 EmitDiffKind::Remove => Diff::remove(d.pre.unwrap_or_else(Columns::empty)),
644 })
645 .collect()
646}