Skip to main content

nautilus_plugin/bridge/
custom_data.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! Host-side custom-data manifest walk.
//!
//! Walks each plug-in manifest's `custom_data` slice and registers a JSON
//! deserializer with [`nautilus_model::data::registry`] so the engine can
//! decode wire-format custom data emitted by the plug-in. The registered
//! deserializer wraps the plug-in's opaque handle in a host-side struct
//! implementing [`CustomDataTrait`] and routes all trait calls back through
//! the plug-in's vtable.
24
25#![allow(unsafe_code)]
26#![allow(
27    clippy::multiple_unsafe_ops_per_block,
28    reason = "vtable deref and FFI call form a single boundary callback; \
29              SAFETY comments cover both ops together"
30)]
31
32use std::{any::Any, fmt::Debug, sync::Arc};
33
34use nautilus_core::UnixNanos;
35use nautilus_model::data::{
36    CustomData, CustomDataTrait, HasTsInit,
37    registry::{JsonDeserializer, ensure_json_deserializer_registered},
38};
39
40use crate::{
41    boundary::BorrowedStr,
42    manifest::{
43        ValidatedCustomDataRegistration, ValidatedCustomDataVTable, ValidatedPluginManifest,
44    },
45    surfaces::custom_data::{CustomDataHandle, PluginCustomDataRef},
46};
47
48/// Walks a [`ValidatedPluginManifest`] and registers a JSON deserializer for every
49/// custom-data type the plug-in publishes.
50///
51/// Idempotent: re-registering a type the host has already seen is a no-op,
52/// matching the behaviour of [`ensure_json_deserializer_registered`].
53///
54/// # Errors
55///
56/// Returns an error if any registration call into [`nautilus_model::data::registry`]
57/// fails.
58///
59pub fn register_custom_data_from_manifest(
60    manifest: ValidatedPluginManifest<'_>,
61) -> anyhow::Result<usize> {
62    let mut count = 0usize;
63
64    for entry in manifest.custom_data() {
65        register_custom_data_entry(entry)?;
66        count += 1;
67    }
68    Ok(count)
69}
70
71/// Registers a single custom-data type with the model data registry.
72///
73/// # Errors
74///
75/// Returns an error if [`ensure_json_deserializer_registered`] fails.
76///
77/// The validated registration guarantees a non-null vtable with every
78/// required slot present.
79pub fn register_custom_data_entry(entry: ValidatedCustomDataRegistration) -> anyhow::Result<()> {
80    let type_name = entry.type_name();
81    let vtable = entry.vtable();
82    // SAFETY: entry comes from a validated manifest registration.
83    let from_json = unsafe { validated_slot!(CustomDataVTable, vtable.as_ptr(), from_json) };
84
85    let deserializer: JsonDeserializer = Box::new(move |value| {
86        let payload = serde_json::to_vec(&value)?;
87        let payload_str = std::str::from_utf8(&payload)?;
88        // SAFETY: vtable is non-null and live; payload_str outlives the call.
89        let handle_result = unsafe { from_json(BorrowedStr::from_str(payload_str)) };
90        let handle = handle_result.into_result().map_err(|e| {
91            anyhow::anyhow!(
92                "plug-in '{type_name}' from_json returned error: {}",
93                e.message_string()
94            )
95        })?;
96
97        if handle.is_null() {
98            anyhow::bail!("plug-in '{type_name}' from_json returned a null handle");
99        }
100
101        Ok(Arc::new(PluginCustomDataValue {
102            vtable,
103            handle,
104            type_name,
105        }) as Arc<dyn CustomDataTrait>)
106    });
107
108    ensure_json_deserializer_registered(type_name, deserializer)
109}
110
111/// Host-side trait-object adapter for a plug-in custom-data value.
112///
113/// Holds an opaque handle plus a pointer to the plug-in's vtable; every
114/// trait call is routed through the vtable so the host never needs to know
115/// the plug-in's concrete type. Dropping the wrapper invokes the plug-in's
116/// `drop_handle` thunk so the cdylib's allocator owns the value.
117pub struct PluginCustomDataValue {
118    vtable: ValidatedCustomDataVTable,
119    handle: *mut CustomDataHandle,
120    type_name: &'static str,
121}
122
123impl PluginCustomDataValue {
124    /// Returns the boundary reference used for plug-in `on_data` callbacks.
125    #[must_use]
126    pub fn boundary_ref(&self) -> PluginCustomDataRef {
127        // SAFETY: this wrapper owns a live handle allocated by `vtable`, and
128        // type_name is process-lifetime static registration data.
129        unsafe {
130            PluginCustomDataRef::from_raw_parts(
131                BorrowedStr::from_str(self.type_name),
132                self.vtable.as_ptr(),
133                self.handle.cast_const(),
134            )
135        }
136    }
137}
138
139/// Returns the plug-in boundary reference for a host custom-data value when it
140/// came from a plug-in custom-data registration.
141#[must_use]
142pub fn try_custom_data_boundary_ref(data: &CustomData) -> Option<PluginCustomDataRef> {
143    data.data
144        .as_any()
145        .downcast_ref::<PluginCustomDataValue>()
146        .map(PluginCustomDataValue::boundary_ref)
147}
148
149/// Returns the plug-in boundary reference for historical custom-data payloads
150/// that carry a plug-in custom-data value.
151#[must_use]
152pub fn try_historical_custom_data_boundary_ref(data: &dyn Any) -> Option<PluginCustomDataRef> {
153    data.downcast_ref::<CustomData>()
154        .and_then(try_custom_data_boundary_ref)
155}
156
157/// Returns the plug-in boundary reference for a host custom-data value.
158///
159/// # Errors
160///
161/// Returns an error when the value was not decoded from a `PluginCustomData`
162/// registration and therefore has no plug-in vtable or handle.
163pub fn custom_data_boundary_ref(data: &CustomData) -> anyhow::Result<PluginCustomDataRef> {
164    try_custom_data_boundary_ref(data).ok_or_else(|| {
165        anyhow::anyhow!(
166            "custom data type '{}' is not backed by a plug-in custom-data handle",
167            data.data.type_name()
168        )
169    })
170}
171
172// SAFETY: the inner handle is owned exclusively; the vtable is process-
173// lifetime static. The plug-in contract requires the value type behind the
174// handle to be `Send + Sync`.
175unsafe impl Send for PluginCustomDataValue {}
176/// SAFETY: see above.
177unsafe impl Sync for PluginCustomDataValue {}
178
179impl Debug for PluginCustomDataValue {
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        f.debug_struct(stringify!(PluginCustomDataValue))
182            .field("type_name", &self.type_name)
183            .finish()
184    }
185}
186
187impl Drop for PluginCustomDataValue {
188    fn drop(&mut self) {
189        if !self.handle.is_null() {
190            // SAFETY: vtable + handle are live; drop_handle ignores null.
191            unsafe {
192                validated_slot!(CustomDataVTable, self.vtable.as_ptr(), drop_handle)(self.handle);
193            };
194            self.handle = std::ptr::null_mut();
195        }
196    }
197}
198
199impl HasTsInit for PluginCustomDataValue {
200    fn ts_init(&self) -> UnixNanos {
201        // SAFETY: vtable + handle are live.
202        let raw = unsafe {
203            validated_slot!(CustomDataVTable, self.vtable.as_ptr(), ts_init)(self.handle)
204        };
205        UnixNanos::from(raw)
206    }
207}
208
209impl CustomDataTrait for PluginCustomDataValue {
210    fn type_name(&self) -> &'static str {
211        self.type_name
212    }
213
214    fn as_any(&self) -> &dyn Any {
215        self
216    }
217
218    fn ts_event(&self) -> UnixNanos {
219        // SAFETY: vtable + handle are live.
220        let raw = unsafe {
221            validated_slot!(CustomDataVTable, self.vtable.as_ptr(), ts_event)(self.handle)
222        };
223        UnixNanos::from(raw)
224    }
225
226    fn to_json(&self) -> anyhow::Result<String> {
227        // SAFETY: vtable + handle are live.
228        let result = unsafe {
229            validated_slot!(CustomDataVTable, self.vtable.as_ptr(), to_json)(self.handle)
230        };
231        let bytes = result.into_result().map_err(|e| {
232            anyhow::anyhow!(
233                "plug-in '{}' to_json returned error: {}",
234                self.type_name,
235                e.message_string()
236            )
237        })?;
238        // SAFETY: buffer is live until `bytes` is dropped.
239        let view = unsafe { bytes.as_bytes() };
240        let s = std::str::from_utf8(view)?.to_owned();
241        Ok(s)
242    }
243
244    fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
245        // SAFETY: vtable + handle are live.
246        let cloned = unsafe {
247            validated_slot!(CustomDataVTable, self.vtable.as_ptr(), clone_handle)(self.handle)
248        };
249        // The `from_json` path rejects a null handle; clone has no error
250        // channel, so a null return (a misbehaving plug-in clone_handle) would
251        // otherwise be dereferenced as a live value by `ts_event` / `to_json` /
252        // `eq_handles`. Fail fast here instead.
253        assert!(
254            !cloned.is_null(),
255            "plug-in '{}' clone_handle returned a null handle",
256            self.type_name
257        );
258        Arc::new(Self {
259            vtable: self.vtable,
260            handle: cloned,
261            type_name: self.type_name,
262        })
263    }
264
265    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
266        let Some(rhs) = other.as_any().downcast_ref::<Self>() else {
267            return false;
268        };
269
270        if self.vtable != rhs.vtable {
271            return false;
272        }
273        // SAFETY: vtable + handles are live for both sides.
274        unsafe {
275            validated_slot!(CustomDataVTable, self.vtable.as_ptr(), eq_handles)(
276                self.handle,
277                rhs.handle,
278            )
279        }
280    }
281}
282
#[cfg(test)]
mod tests {
    use nautilus_model::data::Data;
    use rstest::rstest;

