reifydb-extension 0.4.13

Unified extension loading for ReifyDB (FFI, WASM, native)
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::{cell::UnsafeCell, ffi::c_void, ptr};

use reifydb_abi::{
	callbacks::{builder::BuilderCallbacks, host::HostCallbacks, log::LogCallbacks, memory::MemoryCallbacks},
	context::context::ContextFFI,
	transform::{descriptor::TransformDescriptorFFI, vtable::TransformVTableFFI},
};
use reifydb_core::value::column::columns::Columns;
use reifydb_sdk::{error::FFIError, ffi::arena::Arena};
use reifydb_type::{self, Result};
use tracing::instrument;

use super::{Transform, context::TransformContext};
use crate::{
	ffi_callbacks::{
		builder::{
			BuilderRegistry, host_builder_acquire, host_builder_bitvec_ptr, host_builder_commit,
			host_builder_data_ptr, host_builder_emit_diff, host_builder_grow, host_builder_offsets_ptr,
			host_builder_release, with_registry,
		},
		panic::call_with_abort_on_panic,
		single_columns_from_registry,
	},
	procedure::ffi_callbacks::{logging, memory},
};

// One scratch arena per OS thread, shared across all NativeTransformFFI
// instances active on the same thread. Reset (`Arena::clear`) at the top of
// every `apply` so scaffolding memory is bounded to one call's worth.
thread_local! {
	static FFI_TRANSFORM_ARENA: UnsafeCell<Arena> = UnsafeCell::new(Arena::new());
}

/// FFI transform that wraps an external transform implementation.
pub struct NativeTransformFFI {
	#[allow(dead_code)]
	descriptor: TransformDescriptorFFI,
	vtable: TransformVTableFFI,
	instance: *mut c_void,
	/// Per-instance output collector. The guest builds output columns via
	/// `BuilderCallbacks` (acquire/data_ptr/commit/emit_diff); after the
	/// vtable call returns the host drains a single Insert-shaped diff.
	builder_registry: BuilderRegistry,
	/// Pre-built FFI context. `operator_id` and `callbacks` are written
	/// once in `new`. `clock_now_nanos` is refreshed per call.
	cached_ctx: UnsafeCell<ContextFFI>,
}

impl NativeTransformFFI {
	pub fn new(descriptor: TransformDescriptorFFI, instance: *mut c_void) -> Self {
		let vtable = descriptor.vtable;

		Self {
			descriptor,
			vtable,
			instance,
			builder_registry: BuilderRegistry::new(),
			cached_ctx: UnsafeCell::new(ContextFFI {
				txn_ptr: ptr::null_mut(),
				executor_ptr: ptr::null(),
				operator_id: 0,
				clock_now_nanos: 0,
				callbacks: pure_host_callbacks(),
			}),
		}
	}

	#[allow(dead_code)]
	pub(crate) fn descriptor(&self) -> &TransformDescriptorFFI {
		&self.descriptor
	}
}

// SAFETY: NativeTransformFFI is only accessed from a single context at a time.
// The raw pointer, builder registry (Mutex-guarded), and cached ContextFFI
// (UnsafeCell, single-actor invariant) are not shared across threads.
unsafe impl Send for NativeTransformFFI {}
unsafe impl Sync for NativeTransformFFI {}

impl Drop for NativeTransformFFI {
	fn drop(&mut self) {
		if !self.instance.is_null() {
			unsafe { (self.vtable.destroy)(self.instance) };
		}
	}
}

impl Transform for NativeTransformFFI {
	#[instrument(name = "transform::ffi::apply", level = "debug", skip_all)]
	fn apply(&self, ctx: &TransformContext, input: Columns) -> Result<Columns> {
		// Reset the per-thread arena before marshaling input scaffolding.
		// SAFETY: single-threaded per call; no live pointers from a prior
		// call survive past `apply`'s return.
		FFI_TRANSFORM_ARENA.with(|cell| unsafe { (*cell.get()).clear() });
		let ffi_input = FFI_TRANSFORM_ARENA.with(|cell| unsafe { (*cell.get()).marshal_columns(&input) });

		// Refresh the clock from the runtime context so system-column
		// stamping inside the builder callbacks honours mock clocks
		// during tests.
		let ffi_ctx_ptr = self.cached_ctx.get();
		unsafe {
			(*ffi_ctx_ptr).clock_now_nanos = ctx.runtime_context.clock.now_nanos();
		}

		let result_code = with_registry(&self.builder_registry, || {
			call_with_abort_on_panic("transform::apply", || unsafe {
				(self.vtable.transform)(self.instance, ffi_ctx_ptr, &ffi_input)
			})
		});

		if result_code != 0 {
			let _ = self.builder_registry.drain();
			return Err(FFIError::Other(format!("FFI transform apply failed with code: {}", result_code))
				.into());
		}

		Ok(single_columns_from_registry(&self.builder_registry))
	}
}

