nautilus_plugin/surfaces/custom_data.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Custom-data plug point.
17//!
//! A plug-in registers one [`CustomDataVTable`] per concrete custom-data
//! type. The host calls through the vtable to decode and encode the JSON
//! payloads carried inside wire envelopes (the envelope itself is never passed
//! to the plug-in), and to encode/decode Arrow IPC batches for catalog
//! persistence.
21
22#![allow(unsafe_code)]
23
24use std::marker::PhantomData;
25
26use crate::{
27 boundary::{BorrowedStr, OwnedBytes, PluginError, PluginErrorCode, PluginResult, Slice},
28 panic::{guard, guard_infallible},
29};
30
/// Opaque handle to a single custom-data value owned by the plug-in.
///
/// The host never dereferences this pointer. It only passes it back to the
/// vtable slots that accept handles: `encode_batch`, `ts_event`, `ts_init`,
/// `to_json`, `clone_handle`, `eq_handles`, and `drop_handle`. Inside the
/// plug-in, the thunks in this module cast it back to the concrete `T` it was
/// boxed from.
///
/// The private zero-length array makes the type zero-sized and impossible to
/// construct outside this module, so it is only ever used behind a pointer.
#[repr(C)]
pub struct CustomDataHandle {
    _opaque: [u8; 0],
}
39
40/// Borrowed view of a plug-in custom-data value during `on_data` dispatch.
41///
42/// The value stays owned by the host-side wrapper. Plug-in code can inspect
43/// the type name and downcast to a concrete custom-data type that was declared
44/// in the same cdylib manifest.
45#[repr(C)]
46#[derive(Clone, Copy)]
47pub struct PluginCustomDataRef {
48 type_name: BorrowedStr<'static>,
49 vtable: *const CustomDataVTable,
50 handle: *const CustomDataHandle,
51}
52
53impl PluginCustomDataRef {
54 /// Constructs a borrowed custom-data reference from raw boundary parts.
55 ///
56 /// # Safety
57 ///
58 /// `handle` must be null or point at a live value allocated by `vtable`,
59 /// and `type_name` must name the same concrete custom-data type for the
60 /// duration of the call.
61 #[must_use]
62 pub unsafe fn from_raw_parts(
63 type_name: BorrowedStr<'static>,
64 vtable: *const CustomDataVTable,
65 handle: *const CustomDataHandle,
66 ) -> Self {
67 Self {
68 type_name,
69 vtable,
70 handle,
71 }
72 }
73
74 /// Returns the canonical custom-data type name.
75 #[must_use]
76 pub fn type_name(&self) -> &str {
77 // SAFETY: constructor requires process-lifetime valid UTF-8 storage.
78 unsafe { self.type_name.as_str() }
79 }
80
81 /// Returns whether this value was allocated by the vtable for `T`.
82 #[must_use]
83 pub fn is<T>(&self) -> bool
84 where
85 T: PluginCustomData + PartialEq + Clone,
86 {
87 self.vtable == custom_data_vtable::<T>()
88 }
89
90 /// Returns the value as `T` when it was allocated by `T`'s vtable.
91 #[must_use]
92 pub fn downcast_ref<T>(&self) -> Option<&T>
93 where
94 T: PluginCustomData + PartialEq + Clone,
95 {
96 if self.handle.is_null() || !self.is::<T>() {
97 return None;
98 }
99
100 // SAFETY: matching vtable proves the handle came from `T`'s
101 // generated custom-data thunks and is live for this callback.
102 Some(unsafe { &*self.handle.cast::<T>() })
103 }
104}
105
106/// Function table for a single custom-data type.
107///
108/// One static vtable per concrete type, generated by the `nautilus_plugin!`
109/// macro and pointed to from the corresponding
110/// [`crate::manifest::CustomDataRegistration`] entry.
111///
112/// Slots are nullable at the ABI type level so the host can reject malformed
113/// manifests with null callbacks before registering or invoking the vtable.
114/// Macro-generated vtables fill every required slot.
115#[repr(C)]
116pub struct CustomDataVTable {
117 /// Returns the canonical type name (e.g. `"MyTickType"`). Used for
118 /// routing, persistence path layout, and JSON envelope tagging.
119 pub type_name: Option<unsafe extern "C" fn() -> BorrowedStr<'static>>,
120
121 /// Returns the Arrow schema for this type as an Arrow IPC byte stream.
122 ///
123 /// Plug-ins serialize their `arrow::datatypes::Schema` via
124 /// `arrow::ipc::writer::StreamWriter` so host and plug-in stay agnostic
125 /// of each other's Arrow crate version.
126 pub schema_ipc: Option<unsafe extern "C" fn() -> PluginResult<OwnedBytes>>,
127
128 /// Decodes a single value from its JSON payload (no envelope) into a new
129 /// handle. The handle is owned by the host until passed to `drop`.
130 pub from_json: Option<
131 unsafe extern "C" fn(payload: BorrowedStr<'_>) -> PluginResult<*mut CustomDataHandle>,
132 >,
133
134 /// Encodes a batch of handles into an Arrow IPC byte stream.
135 pub encode_batch: Option<
136 unsafe extern "C" fn(
137 handles: Slice<'_, *const CustomDataHandle>,
138 ) -> PluginResult<OwnedBytes>,
139 >,
140
141 /// Decodes an Arrow IPC byte stream into a freshly allocated array of
142 /// owned handles, packed as `*mut CustomDataHandle` elements inside the
143 /// returned `OwnedBytes`. The host must pass every handle through
144 /// `drop_handle` to release its inner value, then drop the `OwnedBytes`
145 /// (or invoke its embedded `drop_fn` directly) so the array storage is
146 /// freed with the original allocator layout. Do not call
147 /// [`crate::boundary::drop_owned_bytes`] directly: the underlying
148 /// allocation is a `Vec<*mut CustomDataHandle>`, not a `Vec<u8>`, and the
149 /// vtable installs a matching `drop_fn` on the returned `OwnedBytes`.
150 pub decode_batch: Option<
151 unsafe extern "C" fn(
152 ipc_bytes: Slice<'_, u8>,
153 metadata: Slice<'_, MetadataEntry<'_>>,
154 ) -> PluginResult<OwnedBytes>,
155 >,
156
157 /// Returns the event timestamp of the value behind the handle.
158 pub ts_event: Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> u64>,
159
160 /// Returns the init timestamp of the value behind the handle.
161 pub ts_init: Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> u64>,
162
163 /// Serializes a single value to a JSON payload (no envelope).
164 pub to_json:
165 Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> PluginResult<OwnedBytes>>,
166
167 /// Clones the value behind the handle into a new owned handle.
168 pub clone_handle:
169 Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> *mut CustomDataHandle>,
170
171 /// Drops the value behind the handle, freeing all of its resources.
172 pub drop_handle: Option<unsafe extern "C" fn(handle: *mut CustomDataHandle)>,
173
174 /// Tests two handles of the same type for value equality.
175 pub eq_handles: Option<
176 unsafe extern "C" fn(lhs: *const CustomDataHandle, rhs: *const CustomDataHandle) -> bool,
177 >,
178}
179
/// A single key/value pair in batch-decode metadata.
///
/// Passed to the `decode_batch` vtable slot. The thunk in this module copies
/// both strings into owned `String`s before calling
/// [`PluginCustomData::decode_batch`], so the borrowed storage only needs to
/// outlive that call.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct MetadataEntry<'a> {
    /// Metadata key.
    pub key: BorrowedStr<'a>,
    /// Metadata value associated with `key`.
    pub value: BorrowedStr<'a>,
}
187
188/// Author-facing trait for a custom-data type contributed by a plug-in.
189///
190/// Plug-in authors implement this on their data struct; the
191/// [`nautilus_plugin!`](crate::nautilus_plugin) macro generates the
192/// `extern "C"` thunks that adapt this trait to a [`CustomDataVTable`].
193///
194/// All trait methods run inside [`crate::panic::guard`] in the generated
195/// thunks, so a panic surfaces as a [`PluginError`] with code
196/// [`PluginErrorCode::Panic`] instead of unwinding through the FFI.
197pub trait PluginCustomData: 'static + Send + Sync + Sized {
198 /// Canonical type name. Must be unique across a Nautilus deployment.
199 const TYPE_NAME: &'static str;
200
201 /// Returns the event timestamp as UNIX nanoseconds.
202 fn ts_event(&self) -> u64;
203
204 /// Returns the init timestamp as UNIX nanoseconds.
205 fn ts_init(&self) -> u64;
206
207 /// Serializes this value to a JSON payload (no envelope, payload only).
208 fn to_json(&self) -> anyhow::Result<Vec<u8>>;
209
210 /// Deserializes a value from a JSON payload (no envelope).
211 fn from_json(payload: &[u8]) -> anyhow::Result<Self>;
212
213 /// Returns the Arrow schema for this type as an Arrow IPC byte stream.
214 fn schema_ipc() -> anyhow::Result<Vec<u8>>;
215
216 /// Encodes a batch of values into an Arrow IPC byte stream.
217 fn encode_batch(items: &[&Self]) -> anyhow::Result<Vec<u8>>;
218
219 /// Decodes an Arrow IPC byte stream into a vector of values.
220 fn decode_batch(ipc_bytes: &[u8], metadata: &[(String, String)]) -> anyhow::Result<Vec<Self>>;
221
222 /// Tests two values of this type for equality. Default implementation
223 /// requires `PartialEq`; override if a custom comparison is needed.
224 fn equals(&self, other: &Self) -> bool
225 where
226 Self: PartialEq,
227 {
228 self == other
229 }
230
231 /// Clones the value into a new heap allocation.
232 #[must_use]
233 fn clone_value(&self) -> Self
234 where
235 Self: Clone,
236 {
237 Clone::clone(self)
238 }
239}
240
241/// Returns a `&'static CustomDataVTable` for the given `PluginCustomData` type.
242///
243/// One static vtable per concrete `T`. The vtable lives in const-promoted
244/// static storage attached to a `PhantomData<T>`-tagged generic struct, so
245/// each monomorphisation gets its own table.
246///
247/// Naive `static FOO: OnceLock<CustomDataVTable> = OnceLock::new()` inside a
248/// generic function is unsound here: when `FOO`'s type does not mention `T`,
249/// Rust shares the static across every monomorphisation and the first
250/// initializer wins, so every later type collapses onto the first type's
251/// thunks.
252#[must_use]
253pub fn custom_data_vtable<T>() -> *const CustomDataVTable
254where
255 T: PluginCustomData + PartialEq + Clone,
256{
257 &VTableTag::<T>::VTABLE
258}
259
260struct VTableTag<T>(PhantomData<T>);
261
262impl<T> VTableTag<T>
263where
264 T: PluginCustomData + PartialEq + Clone,
265{
266 const VTABLE: CustomDataVTable = CustomDataVTable {
267 type_name: Some(type_name_thunk::<T>),
268 schema_ipc: Some(schema_ipc_thunk::<T>),
269 from_json: Some(from_json_thunk::<T>),
270 encode_batch: Some(encode_batch_thunk::<T>),
271 decode_batch: Some(decode_batch_thunk::<T>),
272 ts_event: Some(ts_event_thunk::<T>),
273 ts_init: Some(ts_init_thunk::<T>),
274 to_json: Some(to_json_thunk::<T>),
275 clone_handle: Some(clone_handle_thunk::<T>),
276 drop_handle: Some(drop_handle_thunk::<T>),
277 eq_handles: Some(eq_handles_thunk::<T>),
278 };
279}
280
281unsafe extern "C" fn type_name_thunk<T: PluginCustomData>() -> BorrowedStr<'static> {
282 BorrowedStr::from_str(T::TYPE_NAME)
283}
284
285unsafe extern "C" fn schema_ipc_thunk<T: PluginCustomData>() -> PluginResult<OwnedBytes> {
286 guard(|| {
287 T::schema_ipc()
288 .map(OwnedBytes::from_vec)
289 .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
290 })
291}
292
293unsafe extern "C" fn from_json_thunk<T: PluginCustomData>(
294 payload: BorrowedStr<'_>,
295) -> PluginResult<*mut CustomDataHandle> {
296 guard(|| {
297 // SAFETY: caller upholds liveness of `payload`'s storage across the call.
298 let bytes = unsafe { payload.as_str() }.as_bytes();
299 T::from_json(bytes)
300 .map(|v| Box::into_raw(Box::new(v)).cast::<CustomDataHandle>())
301 .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
302 })
303}
304
305unsafe extern "C" fn encode_batch_thunk<T: PluginCustomData>(
306 handles: Slice<'_, *const CustomDataHandle>,
307) -> PluginResult<OwnedBytes> {
308 guard(|| {
309 // SAFETY: caller upholds liveness of the handles slice and each handle.
310 let raw = unsafe { handles.as_slice() };
311 let items: Vec<&T> = raw
312 .iter()
313 .map(|p| {
314 // SAFETY: each handle originated from a `Box::into_raw(Box::new(T))`
315 // in `from_json_thunk` or `clone_handle_thunk`. The host promises
316 // they are still live.
317 unsafe { &*p.cast::<T>() }
318 })
319 .collect();
320 T::encode_batch(&items)
321 .map(OwnedBytes::from_vec)
322 .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
323 })
324}
325
326unsafe extern "C" fn decode_batch_thunk<T: PluginCustomData>(
327 ipc_bytes: Slice<'_, u8>,
328 metadata: Slice<'_, MetadataEntry<'_>>,
329) -> PluginResult<OwnedBytes> {
330 guard(|| {
331 // SAFETY: caller upholds liveness of `ipc_bytes` and `metadata`.
332 let bytes = unsafe { ipc_bytes.as_slice() };
333 // SAFETY: see above.
334 let entries = unsafe { metadata.as_slice() };
335 let md: Vec<(String, String)> = entries
336 .iter()
337 .map(|e| {
338 // SAFETY: caller upholds storage liveness for key.
339 let k = unsafe { e.key.as_str() }.to_string();
340 // SAFETY: caller upholds storage liveness for value.
341 let v = unsafe { e.value.as_str() }.to_string();
342 (k, v)
343 })
344 .collect();
345 let values: Vec<T> = T::decode_batch(bytes, &md)
346 .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))?;
347
348 // Pack the decoded values into a contiguous run of opaque handles.
349 // The host owns the buffer and must hand every handle back to
350 // `drop_handle` before freeing the array via `drop_fn`.
351 let handles: Vec<*mut CustomDataHandle> = values
352 .into_iter()
353 .map(|v| Box::into_raw(Box::new(v)).cast::<CustomDataHandle>())
354 .collect();
355 let mut handles = std::mem::ManuallyDrop::new(handles);
356 let ptr = handles.as_mut_ptr().cast::<u8>();
357 let elem_size = std::mem::size_of::<*mut CustomDataHandle>();
358 let len = handles.len() * elem_size;
359 let cap = handles.capacity() * elem_size;
360
361 // `drop_fn` reconstructs the Vec with the original element layout so
362 // the allocator deallocates with matching alignment; using
363 // `drop_owned_bytes` here would call `dealloc` with the wrong layout
364 // (Vec<u8> has 1-byte alignment, Vec<*mut _> has pointer alignment).
365 Ok(OwnedBytes {
366 ptr,
367 len,
368 cap,
369 drop_fn: Some(drop_handle_array),
370 })
371 })
372}
373
374unsafe extern "C" fn drop_handle_array(ptr: *mut u8, len: usize, cap: usize) {
375 if ptr.is_null() {
376 return;
377 }
378 let elem_size = std::mem::size_of::<*mut CustomDataHandle>();
379 if elem_size == 0 {
380 return;
381 }
382 let len_count = len / elem_size;
383 let cap_count = cap / elem_size;
384 // SAFETY: ptr/len/cap originate from a `Vec<*mut CustomDataHandle>`
385 // leaked via `ManuallyDrop` in `decode_batch_thunk`.
386 unsafe {
387 let _ = Vec::from_raw_parts(ptr.cast::<*mut CustomDataHandle>(), len_count, cap_count);
388 }
389}
390
391unsafe extern "C" fn ts_event_thunk<T: PluginCustomData>(handle: *const CustomDataHandle) -> u64 {
392 guard_infallible("ts_event", || {
393 // SAFETY: handle originated from a `Box::into_raw(Box::new(T))` and
394 // is still live (the host has not called drop_handle on it).
395 let value = unsafe { &*handle.cast::<T>() };
396 value.ts_event()
397 })
398}
399
400unsafe extern "C" fn ts_init_thunk<T: PluginCustomData>(handle: *const CustomDataHandle) -> u64 {
401 guard_infallible("ts_init", || {
402 // SAFETY: see ts_event_thunk.
403 let value = unsafe { &*handle.cast::<T>() };
404 value.ts_init()
405 })
406}
407
408unsafe extern "C" fn to_json_thunk<T: PluginCustomData>(
409 handle: *const CustomDataHandle,
410) -> PluginResult<OwnedBytes> {
411 guard(|| {
412 // SAFETY: see ts_event_thunk.
413 let value = unsafe { &*handle.cast::<T>() };
414 value
415 .to_json()
416 .map(OwnedBytes::from_vec)
417 .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
418 })
419}
420
421unsafe extern "C" fn clone_handle_thunk<T: PluginCustomData + Clone>(
422 handle: *const CustomDataHandle,
423) -> *mut CustomDataHandle {
424 guard_infallible("clone_handle", || {
425 // SAFETY: see ts_event_thunk.
426 let value = unsafe { &*handle.cast::<T>() };
427 let cloned = value.clone_value();
428 Box::into_raw(Box::new(cloned)).cast::<CustomDataHandle>()
429 })
430}
431
432unsafe extern "C" fn drop_handle_thunk<T: PluginCustomData>(handle: *mut CustomDataHandle) {
433 if handle.is_null() {
434 return;
435 }
436 guard_infallible("drop_handle", || {
437 // SAFETY: handle was allocated via `Box::into_raw(Box::new(T))`.
438 unsafe {
439 drop(Box::from_raw(handle.cast::<T>()));
440 }
441 });
442}
443
444unsafe extern "C" fn eq_handles_thunk<T: PluginCustomData + PartialEq>(
445 lhs: *const CustomDataHandle,
446 rhs: *const CustomDataHandle,
447) -> bool {
448 guard_infallible("eq_handles", || {
449 // SAFETY: see ts_event_thunk.
450 let lhs = unsafe { &*lhs.cast::<T>() };
451 // SAFETY: see ts_event_thunk.
452 let rhs = unsafe { &*rhs.cast::<T>() };
453 lhs.equals(rhs)
454 })
455}