Skip to main content

nautilus_plugin/surfaces/
custom_data.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! Custom-data plug point.
//!
//! A plug-in registers one [`CustomDataVTable`] per concrete custom-data
//! type. The host calls through the vtable to convert values to and from the
//! JSON payloads carried inside wire envelopes, to encode/decode Arrow IPC
//! batches for catalog persistence, and to clone, compare, and drop values.
21
22#![allow(unsafe_code)]
23
24use std::marker::PhantomData;
25
26use crate::{
27    boundary::{BorrowedStr, OwnedBytes, PluginError, PluginErrorCode, PluginResult, Slice},
28    panic::{guard, guard_infallible},
29};
30
/// Opaque handle to a single custom-data value owned by the plug-in.
///
/// The host never dereferences this pointer. It only hands it back to the
/// owning [`CustomDataVTable`] through the `encode_batch`, `ts_event`,
/// `ts_init`, `to_json`, `clone_handle`, `eq_handles`, and `drop_handle`
/// slots. On the plug-in side the pointer is the address of a boxed value of
/// the concrete custom-data type, cast to this opaque type.
///
/// The private zero-length field makes the type zero-sized and impossible to
/// construct outside this module, so it is only ever used behind a pointer.
#[repr(C)]
pub struct CustomDataHandle {
    _opaque: [u8; 0],
}
39
40/// Borrowed view of a plug-in custom-data value during `on_data` dispatch.
41///
42/// The value stays owned by the host-side wrapper. Plug-in code can inspect
43/// the type name and downcast to a concrete custom-data type that was declared
44/// in the same cdylib manifest.
45#[repr(C)]
46#[derive(Clone, Copy)]
47pub struct PluginCustomDataRef {
48    type_name: BorrowedStr<'static>,
49    vtable: *const CustomDataVTable,
50    handle: *const CustomDataHandle,
51}
52
53impl PluginCustomDataRef {
54    /// Constructs a borrowed custom-data reference from raw boundary parts.
55    ///
56    /// # Safety
57    ///
58    /// `handle` must be null or point at a live value allocated by `vtable`,
59    /// and `type_name` must name the same concrete custom-data type for the
60    /// duration of the call.
61    #[must_use]
62    pub unsafe fn from_raw_parts(
63        type_name: BorrowedStr<'static>,
64        vtable: *const CustomDataVTable,
65        handle: *const CustomDataHandle,
66    ) -> Self {
67        Self {
68            type_name,
69            vtable,
70            handle,
71        }
72    }
73
74    /// Returns the canonical custom-data type name.
75    #[must_use]
76    pub fn type_name(&self) -> &str {
77        // SAFETY: constructor requires process-lifetime valid UTF-8 storage.
78        unsafe { self.type_name.as_str() }
79    }
80
81    /// Returns whether this value was allocated by the vtable for `T`.
82    #[must_use]
83    pub fn is<T>(&self) -> bool
84    where
85        T: PluginCustomData + PartialEq + Clone,
86    {
87        self.vtable == custom_data_vtable::<T>()
88    }
89
90    /// Returns the value as `T` when it was allocated by `T`'s vtable.
91    #[must_use]
92    pub fn downcast_ref<T>(&self) -> Option<&T>
93    where
94        T: PluginCustomData + PartialEq + Clone,
95    {
96        if self.handle.is_null() || !self.is::<T>() {
97            return None;
98        }
99
100        // SAFETY: matching vtable proves the handle came from `T`'s
101        // generated custom-data thunks and is live for this callback.
102        Some(unsafe { &*self.handle.cast::<T>() })
103    }
104}
105
106/// Function table for a single custom-data type.
107///
108/// One static vtable per concrete type, generated by the `nautilus_plugin!`
109/// macro and pointed to from the corresponding
110/// [`crate::manifest::CustomDataRegistration`] entry.
111///
112/// Slots are nullable at the ABI type level so the host can reject malformed
113/// manifests with null callbacks before registering or invoking the vtable.
114/// Macro-generated vtables fill every required slot.
115#[repr(C)]
116pub struct CustomDataVTable {
117    /// Returns the canonical type name (e.g. `"MyTickType"`). Used for
118    /// routing, persistence path layout, and JSON envelope tagging.
119    pub type_name: Option<unsafe extern "C" fn() -> BorrowedStr<'static>>,
120
121    /// Returns the Arrow schema for this type as an Arrow IPC byte stream.
122    ///
123    /// Plug-ins serialize their `arrow::datatypes::Schema` via
124    /// `arrow::ipc::writer::StreamWriter` so host and plug-in stay agnostic
125    /// of each other's Arrow crate version.
126    pub schema_ipc: Option<unsafe extern "C" fn() -> PluginResult<OwnedBytes>>,
127
128    /// Decodes a single value from its JSON payload (no envelope) into a new
129    /// handle. The handle is owned by the host until passed to `drop`.
130    pub from_json: Option<
131        unsafe extern "C" fn(payload: BorrowedStr<'_>) -> PluginResult<*mut CustomDataHandle>,
132    >,
133
134    /// Encodes a batch of handles into an Arrow IPC byte stream.
135    pub encode_batch: Option<
136        unsafe extern "C" fn(
137            handles: Slice<'_, *const CustomDataHandle>,
138        ) -> PluginResult<OwnedBytes>,
139    >,
140
141    /// Decodes an Arrow IPC byte stream into a freshly allocated array of
142    /// owned handles, packed as `*mut CustomDataHandle` elements inside the
143    /// returned `OwnedBytes`. The host must pass every handle through
144    /// `drop_handle` to release its inner value, then drop the `OwnedBytes`
145    /// (or invoke its embedded `drop_fn` directly) so the array storage is
146    /// freed with the original allocator layout. Do not call
147    /// [`crate::boundary::drop_owned_bytes`] directly: the underlying
148    /// allocation is a `Vec<*mut CustomDataHandle>`, not a `Vec<u8>`, and the
149    /// vtable installs a matching `drop_fn` on the returned `OwnedBytes`.
150    pub decode_batch: Option<
151        unsafe extern "C" fn(
152            ipc_bytes: Slice<'_, u8>,
153            metadata: Slice<'_, MetadataEntry<'_>>,
154        ) -> PluginResult<OwnedBytes>,
155    >,
156
157    /// Returns the event timestamp of the value behind the handle.
158    pub ts_event: Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> u64>,
159
160    /// Returns the init timestamp of the value behind the handle.
161    pub ts_init: Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> u64>,
162
163    /// Serializes a single value to a JSON payload (no envelope).
164    pub to_json:
165        Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> PluginResult<OwnedBytes>>,
166
167    /// Clones the value behind the handle into a new owned handle.
168    pub clone_handle:
169        Option<unsafe extern "C" fn(handle: *const CustomDataHandle) -> *mut CustomDataHandle>,
170
171    /// Drops the value behind the handle, freeing all of its resources.
172    pub drop_handle: Option<unsafe extern "C" fn(handle: *mut CustomDataHandle)>,
173
174    /// Tests two handles of the same type for value equality.
175    pub eq_handles: Option<
176        unsafe extern "C" fn(lhs: *const CustomDataHandle, rhs: *const CustomDataHandle) -> bool,
177    >,
178}
179
180/// A single key/value pair in batch-decode metadata.
181#[repr(C)]
182#[derive(Clone, Copy)]
183pub struct MetadataEntry<'a> {
184    pub key: BorrowedStr<'a>,
185    pub value: BorrowedStr<'a>,
186}
187
188/// Author-facing trait for a custom-data type contributed by a plug-in.
189///
190/// Plug-in authors implement this on their data struct; the
191/// [`nautilus_plugin!`](crate::nautilus_plugin) macro generates the
192/// `extern "C"` thunks that adapt this trait to a [`CustomDataVTable`].
193///
194/// All trait methods run inside [`crate::panic::guard`] in the generated
195/// thunks, so a panic surfaces as a [`PluginError`] with code
196/// [`PluginErrorCode::Panic`] instead of unwinding through the FFI.
197pub trait PluginCustomData: 'static + Send + Sync + Sized {
198    /// Canonical type name. Must be unique across a Nautilus deployment.
199    const TYPE_NAME: &'static str;
200
201    /// Returns the event timestamp as UNIX nanoseconds.
202    fn ts_event(&self) -> u64;
203
204    /// Returns the init timestamp as UNIX nanoseconds.
205    fn ts_init(&self) -> u64;
206
207    /// Serializes this value to a JSON payload (no envelope, payload only).
208    fn to_json(&self) -> anyhow::Result<Vec<u8>>;
209
210    /// Deserializes a value from a JSON payload (no envelope).
211    fn from_json(payload: &[u8]) -> anyhow::Result<Self>;
212
213    /// Returns the Arrow schema for this type as an Arrow IPC byte stream.
214    fn schema_ipc() -> anyhow::Result<Vec<u8>>;
215
216    /// Encodes a batch of values into an Arrow IPC byte stream.
217    fn encode_batch(items: &[&Self]) -> anyhow::Result<Vec<u8>>;
218
219    /// Decodes an Arrow IPC byte stream into a vector of values.
220    fn decode_batch(ipc_bytes: &[u8], metadata: &[(String, String)]) -> anyhow::Result<Vec<Self>>;
221
222    /// Tests two values of this type for equality. Default implementation
223    /// requires `PartialEq`; override if a custom comparison is needed.
224    fn equals(&self, other: &Self) -> bool
225    where
226        Self: PartialEq,
227    {
228        self == other
229    }
230
231    /// Clones the value into a new heap allocation.
232    #[must_use]
233    fn clone_value(&self) -> Self
234    where
235        Self: Clone,
236    {
237        Clone::clone(self)
238    }
239}
240
241/// Returns a `&'static CustomDataVTable` for the given `PluginCustomData` type.
242///
243/// One static vtable per concrete `T`. The vtable lives in const-promoted
244/// static storage attached to a `PhantomData<T>`-tagged generic struct, so
245/// each monomorphisation gets its own table.
246///
247/// Naive `static FOO: OnceLock<CustomDataVTable> = OnceLock::new()` inside a
248/// generic function is unsound here: when `FOO`'s type does not mention `T`,
249/// Rust shares the static across every monomorphisation and the first
250/// initializer wins, so every later type collapses onto the first type's
251/// thunks.
252#[must_use]
253pub fn custom_data_vtable<T>() -> *const CustomDataVTable
254where
255    T: PluginCustomData + PartialEq + Clone,
256{
257    &VTableTag::<T>::VTABLE
258}
259
260struct VTableTag<T>(PhantomData<T>);
261
262impl<T> VTableTag<T>
263where
264    T: PluginCustomData + PartialEq + Clone,
265{
266    const VTABLE: CustomDataVTable = CustomDataVTable {
267        type_name: Some(type_name_thunk::<T>),
268        schema_ipc: Some(schema_ipc_thunk::<T>),
269        from_json: Some(from_json_thunk::<T>),
270        encode_batch: Some(encode_batch_thunk::<T>),
271        decode_batch: Some(decode_batch_thunk::<T>),
272        ts_event: Some(ts_event_thunk::<T>),
273        ts_init: Some(ts_init_thunk::<T>),
274        to_json: Some(to_json_thunk::<T>),
275        clone_handle: Some(clone_handle_thunk::<T>),
276        drop_handle: Some(drop_handle_thunk::<T>),
277        eq_handles: Some(eq_handles_thunk::<T>),
278    };
279}
280
281unsafe extern "C" fn type_name_thunk<T: PluginCustomData>() -> BorrowedStr<'static> {
282    BorrowedStr::from_str(T::TYPE_NAME)
283}
284
285unsafe extern "C" fn schema_ipc_thunk<T: PluginCustomData>() -> PluginResult<OwnedBytes> {
286    guard(|| {
287        T::schema_ipc()
288            .map(OwnedBytes::from_vec)
289            .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
290    })
291}
292
293unsafe extern "C" fn from_json_thunk<T: PluginCustomData>(
294    payload: BorrowedStr<'_>,
295) -> PluginResult<*mut CustomDataHandle> {
296    guard(|| {
297        // SAFETY: caller upholds liveness of `payload`'s storage across the call.
298        let bytes = unsafe { payload.as_str() }.as_bytes();
299        T::from_json(bytes)
300            .map(|v| Box::into_raw(Box::new(v)).cast::<CustomDataHandle>())
301            .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
302    })
303}
304
305unsafe extern "C" fn encode_batch_thunk<T: PluginCustomData>(
306    handles: Slice<'_, *const CustomDataHandle>,
307) -> PluginResult<OwnedBytes> {
308    guard(|| {
309        // SAFETY: caller upholds liveness of the handles slice and each handle.
310        let raw = unsafe { handles.as_slice() };
311        let items: Vec<&T> = raw
312            .iter()
313            .map(|p| {
314                // SAFETY: each handle originated from a `Box::into_raw(Box::new(T))`
315                // in `from_json_thunk` or `clone_handle_thunk`. The host promises
316                // they are still live.
317                unsafe { &*p.cast::<T>() }
318            })
319            .collect();
320        T::encode_batch(&items)
321            .map(OwnedBytes::from_vec)
322            .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
323    })
324}
325
326unsafe extern "C" fn decode_batch_thunk<T: PluginCustomData>(
327    ipc_bytes: Slice<'_, u8>,
328    metadata: Slice<'_, MetadataEntry<'_>>,
329) -> PluginResult<OwnedBytes> {
330    guard(|| {
331        // SAFETY: caller upholds liveness of `ipc_bytes` and `metadata`.
332        let bytes = unsafe { ipc_bytes.as_slice() };
333        // SAFETY: see above.
334        let entries = unsafe { metadata.as_slice() };
335        let md: Vec<(String, String)> = entries
336            .iter()
337            .map(|e| {
338                // SAFETY: caller upholds storage liveness for key.
339                let k = unsafe { e.key.as_str() }.to_string();
340                // SAFETY: caller upholds storage liveness for value.
341                let v = unsafe { e.value.as_str() }.to_string();
342                (k, v)
343            })
344            .collect();
345        let values: Vec<T> = T::decode_batch(bytes, &md)
346            .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))?;
347
348        // Pack the decoded values into a contiguous run of opaque handles.
349        // The host owns the buffer and must hand every handle back to
350        // `drop_handle` before freeing the array via `drop_fn`.
351        let handles: Vec<*mut CustomDataHandle> = values
352            .into_iter()
353            .map(|v| Box::into_raw(Box::new(v)).cast::<CustomDataHandle>())
354            .collect();
355        let mut handles = std::mem::ManuallyDrop::new(handles);
356        let ptr = handles.as_mut_ptr().cast::<u8>();
357        let elem_size = std::mem::size_of::<*mut CustomDataHandle>();
358        let len = handles.len() * elem_size;
359        let cap = handles.capacity() * elem_size;
360
361        // `drop_fn` reconstructs the Vec with the original element layout so
362        // the allocator deallocates with matching alignment; using
363        // `drop_owned_bytes` here would call `dealloc` with the wrong layout
364        // (Vec<u8> has 1-byte alignment, Vec<*mut _> has pointer alignment).
365        Ok(OwnedBytes {
366            ptr,
367            len,
368            cap,
369            drop_fn: Some(drop_handle_array),
370        })
371    })
372}
373
374unsafe extern "C" fn drop_handle_array(ptr: *mut u8, len: usize, cap: usize) {
375    if ptr.is_null() {
376        return;
377    }
378    let elem_size = std::mem::size_of::<*mut CustomDataHandle>();
379    if elem_size == 0 {
380        return;
381    }
382    let len_count = len / elem_size;
383    let cap_count = cap / elem_size;
384    // SAFETY: ptr/len/cap originate from a `Vec<*mut CustomDataHandle>`
385    // leaked via `ManuallyDrop` in `decode_batch_thunk`.
386    unsafe {
387        let _ = Vec::from_raw_parts(ptr.cast::<*mut CustomDataHandle>(), len_count, cap_count);
388    }
389}
390
391unsafe extern "C" fn ts_event_thunk<T: PluginCustomData>(handle: *const CustomDataHandle) -> u64 {
392    guard_infallible("ts_event", || {
393        // SAFETY: handle originated from a `Box::into_raw(Box::new(T))` and
394        // is still live (the host has not called drop_handle on it).
395        let value = unsafe { &*handle.cast::<T>() };
396        value.ts_event()
397    })
398}
399
400unsafe extern "C" fn ts_init_thunk<T: PluginCustomData>(handle: *const CustomDataHandle) -> u64 {
401    guard_infallible("ts_init", || {
402        // SAFETY: see ts_event_thunk.
403        let value = unsafe { &*handle.cast::<T>() };
404        value.ts_init()
405    })
406}
407
408unsafe extern "C" fn to_json_thunk<T: PluginCustomData>(
409    handle: *const CustomDataHandle,
410) -> PluginResult<OwnedBytes> {
411    guard(|| {
412        // SAFETY: see ts_event_thunk.
413        let value = unsafe { &*handle.cast::<T>() };
414        value
415            .to_json()
416            .map(OwnedBytes::from_vec)
417            .map_err(|e| PluginError::new(PluginErrorCode::SerializationFailed, e.to_string()))
418    })
419}
420
421unsafe extern "C" fn clone_handle_thunk<T: PluginCustomData + Clone>(
422    handle: *const CustomDataHandle,
423) -> *mut CustomDataHandle {
424    guard_infallible("clone_handle", || {
425        // SAFETY: see ts_event_thunk.
426        let value = unsafe { &*handle.cast::<T>() };
427        let cloned = value.clone_value();
428        Box::into_raw(Box::new(cloned)).cast::<CustomDataHandle>()
429    })
430}
431
432unsafe extern "C" fn drop_handle_thunk<T: PluginCustomData>(handle: *mut CustomDataHandle) {
433    if handle.is_null() {
434        return;
435    }
436    guard_infallible("drop_handle", || {
437        // SAFETY: handle was allocated via `Box::into_raw(Box::new(T))`.
438        unsafe {
439            drop(Box::from_raw(handle.cast::<T>()));
440        }
441    });
442}
443
444unsafe extern "C" fn eq_handles_thunk<T: PluginCustomData + PartialEq>(
445    lhs: *const CustomDataHandle,
446    rhs: *const CustomDataHandle,
447) -> bool {
448    guard_infallible("eq_handles", || {
449        // SAFETY: see ts_event_thunk.
450        let lhs = unsafe { &*lhs.cast::<T>() };
451        // SAFETY: see ts_event_thunk.
452        let rhs = unsafe { &*rhs.cast::<T>() };
453        lhs.equals(rhs)
454    })
455}