streamkit_plugin_sdk_native/
lib.rs

1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! StreamKit Native Plugin SDK
6//!
7//! This SDK provides an ergonomic Rust interface for writing native plugins that use
8//! a stable C ABI. While the interface feels like pure Rust, under the hood it generates
9//! C-compatible exports for maximum binary compatibility.
10//!
11//! # Example
12//!
13//! ```no_run
14//! use streamkit_plugin_sdk_native::prelude::*;
15//!
16//! pub struct MyPlugin {
17//!     // plugin state
18//! }
19//!
20//! impl NativeProcessorNode for MyPlugin {
21//!     fn metadata() -> NodeMetadata {
22//!         NodeMetadata::builder("my_plugin")
23//!             .input("in", &[PacketType::Any])
24//!             .output("out", PacketType::Any)
25//!             .build()
26//!     }
27//!
28//!     fn new(_params: Option<serde_json::Value>, _logger: Logger) -> Result<Self, String> {
29//!         Ok(Self {})
30//!     }
31//!
32//!     fn process(
33//!         &mut self,
34//!         _pin: &str,
35//!         packet: Packet,
36//!         output: &OutputSender,
37//!     ) -> Result<(), String> {
38//!         output.send("out", &packet)?;
39//!         Ok(())
40//!     }
41//! }
42//!
43//! native_plugin_entry!(MyPlugin);
44//! ```
45
46pub mod conversions;
47pub mod logger;
48pub mod types;
49
50use std::ffi::CString;
51use streamkit_core::types::{Packet, PacketType};
52use streamkit_core::{InputPin, OutputPin, PinCardinality, Resource};
53
54use logger::Logger;
55
56pub use streamkit_core;
57pub use types::*;
58
59/// Re-export commonly used types
60pub mod prelude {
61    pub use crate::logger::Logger;
62    pub use crate::types::{CLogCallback, CLogLevel};
63    pub use crate::{
64        native_plugin_entry, plugin_debug, plugin_error, plugin_info, plugin_log, plugin_trace,
65        plugin_warn, NativeProcessorNode, NodeMetadata, OutputSender, ResourceSupport,
66    };
67    pub use streamkit_core::types::{AudioFrame, Packet, PacketType};
68    pub use streamkit_core::{InputPin, OutputPin, PinCardinality, Resource};
69}
70
71/// Metadata about a node type
72pub struct NodeMetadata {
73    pub kind: String,
74    pub description: Option<String>,
75    pub inputs: Vec<InputPin>,
76    pub outputs: Vec<OutputPin>,
77    pub param_schema: serde_json::Value,
78    pub categories: Vec<String>,
79}
80
81impl NodeMetadata {
82    /// Create a builder for node metadata
83    pub fn builder(kind: &str) -> NodeMetadataBuilder {
84        NodeMetadataBuilder {
85            kind: kind.to_string(),
86            description: None,
87            inputs: Vec::new(),
88            outputs: Vec::new(),
89            param_schema: serde_json::json!({}),
90            categories: Vec::new(),
91        }
92    }
93}
94
95/// Builder for NodeMetadata
96pub struct NodeMetadataBuilder {
97    kind: String,
98    description: Option<String>,
99    inputs: Vec<InputPin>,
100    outputs: Vec<OutputPin>,
101    param_schema: serde_json::Value,
102    categories: Vec<String>,
103}
104
105impl NodeMetadataBuilder {
106    /// Set the node description
107    #[must_use]
108    pub fn description(mut self, description: impl Into<String>) -> Self {
109        self.description = Some(description.into());
110        self
111    }
112
113    /// Add an input pin
114    #[must_use]
115    pub fn input(mut self, name: &str, accepts_types: &[PacketType]) -> Self {
116        self.inputs.push(InputPin {
117            name: name.to_string(),
118            accepts_types: accepts_types.to_vec(),
119            cardinality: PinCardinality::One,
120        });
121        self
122    }
123
124    /// Add an output pin
125    #[must_use]
126    pub fn output(mut self, name: &str, produces_type: PacketType) -> Self {
127        self.outputs.push(OutputPin {
128            name: name.to_string(),
129            produces_type,
130            cardinality: PinCardinality::Broadcast,
131        });
132        self
133    }
134
135    /// Set parameter schema
136    #[must_use]
137    pub fn param_schema(mut self, schema: serde_json::Value) -> Self {
138        self.param_schema = schema;
139        self
140    }
141
142    /// Add a category
143    #[must_use]
144    pub fn category(mut self, category: &str) -> Self {
145        self.categories.push(category.to_string());
146        self
147    }
148
149    /// Build the metadata
150    pub fn build(self) -> NodeMetadata {
151        NodeMetadata {
152            kind: self.kind,
153            description: self.description,
154            inputs: self.inputs,
155            outputs: self.outputs,
156            param_schema: self.param_schema,
157            categories: self.categories,
158        }
159    }
160}
161
162/// Output sender for sending packets to output pins
163pub struct OutputSender {
164    output_callback: COutputCallback,
165    output_user_data: *mut std::os::raw::c_void,
166    telemetry_callback: types::CTelemetryCallback,
167    telemetry_user_data: *mut std::os::raw::c_void,
168}
169
170impl OutputSender {
171    /// Create a new output sender from C callback
172    pub fn from_callback(callback: COutputCallback, user_data: *mut std::os::raw::c_void) -> Self {
173        Self {
174            output_callback: callback,
175            output_user_data: user_data,
176            telemetry_callback: None,
177            telemetry_user_data: std::ptr::null_mut(),
178        }
179    }
180
181    /// Create a new output sender from C callbacks.
182    ///
183    /// `telemetry_callback` may be null if the host doesn't provide telemetry support.
184    pub fn from_callbacks(
185        output_callback: COutputCallback,
186        output_user_data: *mut std::os::raw::c_void,
187        telemetry_callback: types::CTelemetryCallback,
188        telemetry_user_data: *mut std::os::raw::c_void,
189    ) -> Self {
190        Self { output_callback, output_user_data, telemetry_callback, telemetry_user_data }
191    }
192
193    /// Send a packet to an output pin
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if:
198    /// - The pin name contains null bytes
199    /// - The C callback returns an error
200    pub fn send(&self, pin: &str, packet: &Packet) -> Result<(), String> {
201        let pin_c = CString::new(pin).map_err(|e| format!("Invalid pin name: {e}"))?;
202
203        let packet_repr = conversions::packet_to_c(packet);
204        let result = (self.output_callback)(
205            pin_c.as_ptr(),
206            &raw const packet_repr.packet,
207            self.output_user_data,
208        );
209
210        if result.success {
211            Ok(())
212        } else {
213            let error_msg = if result.error_message.is_null() {
214                "Unknown error".to_string()
215            } else {
216                unsafe {
217                    conversions::c_str_to_string(result.error_message)
218                        .unwrap_or_else(|_| "Unknown error".to_string())
219                }
220            };
221            Err(error_msg)
222        }
223    }
224
225    /// Emit a telemetry event to the host telemetry bus (best-effort).
226    ///
227    /// `data` is encoded as JSON and forwarded out-of-band; it does not flow through graph pins.
228    ///
229    /// If the host doesn't provide a telemetry callback, this is a no-op.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if:
234    /// - `event_type` contains an interior NUL byte (invalid C string),
235    /// - `data` cannot be serialized to JSON,
236    /// - the host telemetry callback reports an error.
237    pub fn emit_telemetry(
238        &self,
239        event_type: &str,
240        data: &serde_json::Value,
241        timestamp_us: Option<u64>,
242    ) -> Result<(), String> {
243        let Some(cb) = self.telemetry_callback else {
244            return Ok(());
245        };
246
247        let event_type_c =
248            CString::new(event_type).map_err(|e| format!("Invalid event_type: {e}"))?;
249        let data_json = serde_json::to_vec(data)
250            .map_err(|e| format!("Failed to serialize telemetry JSON: {e}"))?;
251
252        let meta = timestamp_us.map(|ts| types::CPacketMetadata {
253            timestamp_us: ts,
254            has_timestamp_us: true,
255            duration_us: 0,
256            has_duration_us: false,
257            sequence: 0,
258            has_sequence: false,
259        });
260        let meta_ptr = meta.as_ref().map_or(std::ptr::null(), std::ptr::from_ref);
261
262        let result = cb(
263            event_type_c.as_ptr(),
264            data_json.as_ptr(),
265            data_json.len(),
266            meta_ptr,
267            self.telemetry_user_data,
268        );
269
270        if result.success {
271            Ok(())
272        } else {
273            let error_msg = if result.error_message.is_null() {
274                "Unknown error".to_string()
275            } else {
276                unsafe {
277                    conversions::c_str_to_string(result.error_message)
278                        .unwrap_or_else(|_| "Unknown error".to_string())
279                }
280            };
281            Err(error_msg)
282        }
283    }
284}
285
/// Trait that plugin authors implement
/// This provides an ergonomic Rust interface that gets wrapped with C ABI exports
///
/// The exports are generated by `native_plugin_entry!`, which forwards each C entry
/// point to the corresponding method of this trait.
pub trait NativeProcessorNode: Sized + Send + 'static {
    /// Return metadata about this node type
    ///
    /// The generated metadata export converts the returned value to its C
    /// representation inside a `OnceLock` initializer and hands out a pointer to the
    /// stored result.
    fn metadata() -> NodeMetadata;

    /// Create a new instance of the node
    ///
    /// `params` is `None` when the host passes a null pointer or an empty string;
    /// otherwise it is the parsed JSON value. `logger` is the logger created for
    /// this plugin instance from the host's log callback.
    ///
    /// # Errors
    ///
    /// Returns an error if initialization fails (e.g., invalid parameters)
    ///
    /// Note that the generated `create_instance` export maps an error to a null
    /// handle; the error string itself is not forwarded to the host.
    fn new(params: Option<serde_json::Value>, logger: Logger) -> Result<Self, String>;

    /// Process an incoming packet
    ///
    /// `pin` is the name of the input pin the packet arrived on, and `output` is
    /// the sender to use for emitting packets (and telemetry) during this call.
    ///
    /// # Errors
    ///
    /// Returns an error if packet processing fails
    ///
    /// The generated export converts the error string to a C error message and
    /// returns it to the host.
    fn process(&mut self, pin: &str, packet: Packet, output: &OutputSender) -> Result<(), String>;

    /// Update runtime parameters (optional)
    ///
    /// As with [`NativeProcessorNode::new`], `params` is `None` when the host passes
    /// a null pointer or an empty string. The default implementation accepts any
    /// parameters and does nothing.
    ///
    /// # Errors
    ///
    /// Returns an error if parameter update fails (e.g., invalid values)
    fn update_params(&mut self, _params: Option<serde_json::Value>) -> Result<(), String> {
        Ok(())
    }

    /// Flush any buffered data when input stream ends (optional)
    ///
    /// Called when the input stream closes, allowing plugins to process any
    /// remaining buffered data before cleanup. This is useful for nodes that
    /// buffer input (e.g., sentence splitting in TTS, frame buffering in codecs).
    ///
    /// The default implementation emits nothing and succeeds.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing fails
    fn flush(&mut self, _output: &OutputSender) -> Result<(), String> {
        Ok(())
    }

    /// Clean up resources (optional)
    ///
    /// The default implementation does nothing.
    fn cleanup(&mut self) {}
}
331
/// Optional trait for plugins that need shared resource management (e.g., ML models).
///
/// Plugins that implement this trait can have their resources (models) automatically
/// cached and shared across multiple node instances. This avoids loading the same
/// model multiple times in memory.
///
/// # Example
///
/// ```ignore
/// use streamkit_plugin_sdk_native::prelude::*;
/// use std::sync::Arc;
///
/// pub struct MyModelResource {
///     model_data: Vec<f32>,
/// }
///
/// impl Resource for MyModelResource {
///     fn size_bytes(&self) -> usize {
///         self.model_data.len() * std::mem::size_of::<f32>()
///     }
///     fn resource_type(&self) -> &str { "ml_model" }
/// }
///
/// pub struct MyPlugin {
///     resource: Arc<MyModelResource>,
/// }
///
/// // Note: MyPlugin must also implement NativeProcessorNode for this to compile
/// impl ResourceSupport for MyPlugin {
///     type Resource = MyModelResource;
///
///     fn compute_resource_key(params: Option<&serde_json::Value>) -> String {
///         // For brevity this keys on *all* params. A real plugin should build the
///         // key only from the params that affect resource creation.
///         format!("{:?}", params)
///     }
///
///     fn init_resource(params: Option<serde_json::Value>) -> Result<Self::Resource, String> {
///         // Load model (can be expensive, but only happens once per unique key)
///         Ok(MyModelResource { model_data: vec![0.0; 1000] })
///     }
/// }
/// ```
pub trait ResourceSupport: NativeProcessorNode {
    /// The type of resource this plugin uses
    type Resource: Resource + 'static;

