1#![allow(unsafe_code)]
26#![allow(
27 clippy::multiple_unsafe_ops_per_block,
28 reason = "vtable deref and FFI call form a single boundary callback; \
29 SAFETY comments cover both ops together"
30)]
31
32use std::{any::Any, fmt::Debug, sync::Arc};
33
34use nautilus_core::UnixNanos;
35use nautilus_model::data::{
36 CustomData, CustomDataTrait, HasTsInit,
37 registry::{JsonDeserializer, ensure_json_deserializer_registered},
38};
39
40use crate::{
41 boundary::BorrowedStr,
42 manifest::{
43 ValidatedCustomDataRegistration, ValidatedCustomDataVTable, ValidatedPluginManifest,
44 },
45 surfaces::custom_data::{CustomDataHandle, PluginCustomDataRef},
46};
47
48pub fn register_custom_data_from_manifest(
60 manifest: ValidatedPluginManifest<'_>,
61) -> anyhow::Result<usize> {
62 let mut count = 0usize;
63
64 for entry in manifest.custom_data() {
65 register_custom_data_entry(entry)?;
66 count += 1;
67 }
68 Ok(count)
69}
70
71pub fn register_custom_data_entry(entry: ValidatedCustomDataRegistration) -> anyhow::Result<()> {
80 let type_name = entry.type_name();
81 let vtable = entry.vtable();
82 let from_json = unsafe { validated_slot!(CustomDataVTable, vtable.as_ptr(), from_json) };
84
85 let deserializer: JsonDeserializer = Box::new(move |value| {
86 let payload = serde_json::to_vec(&value)?;
87 let payload_str = std::str::from_utf8(&payload)?;
88 let handle_result = unsafe { from_json(BorrowedStr::from_str(payload_str)) };
90 let handle = handle_result.into_result().map_err(|e| {
91 anyhow::anyhow!(
92 "plug-in '{type_name}' from_json returned error: {}",
93 e.message_string()
94 )
95 })?;
96
97 if handle.is_null() {
98 anyhow::bail!("plug-in '{type_name}' from_json returned a null handle");
99 }
100
101 Ok(Arc::new(PluginCustomDataValue {
102 vtable,
103 handle,
104 type_name,
105 }) as Arc<dyn CustomDataTrait>)
106 });
107
108 ensure_json_deserializer_registered(type_name, deserializer)
109}
110
/// Host-side wrapper around a custom-data handle produced by a plug-in.
///
/// Every operation on the value is dispatched through the slots of `vtable`, and the handle
/// is passed to the vtable's `drop_handle` slot when the wrapper is dropped.
pub struct PluginCustomDataValue {
    /// Validated vtable whose slots are invoked with `handle`.
    vtable: ValidatedCustomDataVTable,
    /// Raw plug-in handle; released through `drop_handle` in the `Drop` impl.
    handle: *mut CustomDataHandle,
    /// Type name returned by `CustomDataTrait::type_name` and used in error messages.
    type_name: &'static str,
}
122
123impl PluginCustomDataValue {
124 #[must_use]
126 pub fn boundary_ref(&self) -> PluginCustomDataRef {
127 unsafe {
130 PluginCustomDataRef::from_raw_parts(
131 BorrowedStr::from_str(self.type_name),
132 self.vtable.as_ptr(),
133 self.handle.cast_const(),
134 )
135 }
136 }
137}
138
139#[must_use]
142pub fn try_custom_data_boundary_ref(data: &CustomData) -> Option<PluginCustomDataRef> {
143 data.data
144 .as_any()
145 .downcast_ref::<PluginCustomDataValue>()
146 .map(PluginCustomDataValue::boundary_ref)
147}
148
149#[must_use]
152pub fn try_historical_custom_data_boundary_ref(data: &dyn Any) -> Option<PluginCustomDataRef> {
153 data.downcast_ref::<CustomData>()
154 .and_then(try_custom_data_boundary_ref)
155}
156
157pub fn custom_data_boundary_ref(data: &CustomData) -> anyhow::Result<PluginCustomDataRef> {
164 try_custom_data_boundary_ref(data).ok_or_else(|| {
165 anyhow::anyhow!(
166 "custom data type '{}' is not backed by a plug-in custom-data handle",
167 data.data.type_name()
168 )
169 })
170}
171
// SAFETY: NOTE(review): nothing in this file demonstrates that the raw handle or the vtable
// callbacks may be used from another thread; this impl presumably relies on the plug-in ABI
// requiring custom-data handles to be thread-safe — confirm against that contract.
unsafe impl Send for PluginCustomDataValue {}
// SAFETY: NOTE(review): as above; shared access only reaches the handle through the vtable
// callbacks, whose thread-safety must be guaranteed by the plug-in contract — confirm.
unsafe impl Sync for PluginCustomDataValue {}
178
179impl Debug for PluginCustomDataValue {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 f.debug_struct(stringify!(PluginCustomDataValue))
182 .field("type_name", &self.type_name)
183 .finish()
184 }
185}
186
187impl Drop for PluginCustomDataValue {
188 fn drop(&mut self) {
189 if !self.handle.is_null() {
190 unsafe {
192 validated_slot!(CustomDataVTable, self.vtable.as_ptr(), drop_handle)(self.handle);
193 };
194 self.handle = std::ptr::null_mut();
195 }
196 }
197}
198
199impl HasTsInit for PluginCustomDataValue {
200 fn ts_init(&self) -> UnixNanos {
201 let raw = unsafe {
203 validated_slot!(CustomDataVTable, self.vtable.as_ptr(), ts_init)(self.handle)
204 };
205 UnixNanos::from(raw)
206 }
207}
208
209impl CustomDataTrait for PluginCustomDataValue {
210 fn type_name(&self) -> &'static str {
211 self.type_name
212 }
213
214 fn as_any(&self) -> &dyn Any {
215 self
216 }
217
218 fn ts_event(&self) -> UnixNanos {
219 let raw = unsafe {
221 validated_slot!(CustomDataVTable, self.vtable.as_ptr(), ts_event)(self.handle)
222 };
223 UnixNanos::from(raw)
224 }
225
226 fn to_json(&self) -> anyhow::Result<String> {
227 let result = unsafe {
229 validated_slot!(CustomDataVTable, self.vtable.as_ptr(), to_json)(self.handle)
230 };
231 let bytes = result.into_result().map_err(|e| {
232 anyhow::anyhow!(
233 "plug-in '{}' to_json returned error: {}",
234 self.type_name,
235 e.message_string()
236 )
237 })?;
238 let view = unsafe { bytes.as_bytes() };
240 let s = std::str::from_utf8(view)?.to_owned();
241 Ok(s)
242 }
243
244 fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
245 let cloned = unsafe {
247 validated_slot!(CustomDataVTable, self.vtable.as_ptr(), clone_handle)(self.handle)
248 };
249 assert!(
254 !cloned.is_null(),
255 "plug-in '{}' clone_handle returned a null handle",
256 self.type_name
257 );
258 Arc::new(Self {
259 vtable: self.vtable,
260 handle: cloned,
261 type_name: self.type_name,
262 })
263 }
264
265 fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
266 let Some(rhs) = other.as_any().downcast_ref::<Self>() else {
267 return false;
268 };
269
270 if self.vtable != rhs.vtable {
271 return false;
272 }
273 unsafe {
275 validated_slot!(CustomDataVTable, self.vtable.as_ptr(), eq_handles)(
276 self.handle,
277 rhs.handle,
278 )
279 }
280 }
281}
282
/// Unit tests for manifest-driven registration and the plug-in custom-data wrapper.
#[cfg(test)]
mod tests {
    use nautilus_model::data::Data;
    use rstest::rstest;

