1use std::{
5 alloc::{Layout, alloc, dealloc, realloc as system_realloc},
6 slice::from_raw_parts,
7};
8
/// Allocates `size` bytes with 8-byte alignment from the global allocator.
///
/// Returns a null pointer when `size` is zero or when `size` cannot form a
/// valid [`Layout`]; otherwise returns whatever the allocator hands back,
/// which may itself be null if the allocation fails.
#[unsafe(no_mangle)]
extern "C" fn test_alloc(size: usize) -> *mut u8 {
    if size == 0 {
        return ptr::null_mut();
    }

    Layout::from_size_align(size, 8).map_or(ptr::null_mut(), |layout| {
        // SAFETY: `size` is non-zero here, so the layout is not zero-sized.
        unsafe { alloc(layout) }
    })
}
22
/// Returns a buffer of `size` bytes (8-byte aligned) to the global allocator.
///
/// The call is a no-op when `ptr` is null, when `size` is zero, or when
/// `size` cannot form a valid [`Layout`].
///
/// # Safety
///
/// When the call is not a no-op, `ptr` must have been allocated by the global
/// allocator with exactly `size` bytes and 8-byte alignment, and must not be
/// used after this call.
#[unsafe(no_mangle)]
unsafe extern "C" fn test_free(ptr: *mut u8, size: usize) {
    if size == 0 || ptr.is_null() {
        return;
    }

    if let Ok(layout) = Layout::from_size_align(size, 8) {
        // SAFETY: the caller guarantees `ptr` was allocated with this layout.
        unsafe { dealloc(ptr, layout) }
    }
}
36
37#[unsafe(no_mangle)]
38unsafe extern "C" fn test_realloc(ptr: *mut u8, old_size: usize, new_size: usize) -> *mut u8 {
39 if ptr.is_null() {
40 return test_alloc(new_size);
41 }
42
43 if new_size == 0 {
44 unsafe { test_free(ptr, old_size) };
45 return ptr::null_mut();
46 }
47
48 let old_layout = match Layout::from_size_align(old_size, 8) {
49 Ok(layout) => layout,
50 Err(_) => return ptr::null_mut(),
51 };
52
53 let new_layout = match Layout::from_size_align(new_size, 8) {
54 Ok(layout) => layout,
55 Err(_) => return ptr::null_mut(),
56 };
57
58 unsafe { system_realloc(ptr, old_layout, new_layout.size()) }
59}
60
61unsafe fn get_test_context(ctx: *mut ContextFFI) -> &'static TestContext {
62 unsafe {
63 let txn_ptr = (*ctx).txn_ptr;
64 &*(txn_ptr as *const TestContext)
65 }
66}
67
68#[unsafe(no_mangle)]
69extern "C" fn test_state_get(
70 _operator_id: u64,
71 ctx: *mut ContextFFI,
72 key_ptr: *const u8,
73 key_len: usize,
74 output: *mut BufferFFI,
75) -> i32 {
76 if ctx.is_null() || key_ptr.is_null() || output.is_null() {
77 return FFI_ERROR_NULL_PTR;
78 }
79
80 unsafe {
81 let test_ctx = get_test_context(ctx);
82
83 let key_bytes = from_raw_parts(key_ptr, key_len);
84 let key = EncodedKey::new(key_bytes.to_vec());
85
86 match test_ctx.get_state(&key) {
87 Some(value_bytes) => {
88 let value_ptr = test_alloc(value_bytes.len());
89 if value_ptr.is_null() {
90 return -2;
91 }
92
93 ptr::copy_nonoverlapping(value_bytes.as_ptr(), value_ptr, value_bytes.len());
94
95 (*output).ptr = value_ptr;
96 (*output).len = value_bytes.len();
97 (*output).cap = value_bytes.len();
98
99 FFI_OK
100 }
101 None => FFI_NOT_FOUND,
102 }
103 }
104}
105
106#[unsafe(no_mangle)]
107extern "C" fn test_state_set(
108 _operator_id: u64,
109 ctx: *mut ContextFFI,
110 key_ptr: *const u8,
111 key_len: usize,
112 value_ptr: *const u8,
113 value_len: usize,
114) -> i32 {
115 if ctx.is_null() || key_ptr.is_null() || value_ptr.is_null() {
116 return FFI_ERROR_NULL_PTR;
117 }
118
119 unsafe {
120 let test_ctx = get_test_context(ctx);
121
122 let key_bytes = from_raw_parts(key_ptr, key_len);
123 let key = EncodedKey::new(key_bytes.to_vec());
124
125 let value_bytes = from_raw_parts(value_ptr, value_len);
126
127 test_ctx.set_state(key, value_bytes.to_vec());
128
129 FFI_OK
130 }
131}
132
133#[unsafe(no_mangle)]
134extern "C" fn test_state_remove(_operator_id: u64, ctx: *mut ContextFFI, key_ptr: *const u8, key_len: usize) -> i32 {
135 if ctx.is_null() || key_ptr.is_null() {
136 return FFI_ERROR_NULL_PTR;
137 }
138
139 unsafe {
140 let test_ctx = get_test_context(ctx);
141
142 let key_bytes = from_raw_parts(key_ptr, key_len);
143 let key = EncodedKey::new(key_bytes.to_vec());
144
145 test_ctx.remove_state(&key);
146
147 FFI_OK
148 }
149}
150
151#[unsafe(no_mangle)]
152extern "C" fn test_state_clear(_operator_id: u64, ctx: *mut ContextFFI) -> i32 {
153 if ctx.is_null() {
154 return FFI_ERROR_NULL_PTR;
155 }
156
157 unsafe {
158 let test_ctx = get_test_context(ctx);
159 test_ctx.clear_state();
160 FFI_OK
161 }
162}
163
164fn test_internal_envelope(operator_id: u64, user_key_bytes: &[u8]) -> EncodedKey {
165 FlowNodeInternalStateKey::new(FlowNodeId(operator_id), user_key_bytes.to_vec()).encode()
166}
167
168#[unsafe(no_mangle)]
169extern "C" fn test_internal_state_get(
170 operator_id: u64,
171 ctx: *mut ContextFFI,
172 key_ptr: *const u8,
173 key_len: usize,
174 output: *mut BufferFFI,
175) -> i32 {
176 if ctx.is_null() || key_ptr.is_null() || output.is_null() {
177 return FFI_ERROR_NULL_PTR;
178 }
179
180 unsafe {
181 let test_ctx = get_test_context(ctx);
182 let key_bytes = from_raw_parts(key_ptr, key_len);
183 let envelope = test_internal_envelope(operator_id, key_bytes);
184
185 match test_ctx.get_state(&envelope) {
186 Some(value_bytes) => {
187 let value_ptr = test_alloc(value_bytes.len());
188 if value_ptr.is_null() {
189 return -2;
190 }
191 ptr::copy_nonoverlapping(value_bytes.as_ptr(), value_ptr, value_bytes.len());
192 (*output).ptr = value_ptr;
193 (*output).len = value_bytes.len();
194 (*output).cap = value_bytes.len();
195 FFI_OK
196 }
197 None => FFI_NOT_FOUND,
198 }
199 }
200}
201
202#[unsafe(no_mangle)]
203extern "C" fn test_internal_state_set(
204 operator_id: u64,
205 ctx: *mut ContextFFI,
206 key_ptr: *const u8,
207 key_len: usize,
208 value_ptr: *const u8,
209 value_len: usize,
210) -> i32 {
211 if ctx.is_null() || key_ptr.is_null() || value_ptr.is_null() {
212 return FFI_ERROR_NULL_PTR;
213 }
214
215 unsafe {
216 let test_ctx = get_test_context(ctx);
217 let key_bytes = from_raw_parts(key_ptr, key_len);
218 let envelope = test_internal_envelope(operator_id, key_bytes);
219 let value_bytes = from_raw_parts(value_ptr, value_len);
220 test_ctx.set_state(envelope, value_bytes.to_vec());
221 FFI_OK
222 }
223}
224
225#[unsafe(no_mangle)]
226extern "C" fn test_internal_state_remove(
227 operator_id: u64,
228 ctx: *mut ContextFFI,
229 key_ptr: *const u8,
230 key_len: usize,
231) -> i32 {
232 if ctx.is_null() || key_ptr.is_null() {
233 return FFI_ERROR_NULL_PTR;
234 }
235
236 unsafe {
237 let test_ctx = get_test_context(ctx);
238 let key_bytes = from_raw_parts(key_ptr, key_len);
239 let envelope = test_internal_envelope(operator_id, key_bytes);
240 test_ctx.remove_state(&envelope);
241 FFI_OK
242 }
243}
244
245#[unsafe(no_mangle)]
246extern "C" fn test_internal_state_prefix(
247 operator_id: u64,
248 ctx: *mut ContextFFI,
249 prefix_ptr: *const u8,
250 prefix_len: usize,
251 iterator_out: *mut *mut StateIteratorFFI,
252) -> i32 {
253 if ctx.is_null() || iterator_out.is_null() {
254 return FFI_ERROR_NULL_PTR;
255 }
256
257 unsafe {
258 let test_ctx = get_test_context(ctx);
259 let user_prefix = if prefix_ptr.is_null() || prefix_len == 0 {
260 vec![]
261 } else {
262 from_raw_parts(prefix_ptr, prefix_len).to_vec()
263 };
264 let envelope_prefix = test_internal_envelope(operator_id, &user_prefix);
265 let envelope_bytes = envelope_prefix.as_ref();
266
267 let state_store = test_ctx.state_store();
268 let state = state_store.lock().unwrap();
269
270 let mut items: Vec<(Vec<u8>, Vec<u8>)> = state
271 .iter()
272 .filter(|(key, _)| key.starts_with(envelope_bytes))
273 .map(|(key, value)| (key.to_vec(), value.0.to_vec()))
274 .collect();
275
276 items.sort_by(|a, b| a.0.cmp(&b.0));
277
278 let iter = Box::new(TestStateIterator {
279 items,
280 position: 0,
281 });
282
283 *iterator_out = Box::into_raw(iter) as *mut StateIteratorFFI;
284
285 FFI_OK
286 }
287}
288
/// Cursor over a pre-collected list of state entries.
///
/// Instances are boxed and handed out as `*mut StateIteratorFFI`; the
/// iterator callbacks cast that pointer back to this type.
#[repr(C)]
struct TestStateIterator {
    /// `(key, value)` byte pairs captured when the iterator was created.
    items: Vec<(Vec<u8>, Vec<u8>)>,
    /// Index into `items` of the next entry to hand out.
    position: usize,
}
295
296#[unsafe(no_mangle)]
297extern "C" fn test_state_prefix(
298 _operator_id: u64,
299 ctx: *mut ContextFFI,
300 prefix_ptr: *const u8,
301 prefix_len: usize,
302 iterator_out: *mut *mut StateIteratorFFI,
303) -> i32 {
304 if ctx.is_null() || iterator_out.is_null() {
305 return FFI_ERROR_NULL_PTR;
306 }
307
308 unsafe {
309 let test_ctx = get_test_context(ctx);
310
311 let prefix_bytes = if prefix_ptr.is_null() || prefix_len == 0 {
312 vec![]
313 } else {
314 from_raw_parts(prefix_ptr, prefix_len).to_vec()
315 };
316
317 let state_store = test_ctx.state_store();
318 let state = state_store.lock().unwrap();
319
320 let mut items: Vec<(Vec<u8>, Vec<u8>)> = state
321 .iter()
322 .filter(|(key, _)| {
323 if prefix_bytes.is_empty() {
324 true
325 } else {
326 key.starts_with(&prefix_bytes)
327 }
328 })
329 .map(|(key, value)| (key.to_vec(), value.0.to_vec()))
330 .collect();
331
332 items.sort_by(|a, b| a.0.cmp(&b.0));
333
334 let iter = Box::new(TestStateIterator {
335 items,
336 position: 0,
337 });
338
339 *iterator_out = Box::into_raw(iter) as *mut StateIteratorFFI;
340
341 FFI_OK
342 }
343}
344
345#[unsafe(no_mangle)]
346extern "C" fn test_state_iterator_next(
347 iterator: *mut StateIteratorFFI,
348 key_out: *mut BufferFFI,
349 value_out: *mut BufferFFI,
350) -> i32 {
351 if iterator.is_null() || key_out.is_null() || value_out.is_null() {
352 return FFI_ERROR_NULL_PTR;
353 }
354
355 unsafe {
356 let iter = &mut *(iterator as *mut TestStateIterator);
357
358 if iter.position >= iter.items.len() {
359 return FFI_END_OF_ITERATION;
360 }
361
362 let (key, value) = &iter.items[iter.position];
363 iter.position += 1;
364
365 let key_ptr = test_alloc(key.len());
366 if key_ptr.is_null() {
367 return -2;
368 }
369 ptr::copy_nonoverlapping(key.as_ptr(), key_ptr, key.len());
370 (*key_out).ptr = key_ptr;
371 (*key_out).len = key.len();
372 (*key_out).cap = key.len();
373
374 let value_ptr = test_alloc(value.len());
375 if value_ptr.is_null() {
376 test_free(key_ptr, key.len());
377 return -2;
378 }
379 ptr::copy_nonoverlapping(value.as_ptr(), value_ptr, value.len());
380 (*value_out).ptr = value_ptr;
381 (*value_out).len = value.len();
382 (*value_out).cap = value.len();
383
384 FFI_OK
385 }
386}
387
388#[unsafe(no_mangle)]
389extern "C" fn test_state_iterator_free(iterator: *mut StateIteratorFFI) {
390 if iterator.is_null() {
391 return;
392 }
393
394 unsafe {
395 let _ = Box::from_raw(iterator as *mut TestStateIterator);
396 }
397}
398
/// Bound-type tag: this side of the range is unbounded.
const BOUND_UNBOUNDED: u8 = 0;
/// Bound-type tag: the bound key itself is part of the range.
const BOUND_INCLUDED: u8 = 1;
/// Bound-type tag: the bound key itself is not part of the range.
const BOUND_EXCLUDED: u8 = 2;
402
403#[unsafe(no_mangle)]
404extern "C" fn test_state_range(
405 _operator_id: u64,
406 ctx: *mut ContextFFI,
407 start_ptr: *const u8,
408 start_len: usize,
409 start_bound_type: u8,
410 end_ptr: *const u8,
411 end_len: usize,
412 end_bound_type: u8,
413 iterator_out: *mut *mut StateIteratorFFI,
414) -> i32 {
415 if ctx.is_null() || iterator_out.is_null() {
416 return FFI_ERROR_NULL_PTR;
417 }
418
419 unsafe {
420 let test_ctx = get_test_context(ctx);
421
422 let start_key = if start_bound_type == BOUND_UNBOUNDED || start_ptr.is_null() {
423 None
424 } else {
425 Some(from_raw_parts(start_ptr, start_len).to_vec())
426 };
427
428 let end_key = if end_bound_type == BOUND_UNBOUNDED || end_ptr.is_null() {
429 None
430 } else {
431 Some(from_raw_parts(end_ptr, end_len).to_vec())
432 };
433
434 let state_store = test_ctx.state_store();
435 let state = state_store.lock().unwrap();
436
437 let mut items: Vec<(Vec<u8>, Vec<u8>)> = state
438 .iter()
439 .filter(|(key, _)| {
440 let key_bytes = key.as_slice();
441
442 let start_ok = match (&start_key, start_bound_type) {
443 (None, _) => true,
444 (Some(start), BOUND_INCLUDED) => key_bytes >= start.as_slice(),
445 (Some(start), BOUND_EXCLUDED) => key_bytes > start.as_slice(),
446 _ => true,
447 };
448
449 let end_ok = match (&end_key, end_bound_type) {
450 (None, _) => true,
451 (Some(end), BOUND_INCLUDED) => key_bytes <= end.as_slice(),
452 (Some(end), BOUND_EXCLUDED) => key_bytes < end.as_slice(),
453 _ => true,
454 };
455
456 start_ok && end_ok
457 })
458 .map(|(key, value)| (key.to_vec(), value.0.to_vec()))
459 .collect();
460
461 items.sort_by(|a, b| a.0.cmp(&b.0));
462
463 let iter = Box::new(TestStateIterator {
464 items,
465 position: 0,
466 });
467
468 *iterator_out = Box::into_raw(iter) as *mut StateIteratorFFI;
469
470 FFI_OK
471 }
472}
473
/// Log callback placeholder: no argument is read and every call panics via
/// `unimplemented!()`.
///
/// # Safety
///
/// The body never dereferences `_message`, so no pointer validity is
/// required by this implementation.
#[unsafe(no_mangle)]
unsafe extern "C" fn test_log_message(
    _operator_id: u64,
    _level: u32,
    _message: *const u8,
    _message_len: usize,
) {
    unimplemented!()
}
478
479extern "C" fn test_store_get(_ctx: *mut ContextFFI, _key: *const u8, _key_len: usize, _output: *mut BufferFFI) -> i32 {
480 unimplemented!()
481}
482
483extern "C" fn test_store_contains_key(
484 _ctx: *mut ContextFFI,
485 _key: *const u8,
486 _key_len: usize,
487 _result: *mut u8,
488) -> i32 {
489 unimplemented!()
490}
491
492extern "C" fn test_store_prefix(
493 _ctx: *mut ContextFFI,
494 _prefix: *const u8,
495 _prefix_len: usize,
496 _iterator_out: *mut *mut StoreIteratorFFI,
497) -> i32 {
498 unimplemented!()
499}
500
501extern "C" fn test_store_range(
502 _ctx: *mut ContextFFI,
503 _start: *const u8,
504 _start_len: usize,
505 _start_bound_type: u8,
506 _end: *const u8,
507 _end_len: usize,
508 _end_bound_type: u8,
509 _iterator_out: *mut *mut StoreIteratorFFI,
510) -> i32 {
511 unimplemented!()
512}
513
514extern "C" fn test_store_iterator_next(
515 _iterator: *mut StoreIteratorFFI,
516 _key_out: *mut BufferFFI,
517 _value_out: *mut BufferFFI,
518) -> i32 {
519 unimplemented!()
520}
521
522extern "C" fn test_store_iterator_free(_iterator: *mut StoreIteratorFFI) {
523 unimplemented!()
524}
525
526use std::ptr;
527
528use reifydb_abi::{
529 callbacks::{
530 builder::BuilderCallbacks, catalog::CatalogCallbacks, host::HostCallbacks, log::LogCallbacks,
531 memory::MemoryCallbacks, rql::RqlCallbacks, state::StateCallbacks, store::StoreCallbacks,
532 },
533 catalog::{namespace::NamespaceFFI, table::TableFFI},
534 constants::{FFI_END_OF_ITERATION, FFI_ERROR_INTERNAL, FFI_ERROR_NULL_PTR, FFI_NOT_FOUND, FFI_OK},
535 context::{
536 context::ContextFFI,
537 iterators::{StateIteratorFFI, StoreIteratorFFI},
538 },
539 data::buffer::BufferFFI,
540};
541use reifydb_core::{
542 encoded::key::EncodedKey,
543 interface::catalog::flow::FlowNodeId,
544 key::{EncodableKey, flow_node_internal_state::FlowNodeInternalStateKey},
545};
546
547use crate::testing::{
548 context::TestContext,
549 registry::{
550 test_acquire, test_bitvec_ptr, test_commit, test_data_ptr, test_emit_diff, test_grow, test_offsets_ptr,
551 test_release,
552 },
553};
554
555extern "C" fn test_catalog_find_namespace(
556 _ctx: *mut ContextFFI,
557 _namespace_id: u64,
558 _version: u64,
559 _output: *mut NamespaceFFI,
560) -> i32 {
561 1
562}
563
564extern "C" fn test_catalog_find_namespace_by_name(
565 _ctx: *mut ContextFFI,
566 _name_ptr: *const u8,
567 _name_len: usize,
568 _version: u64,
569 _output: *mut NamespaceFFI,
570) -> i32 {
571 1
572}
573
574extern "C" fn test_catalog_find_table(
575 _ctx: *mut ContextFFI,
576 _table_id: u64,
577 _version: u64,
578 _output: *mut TableFFI,
579) -> i32 {
580 1
581}
582
583extern "C" fn test_catalog_find_table_by_name(
584 _ctx: *mut ContextFFI,
585 _namespace_id: u64,
586 _name_ptr: *const u8,
587 _name_len: usize,
588 _version: u64,
589 _output: *mut TableFFI,
590) -> i32 {
591 1
592}
593
594extern "C" fn test_catalog_free_namespace(_namespace: *mut NamespaceFFI) {}
595
596extern "C" fn test_catalog_free_table(_table: *mut TableFFI) {}
597
598unsafe extern "C" fn test_rql(
599 _ctx: *mut ContextFFI,
600 _rql_ptr: *const u8,
601 _rql_len: usize,
602 _params_ptr: *const u8,
603 _params_len: usize,
604 _result_out: *mut BufferFFI,
605) -> i32 {
606 FFI_ERROR_INTERNAL
607}
608
609pub fn create_test_callbacks() -> HostCallbacks {
610 HostCallbacks {
611 memory: MemoryCallbacks {
612 alloc: test_alloc,
613 free: test_free,
614 realloc: test_realloc,
615 },
616 state: StateCallbacks {
617 get: test_state_get,
618 set: test_state_set,
619 remove: test_state_remove,
620 clear: test_state_clear,
621 prefix: test_state_prefix,
622 range: test_state_range,
623 iterator_next: test_state_iterator_next,
624 iterator_free: test_state_iterator_free,
625 internal_get: test_internal_state_get,
626 internal_set: test_internal_state_set,
627 internal_remove: test_internal_state_remove,
628 internal_prefix: test_internal_state_prefix,
629 },
630 log: LogCallbacks {
631 message: test_log_message,
632 },
633 store: StoreCallbacks {
634 get: test_store_get,
635 contains_key: test_store_contains_key,
636 prefix: test_store_prefix,
637 range: test_store_range,
638 iterator_next: test_store_iterator_next,
639 iterator_free: test_store_iterator_free,
640 },
641 catalog: CatalogCallbacks {
642 find_namespace: test_catalog_find_namespace,
643 find_namespace_by_name: test_catalog_find_namespace_by_name,
644 find_table: test_catalog_find_table,
645 find_table_by_name: test_catalog_find_table_by_name,
646 free_namespace: test_catalog_free_namespace,
647 free_table: test_catalog_free_table,
648 },
649 rql: RqlCallbacks {
650 rql: test_rql,
651 },
652 builder: BuilderCallbacks {
653 acquire: test_acquire,
654 data_ptr: test_data_ptr,
655 offsets_ptr: test_offsets_ptr,
656 bitvec_ptr: test_bitvec_ptr,
657 grow: test_grow,
658 commit: test_commit,
659 release: test_release,
660 emit_diff: test_emit_diff,
661 },
662 }
663}