/// Host callbacks for non-flow FFI extension points (transforms, procedures).
/// Reuses the real memory/log/builder callbacks; state, store, catalog, rql
/// are stubbed because there's no flow `FlowTransaction` to read from.
fn pure_host_callbacks() -> HostCallbacks {
	HostCallbacks {
		memory: MemoryCallbacks {
			alloc: memory::host_alloc,
			free: memory::host_free,
			realloc: memory::host_realloc,
		},
		state: stubs::state(),
		log: LogCallbacks {
			message: logging::host_log_message,
		},
		store: stubs::store(),
		catalog: stubs::catalog(),
		rql: stubs::rql(),
		builder: BuilderCallbacks {
			acquire: host_builder_acquire,
			data_ptr: host_builder_data_ptr,
			offsets_ptr: host_builder_offsets_ptr,
			bitvec_ptr: host_builder_bitvec_ptr,
			grow: host_builder_grow,
			commit: host_builder_commit,
			release: host_builder_release,
			emit_diff: host_builder_emit_diff,
		},
	}
}

pub(crate) mod stubs {
	use reifydb_abi::{
		callbacks::{
			catalog::CatalogCallbacks, rql::RqlCallbacks, state::StateCallbacks, store::StoreCallbacks,
		},
		catalog::{namespace::NamespaceFFI, table::TableFFI},
		constants::FFI_ERROR_INTERNAL,
		context::{
			context::ContextFFI,
			iterators::{StateIteratorFFI, StoreIteratorFFI},
		},
		data::buffer::BufferFFI,
	};

	pub fn state() -> StateCallbacks {
		StateCallbacks {
			get: state_get,
			set: state_set,
			remove: state_remove,
			clear: state_clear,
			prefix: state_prefix,
			range: state_range,
			iterator_next: state_iterator_next,
			iterator_free: state_iterator_free,
		}
	}

	extern "C" fn state_get(_: u64, _: *mut ContextFFI, _: *const u8, _: usize, _: *mut BufferFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn state_set(_: u64, _: *mut ContextFFI, _: *const u8, _: usize, _: *const u8, _: usize) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn state_remove(_: u64, _: *mut ContextFFI, _: *const u8, _: usize) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn state_clear(_: u64, _: *mut ContextFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn state_prefix(
		_: u64,
		_: *mut ContextFFI,
		_: *const u8,
		_: usize,
		_: *mut *mut StateIteratorFFI,
	) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn state_range(
		_: u64,
		_: *mut ContextFFI,
		_: *const u8,
		_: usize,
		_: u8,
		_: *const u8,
		_: usize,
		_: u8,
		_: *mut *mut StateIteratorFFI,
	) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn state_iterator_next(_: *mut StateIteratorFFI, _: *mut BufferFFI, _: *mut BufferFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn state_iterator_free(_: *mut StateIteratorFFI) {}

	pub fn store() -> StoreCallbacks {
		StoreCallbacks {
			get: store_get,
			contains_key: store_contains_key,
			prefix: store_prefix,
			range: store_range,
			iterator_next: store_iterator_next,
			iterator_free: store_iterator_free,
		}
	}

	extern "C" fn store_get(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut BufferFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn store_contains_key(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut u8) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn store_prefix(_: *mut ContextFFI, _: *const u8, _: usize, _: *mut *mut StoreIteratorFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn store_range(
		_: *mut ContextFFI,
		_: *const u8,
		_: usize,
		_: u8,
		_: *const u8,
		_: usize,
		_: u8,
		_: *mut *mut StoreIteratorFFI,
	) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn store_iterator_next(_: *mut StoreIteratorFFI, _: *mut BufferFFI, _: *mut BufferFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn store_iterator_free(_: *mut StoreIteratorFFI) {}

	pub fn catalog() -> CatalogCallbacks {
		CatalogCallbacks {
			find_namespace: catalog_find_namespace,
			find_namespace_by_name: catalog_find_namespace_by_name,
			find_table: catalog_find_table,
			find_table_by_name: catalog_find_table_by_name,
			free_namespace: catalog_free_namespace,
			free_table: catalog_free_table,
		}
	}

	extern "C" fn catalog_find_namespace(_: *mut ContextFFI, _: u64, _: u64, _: *mut NamespaceFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn catalog_find_namespace_by_name(
		_: *mut ContextFFI,
		_: *const u8,
		_: usize,
		_: u64,
		_: *mut NamespaceFFI,
	) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn catalog_find_table(_: *mut ContextFFI, _: u64, _: u64, _: *mut TableFFI) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn catalog_find_table_by_name(
		_: *mut ContextFFI,
		_: u64,
		_: *const u8,
		_: usize,
		_: u64,
		_: *mut TableFFI,
	) -> i32 {
		FFI_ERROR_INTERNAL
	}
	extern "C" fn catalog_free_namespace(_: *mut NamespaceFFI) {}
	extern "C" fn catalog_free_table(_: *mut TableFFI) {}

	pub fn rql() -> RqlCallbacks {
		RqlCallbacks {
			rql: rql_unsupported,
		}
	}

	unsafe extern "C" fn rql_unsupported(
		_: *mut ContextFFI,
		_: *const u8,
		_: usize,
		_: *const u8,
		_: usize,
		_: *mut BufferFFI,
	) -> i32 {
		FFI_ERROR_INTERNAL
	}
}