    use super::*;
    use crate::{
        NAUTILUS_PLUGIN_ABI_VERSION,
        boundary::{BorrowedStr, Slice},
        manifest::{CustomDataRegistration, PluginBuildId, PluginManifest},
        surfaces::custom_data::{CustomDataVTable, PluginCustomData, custom_data_vtable},
    };

    /// Minimal plug-in custom-data fixture carrying a single `u64`, round-tripped through
    /// JSON as `{"value": <n>}`.
    #[derive(Clone, PartialEq)]
    struct BridgeBoundaryTick {
        value: u64,
    }

    impl PluginCustomData for BridgeBoundaryTick {
        const TYPE_NAME: &'static str = "BridgeBoundaryTick";

        fn ts_event(&self) -> u64 {
            0
        }

        fn ts_init(&self) -> u64 {
            0
        }

        fn to_json(&self) -> anyhow::Result<Vec<u8>> {
            Ok(serde_json::json!({ "value": self.value })
                .to_string()
                .into_bytes())
        }

        // Reads the top-level `value` key; a missing or non-u64 key falls back to 0.
        fn from_json(payload: &[u8]) -> anyhow::Result<Self> {
            let value: serde_json::Value = serde_json::from_slice(payload)?;
            Ok(Self {
                value: value
                    .get("value")
                    .and_then(serde_json::Value::as_u64)
                    .unwrap_or_default(),
            })
        }