    /// Compute a cache key from parameters.
    ///
    /// This should hash only the parameters that affect resource initialization
    /// (e.g., model path, GPU device ID). Different parameters that produce the
    /// same key will share the same cached resource.
    fn compute_resource_key(params: Option<&serde_json::Value>) -> String;

    /// Initialize/load the resource.
    ///
    /// This is called once per unique cache key. The result is cached and shared
    /// across all node instances with matching parameters.
    ///
    /// # Errors
    ///
    /// Returns an error if resource initialization fails (e.g., model file not found,
    /// GPU initialization error).
    ///
    /// # Note
    ///
    /// This method may be called from a blocking thread pool to avoid blocking
    /// async execution during model loading.
    fn init_resource(params: Option<serde_json::Value>) -> Result<Self::Resource, String>;

    /// Optional cleanup when the resource is being unloaded.
    ///
    /// This is called when the last reference to the resource is dropped
    /// (typically during plugin unload or LRU eviction).
    ///
    /// The resource is passed by value; the default implementation has an empty
    /// body, so the resource is simply dropped when the call returns.
    fn deinit_resource(_resource: Self::Resource) {
        // Default: just drop it
    }
}
409
410/// Macro to generate C ABI exports for a plugin
411///
412/// This macro should be called once per plugin with the type that implements
413/// `NativeProcessorNode`.
414///
415/// # Example
416/// ```no_run
417/// # use streamkit_plugin_sdk_native::prelude::*;
418/// # struct MyPlugin;
419/// # impl NativeProcessorNode for MyPlugin {
420/// #     fn metadata() -> NodeMetadata { unimplemented!() }
421/// #     fn new(_: Option<serde_json::Value>, _: Logger) -> Result<Self, String> { unimplemented!() }
422/// #     fn process(&mut self, _: &str, _: Packet, _: &OutputSender) -> Result<(), String> { unimplemented!() }
423/// # }
424/// native_plugin_entry!(MyPlugin);
425/// ```
426#[macro_export]
427macro_rules! native_plugin_entry {
428    ($plugin_type:ty) => {
        // Static metadata storage.
        //
        // Element 0 is the C-facing `CNodeMetadata`; the remaining elements own the
        // allocations that its raw pointers refer to, so everything lives (and is
        // dropped) together. The per-element notes below use the names of the values
        // placed in the tuple by `__plugin_get_metadata`.
        static mut METADATA: std::sync::OnceLock<(
            // c_metadata: the struct whose address is returned to the host
            $crate::types::CNodeMetadata,
            // c_inputs: backing array for `CNodeMetadata::inputs`
            Vec<$crate::types::CInputPin>,
            // c_outputs: backing array for `CNodeMetadata::outputs`
            Vec<$crate::types::COutputPin>,
            // input_names: owned C strings behind each `CInputPin::name`
            Vec<std::ffi::CString>,
            // input_types: per-input backing arrays for `CInputPin::accepts_types`
            Vec<Vec<$crate::types::CPacketTypeInfo>>,
            // input_audio_formats: per-input audio formats pointed to by the type infos
            Vec<Vec<Option<$crate::types::CAudioFormat>>>,
            // input_custom_type_ids: per-input custom type id strings
            Vec<Vec<Option<std::ffi::CString>>>,
            // output_names: owned C strings behind each `COutputPin::name`
            Vec<std::ffi::CString>,
            // output_audio_formats: audio formats pointed to by the output type infos
            Vec<Option<$crate::types::CAudioFormat>>,
            // output_custom_type_ids: custom type id strings for the outputs
            Vec<Option<std::ffi::CString>>,
            // category_strings: owned C strings for the categories
            Vec<std::ffi::CString>,
            // category_ptrs: backing array for `CNodeMetadata::categories`
            Vec<*const std::os::raw::c_char>,
            // kind: owned C string behind `CNodeMetadata::kind`
            std::ffi::CString,
            // description: owned C string behind `CNodeMetadata::description` (if any)
            Option<std::ffi::CString>,
            // param_schema: owned C string behind `CNodeMetadata::param_schema`
            std::ffi::CString,
        )> = std::sync::OnceLock::new();
447
448        #[no_mangle]
449        pub extern "C" fn streamkit_native_plugin_api() -> *const $crate::types::CNativePluginAPI {
450            static API: $crate::types::CNativePluginAPI = $crate::types::CNativePluginAPI {
451                version: $crate::types::NATIVE_PLUGIN_API_VERSION,
452                get_metadata: __plugin_get_metadata,
453                create_instance: __plugin_create_instance,
454                process_packet: __plugin_process_packet,
455                update_params: __plugin_update_params,
456                flush: __plugin_flush,
457                destroy_instance: __plugin_destroy_instance,
458            };
459            &API
460        }
461
462        extern "C" fn __plugin_get_metadata() -> *const $crate::types::CNodeMetadata {
463            unsafe {
464                let metadata = METADATA.get_or_init(|| {
465                    let meta = <$plugin_type as $crate::NativeProcessorNode>::metadata();
466
467                    // Convert inputs
468                    let mut c_inputs = Vec::new();
469                    let mut input_names = Vec::new();
470                    let mut input_types = Vec::new();
471                    let mut input_audio_formats = Vec::new();
472                    let mut input_custom_type_ids = Vec::new();
473
474                    for input in &meta.inputs {
475                        let name = std::ffi::CString::new(input.name.as_str())
476                            .expect("Input pin name should not contain null bytes");
477                        let mut types_info = Vec::new();
478                        let mut formats = Vec::new();
479                        let mut custom_type_ids = Vec::new();
480
481                        // First, collect all the audio formats
482                        for pt in &input.accepts_types {
483                            let (_type_info, audio_format) =
484                                $crate::conversions::packet_type_to_c(pt);
485                            formats.push(audio_format);
486                            let custom_type_id = match pt {
487                                $crate::streamkit_core::types::PacketType::Custom { type_id } => {
488                                    Some(std::ffi::CString::new(type_id.as_str()).expect(
489                                        "Custom type_id should not contain null bytes",
490                                    ))
491                                }
492                                _ => None,
493                            };
494                            custom_type_ids.push(custom_type_id);
495                        }
496
497                        // Now create CPacketTypeInfo with stable pointers to the stored formats
498                        for (idx, pt) in input.accepts_types.iter().enumerate() {
499                            let type_discriminant = match pt {
500                                $crate::streamkit_core::types::PacketType::RawAudio(_) => {
501                                    $crate::types::CPacketType::RawAudio
502                                }
503                                $crate::streamkit_core::types::PacketType::OpusAudio => {
504                                    $crate::types::CPacketType::OpusAudio
505                                }
506                                $crate::streamkit_core::types::PacketType::Text => {
507                                    $crate::types::CPacketType::Text
508                                }
509                                $crate::streamkit_core::types::PacketType::Transcription => {
510                                    $crate::types::CPacketType::Transcription
511                                }
512                                $crate::streamkit_core::types::PacketType::Custom { .. } => {
513                                    $crate::types::CPacketType::Custom
514                                }
515                                $crate::streamkit_core::types::PacketType::Binary => {
516                                    $crate::types::CPacketType::Binary
517                                }
518                                $crate::streamkit_core::types::PacketType::Any => {
519                                    $crate::types::CPacketType::Any
520                                }
521                                $crate::streamkit_core::types::PacketType::Passthrough => {
522                                    $crate::types::CPacketType::Any
523                                }
524                            };
525
526                            let audio_format_ptr = if let Some(ref fmt) = formats[idx] {
527                                fmt as *const $crate::types::CAudioFormat
528                            } else {
529                                std::ptr::null()
530                            };
531
532                            let custom_type_id_ptr = if let Some(ref s) = custom_type_ids[idx] {
533                                s.as_ptr()
534                            } else {
535                                std::ptr::null()
536                            };
537
538                            types_info.push($crate::types::CPacketTypeInfo {
539                                type_discriminant,
540                                audio_format: audio_format_ptr,
541                                custom_type_id: custom_type_id_ptr,
542                            });
543                        }
544
545                        c_inputs.push($crate::types::CInputPin {
546                            name: name.as_ptr(),
547                            accepts_types: types_info.as_ptr(),
548                            accepts_types_count: types_info.len(),
549                        });
550
551                        input_names.push(name);
552                        input_types.push(types_info);
553                        input_audio_formats.push(formats);
554                        input_custom_type_ids.push(custom_type_ids);
555                    }
556
557                    // Convert outputs
558                    let mut c_outputs = Vec::new();
559                    let mut output_names = Vec::new();
560                    let mut output_audio_formats = Vec::new();
561                    let mut output_custom_type_ids = Vec::new();
562
563                    for output in &meta.outputs {
564                        let name = std::ffi::CString::new(output.name.as_str())
565                            .expect("Output pin name should not contain null bytes");
566
567                        // First, store the audio format
568                        let (_type_info, audio_format) =
569                            $crate::conversions::packet_type_to_c(&output.produces_type);
570                        output_audio_formats.push(audio_format);
571                        let output_custom_type_id = match &output.produces_type {
572                            $crate::streamkit_core::types::PacketType::Custom { type_id } => {
573                                Some(std::ffi::CString::new(type_id.as_str()).expect(
574                                    "Custom type_id should not contain null bytes",
575                                ))
576                            }
577                            _ => None,
578                        };
579                        output_custom_type_ids.push(output_custom_type_id);
580
581                        // Now create CPacketTypeInfo with stable pointer to the stored format
582                        let type_discriminant = match output.produces_type {
583                            $crate::streamkit_core::types::PacketType::RawAudio(_) => {
584                                $crate::types::CPacketType::RawAudio
585                            }
586                            $crate::streamkit_core::types::PacketType::OpusAudio => {
587                                $crate::types::CPacketType::OpusAudio
588                            }
589                            $crate::streamkit_core::types::PacketType::Text => {
590                                $crate::types::CPacketType::Text
591                            }
592                            $crate::streamkit_core::types::PacketType::Transcription => {
593                                $crate::types::CPacketType::Transcription
594                            }
595                            $crate::streamkit_core::types::PacketType::Custom { .. } => {
596                                $crate::types::CPacketType::Custom
597                            }
598                            $crate::streamkit_core::types::PacketType::Binary => {
599                                $crate::types::CPacketType::Binary
600                            }
601                            $crate::streamkit_core::types::PacketType::Any => {
602                                $crate::types::CPacketType::Any
603                            }
604                            $crate::streamkit_core::types::PacketType::Passthrough => {
605                                $crate::types::CPacketType::Any
606                            }
607                        };
608
609                        // SAFETY: We just pushed an element, so last() is guaranteed to be Some
610                        #[allow(clippy::unwrap_used)]
611                        let audio_format_ptr =
612                            if let Some(ref fmt) = output_audio_formats.last().unwrap() {
613                                fmt as *const $crate::types::CAudioFormat
614                            } else {
615                                std::ptr::null()
616                            };
617
618                        // SAFETY: We just pushed an element, so last() is guaranteed to be Some
619                        #[allow(clippy::unwrap_used)]
620                        let custom_type_id_ptr =
621                            if let Some(ref s) = output_custom_type_ids.last().unwrap() {
622                                s.as_ptr()
623                            } else {
624                                std::ptr::null()
625                            };
626
627                        let type_info = $crate::types::CPacketTypeInfo {
628                            type_discriminant,
629                            audio_format: audio_format_ptr,
630                            custom_type_id: custom_type_id_ptr,
631                        };
632
633                        c_outputs.push($crate::types::COutputPin {
634                            name: name.as_ptr(),
635                            produces_type: type_info,
636                        });
637                        output_names.push(name);
638                    }
639
640                    // Convert categories
641                    let mut category_strings = Vec::new();
642                    let mut category_ptrs = Vec::new();
643
644                    for cat in &meta.categories {
645                        let c_str = std::ffi::CString::new(cat.as_str())
646                            .expect("Category name should not contain null bytes");
647                        category_ptrs.push(c_str.as_ptr());
648                        category_strings.push(c_str);
649                    }
650
651                    let kind = std::ffi::CString::new(meta.kind.as_str())
652                        .expect("Node kind should not contain null bytes");
653                    let description = meta.description.as_ref().map(|d| {
654                        std::ffi::CString::new(d.as_str())
655                            .expect("Description should not contain null bytes")
656                    });
657                    let param_schema = std::ffi::CString::new(meta.param_schema.to_string())
658                        .expect("Param schema JSON should not contain null bytes");
659
660                    let c_metadata = $crate::types::CNodeMetadata {
661                        kind: kind.as_ptr(),
662                        description: description.as_ref().map_or(std::ptr::null(), |d| d.as_ptr()),
663                        inputs: c_inputs.as_ptr(),
664                        inputs_count: c_inputs.len(),
665                        outputs: c_outputs.as_ptr(),
666                        outputs_count: c_outputs.len(),
667                        param_schema: param_schema.as_ptr(),
668                        categories: category_ptrs.as_ptr(),
669                        categories_count: category_ptrs.len(),
670                    };
671
672                    (
673                        c_metadata,
674                        c_inputs,
675                        c_outputs,
676                        input_names,
677                        input_types,
678                        input_audio_formats,
679                        input_custom_type_ids,
680                        output_names,
681                        output_audio_formats,
682                        output_custom_type_ids,
683                        category_strings,
684                        category_ptrs,
685                        kind,
686                        description,
687                        param_schema,
688                    )
689                });
690
691                &metadata.0
692            }
693        }
694
695        extern "C" fn __plugin_create_instance(
696            params: *const std::os::raw::c_char,
697            log_callback: $crate::types::CLogCallback,
698            log_user_data: *mut std::os::raw::c_void,
699        ) -> $crate::types::CPluginHandle {
700            let params_json = if params.is_null() {
701                None
702            } else {
703                match unsafe { $crate::conversions::c_str_to_string(params) } {
704                    Ok(s) if s.is_empty() => None,
705                    Ok(s) => match serde_json::from_str(&s) {
706                        Ok(v) => Some(v),
707                        Err(_) => return std::ptr::null_mut(),
708                    },
709                    Err(_) => return std::ptr::null_mut(),
710                }
711            };
712
713            // Create logger for this plugin instance
714            let logger = $crate::logger::Logger::new(log_callback, log_user_data, module_path!());
715
716            match <$plugin_type as $crate::NativeProcessorNode>::new(params_json, logger) {
717                Ok(instance) => Box::into_raw(Box::new(instance)) as $crate::types::CPluginHandle,
718                Err(_) => std::ptr::null_mut(),
719            }
720        }
721
722        extern "C" fn __plugin_process_packet(
723            handle: $crate::types::CPluginHandle,
724            input_pin: *const std::os::raw::c_char,
725            packet: *const $crate::types::CPacket,
726            output_callback: $crate::types::COutputCallback,
727            callback_data: *mut std::os::raw::c_void,
728            telemetry_callback: $crate::types::CTelemetryCallback,
729            telemetry_callback_data: *mut std::os::raw::c_void,
730        ) -> $crate::types::CResult {
731            if handle.is_null() || input_pin.is_null() || packet.is_null() {
732                return $crate::types::CResult::error(std::ptr::null());
733            }
734
735            let instance = unsafe { &mut *(handle as *mut $plugin_type) };
736
737            let pin_name = match unsafe { $crate::conversions::c_str_to_string(input_pin) } {
738                Ok(s) => s,
739                Err(e) => {
740                    let err_msg = $crate::conversions::error_to_c(format!("Invalid pin name: {}", e));
741                    return $crate::types::CResult::error(err_msg);
742                }
743            };
744
745            let rust_packet = match unsafe { $crate::conversions::packet_from_c(packet) } {
746                Ok(p) => p,
747                Err(e) => {
748                    let err_msg = $crate::conversions::error_to_c(format!("Invalid packet: {}", e));
749                    return $crate::types::CResult::error(err_msg);
750                }
751            };
752
753            let output = $crate::OutputSender::from_callbacks(
754                output_callback,
755                callback_data,
756                telemetry_callback,
757                telemetry_callback_data,
758            );
759
760            match instance.process(&pin_name, rust_packet, &output) {
761                Ok(()) => $crate::types::CResult::success(),
762                Err(e) => {
763                    let err_msg = $crate::conversions::error_to_c(e);
764                    $crate::types::CResult::error(err_msg)
765                }
766            }
767        }
768
769        extern "C" fn __plugin_update_params(
770            handle: $crate::types::CPluginHandle,
771            params: *const std::os::raw::c_char,
772        ) -> $crate::types::CResult {
773            if handle.is_null() {
774                let err_msg = $crate::conversions::error_to_c("Invalid handle (null)");
775                return $crate::types::CResult::error(err_msg);
776            }
777
778            let instance = unsafe { &mut *(handle as *mut $plugin_type) };
779
780            let params_json = if params.is_null() {
781                None
782            } else {
783                match unsafe { $crate::conversions::c_str_to_string(params) } {
784                    Ok(s) if s.is_empty() => None,
785                    Ok(s) => match serde_json::from_str(&s) {
786                        Ok(v) => Some(v),
787                        Err(e) => {
788                            let err_msg =
789                                $crate::conversions::error_to_c(format!("Invalid params JSON: {e}"));
790                            return $crate::types::CResult::error(err_msg);
791                        },
792                    },
793                    Err(e) => {
794                        let err_msg =
795                            $crate::conversions::error_to_c(format!("Invalid params string: {e}"));
796                        return $crate::types::CResult::error(err_msg);
797                    },
798                }
799            };
800
801            match instance.update_params(params_json) {
802                Ok(()) => $crate::types::CResult::success(),
803                Err(e) => {
804                    let err_msg = $crate::conversions::error_to_c(e);
805                    $crate::types::CResult::error(err_msg)
806                },
807            }
808        }
809
810        extern "C" fn __plugin_flush(
811            handle: $crate::types::CPluginHandle,
812            callback: $crate::types::COutputCallback,
813            callback_data: *mut std::os::raw::c_void,
814            telemetry_callback: $crate::types::CTelemetryCallback,
815            telemetry_callback_data: *mut std::os::raw::c_void,
816        ) -> $crate::types::CResult {
817            tracing::info!("__plugin_flush called");
818            if handle.is_null() {
819                tracing::error!("Handle is null");
820                let err_msg = $crate::conversions::error_to_c("Invalid handle (null)");
821                return $crate::types::CResult::error(err_msg);
822            }
823
824            let instance = unsafe { &mut *(handle as *mut $plugin_type) };
825            tracing::info!("Got instance pointer");
826
827            // Create OutputSender wrapper for the callback
828            let output_sender = $crate::OutputSender::from_callbacks(
829                callback,
830                callback_data,
831                telemetry_callback,
832                telemetry_callback_data,
833            );
834            tracing::info!("Created OutputSender, calling instance.flush()");
835
836            match instance.flush(&output_sender) {
837                Ok(()) => {
838                    tracing::info!("instance.flush() returned Ok");
839                    $crate::types::CResult::success()
840                },
841                Err(e) => {
842                    tracing::error!(error = %e, "instance.flush() returned Err");
843                    let err_msg = $crate::conversions::error_to_c(e);
844                    $crate::types::CResult::error(err_msg)
845                },
846            }
847        }
848
849        extern "C" fn __plugin_destroy_instance(handle: $crate::types::CPluginHandle) {
850            if !handle.is_null() {
851                let mut instance = unsafe { Box::from_raw(handle as *mut $plugin_type) };
852                instance.cleanup();
853            }
854        }
855    };
856}