    use super::*;
    use crate::{
        NAUTILUS_PLUGIN_ABI_VERSION,
        boundary::{BorrowedStr, Slice},
        manifest::{CustomDataRegistration, PluginBuildId, PluginManifest},
        surfaces::custom_data::{CustomDataVTable, PluginCustomData, custom_data_vtable},
    };

    /// Minimal plug-in type whose JSON form is `{"value": <u64>}`.
    #[derive(Clone, PartialEq)]
    struct BridgeBoundaryTick {
        value: u64,
    }

    impl PluginCustomData for BridgeBoundaryTick {
        const TYPE_NAME: &'static str = "BridgeBoundaryTick";

        fn ts_event(&self) -> u64 {
            0
        }

        fn ts_init(&self) -> u64 {
            0
        }

        fn to_json(&self) -> anyhow::Result<Vec<u8>> {
            let body = serde_json::json!({ "value": self.value });
            Ok(body.to_string().into_bytes())
        }

        fn from_json(payload: &[u8]) -> anyhow::Result<Self> {
            let parsed: serde_json::Value = serde_json::from_slice(payload)?;
            // A missing or non-integer "value" falls back to zero.
            let value = parsed
                .get("value")
                .and_then(serde_json::Value::as_u64)
                .unwrap_or_default();
            Ok(Self { value })
        }