        // The IPC/batch hooks are not exercised by these tests and return empty results.
        fn schema_ipc() -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn encode_batch(_items: &[&Self]) -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn decode_batch(
            _ipc_bytes: &[u8],
            _metadata: &[(String, String)],
        ) -> anyhow::Result<Vec<Self>> {
            Ok(Vec::new())
        }
    }

    /// A manifest whose custom-data entry has a null vtable is rejected.
    ///
    /// The rejection is observed at `ValidatedPluginManifest::new`; registration itself is
    /// never reached.
    #[rstest]
    fn register_custom_data_from_manifest_rejects_null_vtable() {
        static NULL_VTABLE_CUSTOM_DATA: [CustomDataRegistration; 1] = [CustomDataRegistration {
            type_name: BorrowedStr::from_str("NullVTableTestType"),
            vtable: std::ptr::null(),
        }];
        let manifest = PluginManifest {
            abi_version: NAUTILUS_PLUGIN_ABI_VERSION,
            plugin_name: BorrowedStr::from_str("test-plugin"),
            plugin_vendor: BorrowedStr::from_str("nautech"),
            plugin_version: BorrowedStr::from_str("0.0.0"),
            build_id: PluginBuildId::current(),
            custom_data: Slice::from_slice(&NULL_VTABLE_CUSTOM_DATA),
            actors: Slice::empty(),
            strategies: Slice::empty(),
            controllers: Slice::empty(),
        };

        let r = ValidatedPluginManifest::new(&manifest);
        let err = r.unwrap_err();
        assert!(
            err.to_string()
                .contains("custom_data[0].vtable must not be null"),
            "expected null vtable error, was: {err}",
        );
    }

    /// End-to-end path: register from a manifest, deserialize through the registry, then
    /// recover the concrete plug-in value from the boundary reference.
    #[rstest]
    fn custom_data_boundary_ref_accepts_plugin_custom_data() {
        // Leaked so the registration array is `'static`.
        let custom_data = Box::leak(Box::new([CustomDataRegistration {
            type_name: BorrowedStr::from_str(BridgeBoundaryTick::TYPE_NAME),
            vtable: custom_data_vtable::<BridgeBoundaryTick>(),
        }]));
        let manifest = PluginManifest {
            abi_version: NAUTILUS_PLUGIN_ABI_VERSION,
            plugin_name: BorrowedStr::from_str("test-plugin"),
            plugin_vendor: BorrowedStr::from_str("nautech"),
            plugin_version: BorrowedStr::from_str("0.0.0"),
            build_id: PluginBuildId::current(),
            custom_data: Slice::from_slice(custom_data),
            actors: Slice::empty(),
            strategies: Slice::empty(),
            controllers: Slice::empty(),
        };
        let manifest =
            ValidatedPluginManifest::new(&manifest).expect("test manifest passes validation");
        register_custom_data_from_manifest(manifest).expect("custom data registration succeeds");
        let envelope = serde_json::json!({
            "type": "Custom",
            "data_type": {
                "type_name": BridgeBoundaryTick::TYPE_NAME,
            },
            "payload": {
                "value": 42,
            },
        });
        let data = nautilus_model::data::registry::deserialize_custom_from_json(
            BridgeBoundaryTick::TYPE_NAME,
            &envelope,
        )
        .expect("deserializer succeeds")
        .expect("custom data type is registered");
        let Data::Custom(custom) = data else {
            panic!("expected Custom variant");
        };
        let data_ref =
            custom_data_boundary_ref(&custom).expect("plug-in custom data has boundary ref");
        let value = data_ref
            .downcast_ref::<BridgeBoundaryTick>()
            .expect("boundary ref downcasts to registered plug-in type");

        assert_eq!(value.value, 42);
    }

