Skip to main content

reifydb_sdk/testing/
callbacks.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	alloc::{Layout, alloc, dealloc, realloc as system_realloc},
6	slice::from_raw_parts,
7};
8
9#[unsafe(no_mangle)]
10extern "C" fn test_alloc(size: usize) -> *mut u8 {
11	if size == 0 {
12		return ptr::null_mut();
13	}
14
15	let layout = match Layout::from_size_align(size, 8) {
16		Ok(layout) => layout,
17		Err(_) => return ptr::null_mut(),
18	};
19
20	unsafe { alloc(layout) }
21}
22
23#[unsafe(no_mangle)]
24unsafe extern "C" fn test_free(ptr: *mut u8, size: usize) {
25	if ptr.is_null() || size == 0 {
26		return;
27	}
28
29	let layout = match Layout::from_size_align(size, 8) {
30		Ok(layout) => layout,
31		Err(_) => return,
32	};
33
34	unsafe { dealloc(ptr, layout) }
35}
36
37#[unsafe(no_mangle)]
38unsafe extern "C" fn test_realloc(ptr: *mut u8, old_size: usize, new_size: usize) -> *mut u8 {
39	if ptr.is_null() {
40		return test_alloc(new_size);
41	}
42
43	if new_size == 0 {
44		unsafe { test_free(ptr, old_size) };
45		return ptr::null_mut();
46	}
47
48	let old_layout = match Layout::from_size_align(old_size, 8) {
49		Ok(layout) => layout,
50		Err(_) => return ptr::null_mut(),
51	};
52
53	let new_layout = match Layout::from_size_align(new_size, 8) {
54		Ok(layout) => layout,
55		Err(_) => return ptr::null_mut(),
56	};
57
58	unsafe { system_realloc(ptr, old_layout, new_layout.size()) }
59}
60
61unsafe fn get_test_context(ctx: *mut ContextFFI) -> &'static TestContext {
62	unsafe {
63		let txn_ptr = (*ctx).txn_ptr;
64		&*(txn_ptr as *const TestContext)
65	}
66}
67
68#[unsafe(no_mangle)]
69extern "C" fn test_state_get(
70	_operator_id: u64,
71	ctx: *mut ContextFFI,
72	key_ptr: *const u8,
73	key_len: usize,
74	output: *mut BufferFFI,
75) -> i32 {
76	if ctx.is_null() || key_ptr.is_null() || output.is_null() {
77		return FFI_ERROR_NULL_PTR;
78	}
79
80	unsafe {
81		let test_ctx = get_test_context(ctx);
82
83		let key_bytes = from_raw_parts(key_ptr, key_len);
84		let key = EncodedKey::new(key_bytes.to_vec());
85
86		match test_ctx.get_state(&key) {
87			Some(value_bytes) => {
88				let value_ptr = test_alloc(value_bytes.len());
89				if value_ptr.is_null() {
90					return -2;
91				}
92
93				ptr::copy_nonoverlapping(value_bytes.as_ptr(), value_ptr, value_bytes.len());
94
95				(*output).ptr = value_ptr;
96				(*output).len = value_bytes.len();
97				(*output).cap = value_bytes.len();
98
99				FFI_OK
100			}
101			None => FFI_NOT_FOUND,
102		}
103	}
104}
105
106#[unsafe(no_mangle)]
107extern "C" fn test_state_set(
108	_operator_id: u64,
109	ctx: *mut ContextFFI,
110	key_ptr: *const u8,
111	key_len: usize,
112	value_ptr: *const u8,
113	value_len: usize,
114) -> i32 {
115	if ctx.is_null() || key_ptr.is_null() || value_ptr.is_null() {
116		return FFI_ERROR_NULL_PTR;
117	}
118
119	unsafe {
120		let test_ctx = get_test_context(ctx);
121
122		let key_bytes = from_raw_parts(key_ptr, key_len);
123		let key = EncodedKey::new(key_bytes.to_vec());
124
125		let value_bytes = from_raw_parts(value_ptr, value_len);
126
127		test_ctx.set_state(key, value_bytes.to_vec());
128
129		FFI_OK
130	}
131}
132
133#[unsafe(no_mangle)]
134extern "C" fn test_state_remove(_operator_id: u64, ctx: *mut ContextFFI, key_ptr: *const u8, key_len: usize) -> i32 {
135	if ctx.is_null() || key_ptr.is_null() {
136		return FFI_ERROR_NULL_PTR;
137	}
138
139	unsafe {
140		let test_ctx = get_test_context(ctx);
141
142		let key_bytes = from_raw_parts(key_ptr, key_len);
143		let key = EncodedKey::new(key_bytes.to_vec());
144
145		test_ctx.remove_state(&key);
146
147		FFI_OK
148	}
149}
150
151#[unsafe(no_mangle)]
152extern "C" fn test_state_clear(_operator_id: u64, ctx: *mut ContextFFI) -> i32 {
153	if ctx.is_null() {
154		return FFI_ERROR_NULL_PTR;
155	}
156
157	unsafe {
158		let test_ctx = get_test_context(ctx);
159		test_ctx.clear_state();
160		FFI_OK
161	}
162}
163
164fn test_internal_envelope(operator_id: u64, user_key_bytes: &[u8]) -> EncodedKey {
165	FlowNodeInternalStateKey::new(FlowNodeId(operator_id), user_key_bytes.to_vec()).encode()
166}
167
168#[unsafe(no_mangle)]
169extern "C" fn test_internal_state_get(
170	operator_id: u64,
171	ctx: *mut ContextFFI,
172	key_ptr: *const u8,
173	key_len: usize,
174	output: *mut BufferFFI,
175) -> i32 {
176	if ctx.is_null() || key_ptr.is_null() || output.is_null() {
177		return FFI_ERROR_NULL_PTR;
178	}
179
180	unsafe {
181		let test_ctx = get_test_context(ctx);
182		let key_bytes = from_raw_parts(key_ptr, key_len);
183		let envelope = test_internal_envelope(operator_id, key_bytes);
184
185		match test_ctx.get_state(&envelope) {
186			Some(value_bytes) => {
187				let value_ptr = test_alloc(value_bytes.len());
188				if value_ptr.is_null() {
189					return -2;
190				}
191				ptr::copy_nonoverlapping(value_bytes.as_ptr(), value_ptr, value_bytes.len());
192				(*output).ptr = value_ptr;
193				(*output).len = value_bytes.len();
194				(*output).cap = value_bytes.len();
195				FFI_OK
196			}
197			None => FFI_NOT_FOUND,
198		}
199	}
200}
201
202#[unsafe(no_mangle)]
203extern "C" fn test_internal_state_set(
204	operator_id: u64,
205	ctx: *mut ContextFFI,
206	key_ptr: *const u8,
207	key_len: usize,
208	value_ptr: *const u8,
209	value_len: usize,
210) -> i32 {
211	if ctx.is_null() || key_ptr.is_null() || value_ptr.is_null() {
212		return FFI_ERROR_NULL_PTR;
213	}
214
215	unsafe {
216		let test_ctx = get_test_context(ctx);
217		let key_bytes = from_raw_parts(key_ptr, key_len);
218		let envelope = test_internal_envelope(operator_id, key_bytes);
219		let value_bytes = from_raw_parts(value_ptr, value_len);
220		test_ctx.set_state(envelope, value_bytes.to_vec());
221		FFI_OK
222	}
223}
224
225#[unsafe(no_mangle)]
226extern "C" fn test_internal_state_remove(
227	operator_id: u64,
228	ctx: *mut ContextFFI,
229	key_ptr: *const u8,
230	key_len: usize,
231) -> i32 {
232	if ctx.is_null() || key_ptr.is_null() {
233		return FFI_ERROR_NULL_PTR;
234	}
235
236	unsafe {
237		let test_ctx = get_test_context(ctx);
238		let key_bytes = from_raw_parts(key_ptr, key_len);
239		let envelope = test_internal_envelope(operator_id, key_bytes);
240		test_ctx.remove_state(&envelope);
241		FFI_OK
242	}
243}
244
245#[unsafe(no_mangle)]
246extern "C" fn test_internal_state_prefix(
247	operator_id: u64,
248	ctx: *mut ContextFFI,
249	prefix_ptr: *const u8,
250	prefix_len: usize,
251	iterator_out: *mut *mut StateIteratorFFI,
252) -> i32 {
253	if ctx.is_null() || iterator_out.is_null() {
254		return FFI_ERROR_NULL_PTR;
255	}
256
257	unsafe {
258		let test_ctx = get_test_context(ctx);
259		let user_prefix = if prefix_ptr.is_null() || prefix_len == 0 {
260			vec![]
261		} else {
262			from_raw_parts(prefix_ptr, prefix_len).to_vec()
263		};
264		let envelope_prefix = test_internal_envelope(operator_id, &user_prefix);
265		let envelope_bytes = envelope_prefix.as_ref();
266
267		let state_store = test_ctx.state_store();
268		let state = state_store.lock().unwrap();
269
270		let mut items: Vec<(Vec<u8>, Vec<u8>)> = state
271			.iter()
272			.filter(|(key, _)| key.starts_with(envelope_bytes))
273			.map(|(key, value)| (key.to_vec(), value.0.to_vec()))
274			.collect();
275
276		items.sort_by(|a, b| a.0.cmp(&b.0));
277
278		let iter = Box::new(TestStateIterator {
279			items,
280			position: 0,
281		});
282
283		*iterator_out = Box::into_raw(iter) as *mut StateIteratorFFI;
284
285		FFI_OK
286	}
287}
288
289#[repr(C)]
290struct TestStateIterator {
291	items: Vec<(Vec<u8>, Vec<u8>)>,
292
293	position: usize,
294}
295
296#[unsafe(no_mangle)]
297extern "C" fn test_state_prefix(
298	_operator_id: u64,
299	ctx: *mut ContextFFI,
300	prefix_ptr: *const u8,
301	prefix_len: usize,
302	iterator_out: *mut *mut StateIteratorFFI,
303) -> i32 {
304	if ctx.is_null() || iterator_out.is_null() {
305		return FFI_ERROR_NULL_PTR;
306	}
307
308	unsafe {
309		let test_ctx = get_test_context(ctx);
310
311		let prefix_bytes = if prefix_ptr.is_null() || prefix_len == 0 {
312			vec![]
313		} else {
314			from_raw_parts(prefix_ptr, prefix_len).to_vec()
315		};
316
317		let state_store = test_ctx.state_store();
318		let state = state_store.lock().unwrap();
319
320		let mut items: Vec<(Vec<u8>, Vec<u8>)> = state
321			.iter()
322			.filter(|(key, _)| {
323				if prefix_bytes.is_empty() {
324					true
325				} else {
326					key.starts_with(&prefix_bytes)
327				}
328			})
329			.map(|(key, value)| (key.to_vec(), value.0.to_vec()))
330			.collect();
331
332		items.sort_by(|a, b| a.0.cmp(&b.0));
333
334		let iter = Box::new(TestStateIterator {
335			items,
336			position: 0,
337		});
338
339		*iterator_out = Box::into_raw(iter) as *mut StateIteratorFFI;
340
341		FFI_OK
342	}
343}
344
345#[unsafe(no_mangle)]
346extern "C" fn test_state_iterator_next(
347	iterator: *mut StateIteratorFFI,
348	key_out: *mut BufferFFI,
349	value_out: *mut BufferFFI,
350) -> i32 {
351	if iterator.is_null() || key_out.is_null() || value_out.is_null() {
352		return FFI_ERROR_NULL_PTR;
353	}
354
355	unsafe {
356		let iter = &mut *(iterator as *mut TestStateIterator);
357
358		if iter.position >= iter.items.len() {
359			return FFI_END_OF_ITERATION;
360		}
361
362		let (key, value) = &iter.items[iter.position];
363		iter.position += 1;
364
365		let key_ptr = test_alloc(key.len());
366		if key_ptr.is_null() {
367			return -2;
368		}
369		ptr::copy_nonoverlapping(key.as_ptr(), key_ptr, key.len());
370		(*key_out).ptr = key_ptr;
371		(*key_out).len = key.len();
372		(*key_out).cap = key.len();
373
374		let value_ptr = test_alloc(value.len());
375		if value_ptr.is_null() {
376			test_free(key_ptr, key.len());
377			return -2;
378		}
379		ptr::copy_nonoverlapping(value.as_ptr(), value_ptr, value.len());
380		(*value_out).ptr = value_ptr;
381		(*value_out).len = value.len();
382		(*value_out).cap = value.len();
383
384		FFI_OK
385	}
386}
387
388#[unsafe(no_mangle)]
389extern "C" fn test_state_iterator_free(iterator: *mut StateIteratorFFI) {
390	if iterator.is_null() {
391		return;
392	}
393
394	unsafe {
395		let _ = Box::from_raw(iterator as *mut TestStateIterator);
396	}
397}
398
399const BOUND_UNBOUNDED: u8 = 0;
400const BOUND_INCLUDED: u8 = 1;
401const BOUND_EXCLUDED: u8 = 2;
402
403#[unsafe(no_mangle)]
404extern "C" fn test_state_range(
405	_operator_id: u64,
406	ctx: *mut ContextFFI,
407	start_ptr: *const u8,
408	start_len: usize,
409	start_bound_type: u8,
410	end_ptr: *const u8,
411	end_len: usize,
412	end_bound_type: u8,
413	iterator_out: *mut *mut StateIteratorFFI,
414) -> i32 {
415	if ctx.is_null() || iterator_out.is_null() {
416		return FFI_ERROR_NULL_PTR;
417	}
418
419	unsafe {
420		let test_ctx = get_test_context(ctx);
421
422		let start_key = if start_bound_type == BOUND_UNBOUNDED || start_ptr.is_null() {
423			None
424		} else {
425			Some(from_raw_parts(start_ptr, start_len).to_vec())
426		};
427
428		let end_key = if end_bound_type == BOUND_UNBOUNDED || end_ptr.is_null() {
429			None
430		} else {
431			Some(from_raw_parts(end_ptr, end_len).to_vec())
432		};
433
434		let state_store = test_ctx.state_store();
435		let state = state_store.lock().unwrap();
436
437		let mut items: Vec<(Vec<u8>, Vec<u8>)> = state
438			.iter()
439			.filter(|(key, _)| {
440				let key_bytes = key.as_slice();
441
442				let start_ok = match (&start_key, start_bound_type) {
443					(None, _) => true,
444					(Some(start), BOUND_INCLUDED) => key_bytes >= start.as_slice(),
445					(Some(start), BOUND_EXCLUDED) => key_bytes > start.as_slice(),
446					_ => true,
447				};
448
449				let end_ok = match (&end_key, end_bound_type) {
450					(None, _) => true,
451					(Some(end), BOUND_INCLUDED) => key_bytes <= end.as_slice(),
452					(Some(end), BOUND_EXCLUDED) => key_bytes < end.as_slice(),
453					_ => true,
454				};
455
456				start_ok && end_ok
457			})
458			.map(|(key, value)| (key.to_vec(), value.0.to_vec()))
459			.collect();
460
461		items.sort_by(|a, b| a.0.cmp(&b.0));
462
463		let iter = Box::new(TestStateIterator {
464			items,
465			position: 0,
466		});
467
468		*iterator_out = Box::into_raw(iter) as *mut StateIteratorFFI;
469
470		FFI_OK
471	}
472}
473
474#[unsafe(no_mangle)]
475unsafe extern "C" fn test_log_message(_operator_id: u64, _level: u32, _message: *const u8, _message_len: usize) {
476	unimplemented!()
477}
478
479extern "C" fn test_store_get(_ctx: *mut ContextFFI, _key: *const u8, _key_len: usize, _output: *mut BufferFFI) -> i32 {
480	unimplemented!()
481}
482
483extern "C" fn test_store_contains_key(
484	_ctx: *mut ContextFFI,
485	_key: *const u8,
486	_key_len: usize,
487	_result: *mut u8,
488) -> i32 {
489	unimplemented!()
490}
491
492extern "C" fn test_store_prefix(
493	_ctx: *mut ContextFFI,
494	_prefix: *const u8,
495	_prefix_len: usize,
496	_iterator_out: *mut *mut StoreIteratorFFI,
497) -> i32 {
498	unimplemented!()
499}
500
501extern "C" fn test_store_range(
502	_ctx: *mut ContextFFI,
503	_start: *const u8,
504	_start_len: usize,
505	_start_bound_type: u8,
506	_end: *const u8,
507	_end_len: usize,
508	_end_bound_type: u8,
509	_iterator_out: *mut *mut StoreIteratorFFI,
510) -> i32 {
511	unimplemented!()
512}
513
514extern "C" fn test_store_iterator_next(
515	_iterator: *mut StoreIteratorFFI,
516	_key_out: *mut BufferFFI,
517	_value_out: *mut BufferFFI,
518) -> i32 {
519	unimplemented!()
520}
521
522extern "C" fn test_store_iterator_free(_iterator: *mut StoreIteratorFFI) {
523	unimplemented!()
524}
525
526use std::ptr;
527
528use reifydb_abi::{
529	callbacks::{
530		builder::BuilderCallbacks, catalog::CatalogCallbacks, host::HostCallbacks, log::LogCallbacks,
531		memory::MemoryCallbacks, rql::RqlCallbacks, state::StateCallbacks, store::StoreCallbacks,
532	},
533	catalog::{namespace::NamespaceFFI, table::TableFFI},
534	constants::{FFI_END_OF_ITERATION, FFI_ERROR_INTERNAL, FFI_ERROR_NULL_PTR, FFI_NOT_FOUND, FFI_OK},
535	context::{
536		context::ContextFFI,
537		iterators::{StateIteratorFFI, StoreIteratorFFI},
538	},
539	data::buffer::BufferFFI,
540};
541use reifydb_core::{
542	encoded::key::EncodedKey,
543	interface::catalog::flow::FlowNodeId,
544	key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
545};
546
547use crate::testing::{
548	context::TestContext,
549	registry::{
550		test_acquire, test_bitvec_ptr, test_commit, test_data_ptr, test_emit_diff, test_grow, test_offsets_ptr,
551		test_release,
552	},
553};
554
555extern "C" fn test_catalog_find_namespace(
556	_ctx: *mut ContextFFI,
557	_namespace_id: u64,
558	_version: u64,
559	_output: *mut NamespaceFFI,
560) -> i32 {
561	1
562}
563
564extern "C" fn test_catalog_find_namespace_by_name(
565	_ctx: *mut ContextFFI,
566	_name_ptr: *const u8,
567	_name_len: usize,
568	_version: u64,
569	_output: *mut NamespaceFFI,
570) -> i32 {
571	1
572}
573
574extern "C" fn test_catalog_find_table(
575	_ctx: *mut ContextFFI,
576	_table_id: u64,
577	_version: u64,
578	_output: *mut TableFFI,
579) -> i32 {
580	1
581}
582
583extern "C" fn test_catalog_find_table_by_name(
584	_ctx: *mut ContextFFI,
585	_namespace_id: u64,
586	_name_ptr: *const u8,
587	_name_len: usize,
588	_version: u64,
589	_output: *mut TableFFI,
590) -> i32 {
591	1
592}
593
594extern "C" fn test_catalog_free_namespace(_namespace: *mut NamespaceFFI) {}
595
596extern "C" fn test_catalog_free_table(_table: *mut TableFFI) {}
597
598unsafe extern "C" fn test_rql(
599	_ctx: *mut ContextFFI,
600	_rql_ptr: *const u8,
601	_rql_len: usize,
602	_params_ptr: *const u8,
603	_params_len: usize,
604	_result_out: *mut BufferFFI,
605) -> i32 {
606	FFI_ERROR_INTERNAL
607}
608
609pub fn create_test_callbacks() -> HostCallbacks {
610	HostCallbacks {
611		memory: MemoryCallbacks {
612			alloc: test_alloc,
613			free: test_free,
614			realloc: test_realloc,
615		},
616		state: StateCallbacks {
617			get: test_state_get,
618			set: test_state_set,
619			remove: test_state_remove,
620			clear: test_state_clear,
621			prefix: test_state_prefix,
622			range: test_state_range,
623			iterator_next: test_state_iterator_next,
624			iterator_free: test_state_iterator_free,
625			internal_get: test_internal_state_get,
626			internal_set: test_internal_state_set,
627			internal_remove: test_internal_state_remove,
628			internal_prefix: test_internal_state_prefix,
629		},
630		log: LogCallbacks {
631			message: test_log_message,
632		},
633		store: StoreCallbacks {
634			get: test_store_get,
635			contains_key: test_store_contains_key,
636			prefix: test_store_prefix,
637			range: test_store_range,
638			iterator_next: test_store_iterator_next,
639			iterator_free: test_store_iterator_free,
640		},
641		catalog: CatalogCallbacks {
642			find_namespace: test_catalog_find_namespace,
643			find_namespace_by_name: test_catalog_find_namespace_by_name,
644			find_table: test_catalog_find_table,
645			find_table_by_name: test_catalog_find_table_by_name,
646			free_namespace: test_catalog_free_namespace,
647			free_table: test_catalog_free_table,
648		},
649		rql: RqlCallbacks {
650			rql: test_rql,
651		},
652		builder: BuilderCallbacks {
653			acquire: test_acquire,
654			data_ptr: test_data_ptr,
655			offsets_ptr: test_offsets_ptr,
656			bitvec_ptr: test_bitvec_ptr,
657			grow: test_grow,
658			commit: test_commit,
659			release: test_release,
660			emit_diff: test_emit_diff,
661		},
662	}
663}