        fn schema_ipc() -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn encode_batch(_items: &[&Self]) -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn decode_batch(
            _ipc_bytes: &[u8],
            _metadata: &[(String, String)],
        ) -> anyhow::Result<Vec<Self>> {
            Ok(Vec::new())
        }
    }

    #[rstest]
    fn register_custom_data_from_manifest_rejects_null_vtable() {
        static NULL_VTABLE_CUSTOM_DATA: [CustomDataRegistration; 1] = [CustomDataRegistration {
            type_name: BorrowedStr::from_str("NullVTableTestType"),
            vtable: std::ptr::null(),
        }];
        let raw_manifest = PluginManifest {
            abi_version: NAUTILUS_PLUGIN_ABI_VERSION,
            plugin_name: BorrowedStr::from_str("test-plugin"),
            plugin_vendor: BorrowedStr::from_str("nautech"),
            plugin_version: BorrowedStr::from_str("0.0.0"),
            build_id: PluginBuildId::current(),
            custom_data: Slice::from_slice(&NULL_VTABLE_CUSTOM_DATA),
            actors: Slice::empty(),
            strategies: Slice::empty(),
            controllers: Slice::empty(),
        };

        // Validation itself must refuse the manifest, before any registration runs.
        let err = ValidatedPluginManifest::new(&raw_manifest).unwrap_err();
        let message = err.to_string();

        assert!(
            message.contains("custom_data[0].vtable must not be null"),
            "expected null vtable error, was: {err}",
        );
    }

    #[rstest]
    fn custom_data_boundary_ref_accepts_plugin_custom_data() {
        // Leaked so the registration data outlives the registered deserializer.
        let registrations = Box::leak(Box::new([CustomDataRegistration {
            type_name: BorrowedStr::from_str(BridgeBoundaryTick::TYPE_NAME),
            vtable: custom_data_vtable::<BridgeBoundaryTick>(),
        }]));
        let raw_manifest = PluginManifest {
            abi_version: NAUTILUS_PLUGIN_ABI_VERSION,
            plugin_name: BorrowedStr::from_str("test-plugin"),
            plugin_vendor: BorrowedStr::from_str("nautech"),
            plugin_version: BorrowedStr::from_str("0.0.0"),
            build_id: PluginBuildId::current(),
            custom_data: Slice::from_slice(registrations),
            actors: Slice::empty(),
            strategies: Slice::empty(),
            controllers: Slice::empty(),
        };
        let validated =
            ValidatedPluginManifest::new(&raw_manifest).expect("test manifest passes validation");
        register_custom_data_from_manifest(validated).expect("custom data registration succeeds");

        let envelope = serde_json::json!({
            "type": "Custom",
            "data_type": {
                "type_name": BridgeBoundaryTick::TYPE_NAME,
            },
            "payload": {
                "value": 42,
            },
        });
        let decoded = nautilus_model::data::registry::deserialize_custom_from_json(
            BridgeBoundaryTick::TYPE_NAME,
            &envelope,
        )
        .expect("deserializer succeeds")
        .expect("custom data type is registered");
        let custom = match decoded {
            Data::Custom(custom) => custom,
            _ => panic!("expected Custom variant"),
        };

        let boundary =
            custom_data_boundary_ref(&custom).expect("plug-in custom data has boundary ref");
        let tick = boundary
            .downcast_ref::<BridgeBoundaryTick>()
            .expect("boundary ref downcasts to registered plug-in type");

        assert_eq!(tick.value, 42);
    }