    /// Custom data that is not a `PluginCustomDataValue` yields the descriptive error.
    #[rstest]
    fn custom_data_boundary_ref_rejects_non_plugin_custom_data() {
        let data = nautilus_model::data::stubs::stub_custom_data(1, 42, None, None);
        let err = match custom_data_boundary_ref(&data) {
            Ok(_) => panic!("expected non-plugin custom data to fail"),
            Err(e) => e,
        };

        assert!(
            err.to_string()
                .contains("not backed by a plug-in custom-data handle"),
            "expected non-plugin custom-data error, was: {err}",
        );
    }

    /// Fixture whose `to_json` returns the bytes `0xff 0xfe`, which are not valid UTF-8.
    #[derive(Clone, PartialEq)]
    struct NonUtf8Tick;

    impl PluginCustomData for NonUtf8Tick {
        const TYPE_NAME: &'static str = "NonUtf8Tick";

        fn ts_event(&self) -> u64 {
            0
        }

        fn ts_init(&self) -> u64 {
            0
        }

        fn to_json(&self) -> anyhow::Result<Vec<u8>> {
            Ok(vec![0xff, 0xfe])
        }

        fn from_json(_payload: &[u8]) -> anyhow::Result<Self> {
            Ok(Self)
        }

        fn schema_ipc() -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn encode_batch(_items: &[&Self]) -> anyhow::Result<Vec<u8>> {
            Ok(Vec::new())
        }

        fn decode_batch(
            _ipc_bytes: &[u8],
            _metadata: &[(String, String)],
        ) -> anyhow::Result<Vec<Self>> {
            Ok(Vec::new())
        }
    }

    /// `to_json` must return an error, not panic, when the plug-in payload is not UTF-8.
    #[rstest]
    fn to_json_surfaces_non_utf8_payload_as_error() {
        let vtable = custom_data_vtable::<NonUtf8Tick>();
        let handle = Box::into_raw(Box::new(NonUtf8Tick)).cast::<CustomDataHandle>();
        let value = PluginCustomDataValue {
            // SAFETY: NOTE(review): the pointer comes straight from `custom_data_vtable`,
            // presumably a fully populated vtable — confirm against `from_raw_unchecked`.
            vtable: unsafe { ValidatedCustomDataVTable::from_raw_unchecked(vtable) },
            handle,
            type_name: NonUtf8Tick::TYPE_NAME,
        };

        let err = value
            .to_json()
            .expect_err("non-utf8 to_json payload should surface as an error");

        assert!(
            err.to_string().contains("utf-8"),
            "expected utf-8 decode error, was: {err}",
        );
    }

    /// `clone_arc` must panic with a descriptive message when the plug-in's `clone_handle`
    /// returns null.
    #[rstest]
    #[should_panic(expected = "clone_handle returned a null handle")]
    fn clone_arc_panics_when_plugin_clone_returns_null() {
        // Stand-in `clone_handle` slot that always reports failure by returning null.
        unsafe extern "C" fn null_clone(_handle: *const CustomDataHandle) -> *mut CustomDataHandle {
            std::ptr::null_mut()
        }

        let valid = custom_data_vtable::<BridgeBoundaryTick>();
        // SAFETY: NOTE(review): assumes `custom_data_vtable` returns a non-null pointer to a
        // vtable that outlives this test — confirm.
        let valid = unsafe { &*valid };
        // Copy of the real vtable with only `clone_handle` swapped out; leaked so it is
        // `'static`.
        let vtable = Box::leak(Box::new(CustomDataVTable {
            type_name: valid.type_name,
            schema_ipc: valid.schema_ipc,
            from_json: valid.from_json,
            encode_batch: valid.encode_batch,
            decode_batch: valid.decode_batch,
            ts_event: valid.ts_event,
            ts_init: valid.ts_init,
            to_json: valid.to_json,
            clone_handle: Some(null_clone),
            drop_handle: valid.drop_handle,
            eq_handles: valid.eq_handles,
        }));
        let handle =
            Box::into_raw(Box::new(BridgeBoundaryTick { value: 5 })).cast::<CustomDataHandle>();
        let value = PluginCustomDataValue {
            // SAFETY: NOTE(review): every slot is populated above (copied from the real
            // vtable or set to `null_clone`); presumably that meets `from_raw_unchecked`'s
            // requirements — confirm.
            vtable: unsafe {
                ValidatedCustomDataVTable::from_raw_unchecked(std::ptr::from_ref(&*vtable))
            },
            handle,
            type_name: BridgeBoundaryTick::TYPE_NAME,
        };

        let _ = value.clone_arc();
    }
}