    #[rstest]
    fn custom_data_boundary_ref_rejects_non_plugin_custom_data() {
        let stub = nautilus_model::data::stubs::stub_custom_data(1, 42, None, None);
        let Err(err) = custom_data_boundary_ref(&stub) else {
            panic!("expected non-plugin custom data to fail");
        };
        let message = err.to_string();

        assert!(
            message.contains("not backed by a plug-in custom-data handle"),
            "expected non-plugin custom-data error, was: {err}",
        );
    }

    /// Plug-in type whose `to_json` deliberately emits invalid UTF-8.
    #[derive(Clone, PartialEq)]
    struct NonUtf8Tick;

    impl PluginCustomData for NonUtf8Tick {
        const TYPE_NAME: &'static str = "NonUtf8Tick";

        fn ts_event(&self) -> u64 {
            0
        }

        fn ts_init(&self) -> u64 {
            0
        }

        fn to_json(&self) -> anyhow::Result<Vec<u8>> {
            // 0xff can never appear in well-formed UTF-8.
            Ok(vec![0xff, 0xfe])
        }

        fn from_json(_payload: &[u8]) -> anyhow::Result<Self> {
            Ok(Self)
        }

        fn schema_ipc() -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn encode_batch(_items: &[&Self]) -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn decode_batch(
            _ipc_bytes: &[u8],
            _metadata: &[(String, String)],
        ) -> anyhow::Result<Vec<Self>> {
            Ok(Vec::new())
        }
    }

    #[rstest]
    fn to_json_surfaces_non_utf8_payload_as_error() {
        let raw_vtable = custom_data_vtable::<NonUtf8Tick>();
        let handle = Box::into_raw(Box::new(NonUtf8Tick)).cast::<CustomDataHandle>();
        // SAFETY: generated vtable fills every slot; handle came from Box::into_raw.
        let vtable = unsafe { ValidatedCustomDataVTable::from_raw_unchecked(raw_vtable) };
        let wrapper = PluginCustomDataValue {
            vtable,
            handle,
            type_name: NonUtf8Tick::TYPE_NAME,
        };

        let err = wrapper
            .to_json()
            .expect_err("non-utf8 to_json payload should surface as an error");
        let message = err.to_string();

        assert!(
            message.contains("utf-8"),
            "expected utf-8 decode error, was: {err}",
        );
        // `wrapper` drops here, freeing the handle via the real drop_handle thunk.
    }

    #[rstest]
    #[should_panic(expected = "clone_handle returned a null handle")]
    fn clone_arc_panics_when_plugin_clone_returns_null() {
        unsafe extern "C" fn null_clone(_handle: *const CustomDataHandle) -> *mut CustomDataHandle {
            std::ptr::null_mut()
        }

        // SAFETY: generated test vtable lives for the process lifetime.
        let template = unsafe { &*custom_data_vtable::<BridgeBoundaryTick>() };
        // Copy of the generated vtable with only the clone thunk swapped out.
        let patched: &'static CustomDataVTable = Box::leak(Box::new(CustomDataVTable {
            clone_handle: Some(null_clone),
            ..*template
        }));
        let handle =
            Box::into_raw(Box::new(BridgeBoundaryTick { value: 5 })).cast::<CustomDataHandle>();
        // SAFETY: the copied vtable fills every required slot; handle came
        // from Box::into_raw.
        let vtable =
            unsafe { ValidatedCustomDataVTable::from_raw_unchecked(std::ptr::from_ref(patched)) };
        let wrapper = PluginCustomDataValue {
            vtable,
            handle,
            type_name: BridgeBoundaryTick::TYPE_NAME,
        };

        // clone_handle returns null, so clone_arc must panic rather than wrap a
        // null handle that later thunks would dereference.
        let _ = wrapper.clone_arc();
    }
}