1pub mod conversions;
47pub mod logger;
48pub mod types;
49
50use std::ffi::CString;
51use streamkit_core::types::{Packet, PacketType};
52use streamkit_core::{InputPin, OutputPin, PinCardinality, Resource};
53
54use logger::Logger;
55
56pub use streamkit_core;
57pub use types::*;
58
59pub mod prelude {
61 pub use crate::logger::Logger;
62 pub use crate::types::{CLogCallback, CLogLevel};
63 pub use crate::{
64 native_plugin_entry, plugin_debug, plugin_error, plugin_info, plugin_log, plugin_trace,
65 plugin_warn, NativeProcessorNode, NodeMetadata, OutputSender, ResourceSupport,
66 };
67 pub use streamkit_core::types::{AudioFrame, Packet, PacketType};
68 pub use streamkit_core::{InputPin, OutputPin, PinCardinality, Resource};
69}
70
71pub struct NodeMetadata {
73 pub kind: String,
74 pub description: Option<String>,
75 pub inputs: Vec<InputPin>,
76 pub outputs: Vec<OutputPin>,
77 pub param_schema: serde_json::Value,
78 pub categories: Vec<String>,
79}
80
81impl NodeMetadata {
82 pub fn builder(kind: &str) -> NodeMetadataBuilder {
84 NodeMetadataBuilder {
85 kind: kind.to_string(),
86 description: None,
87 inputs: Vec::new(),
88 outputs: Vec::new(),
89 param_schema: serde_json::json!({}),
90 categories: Vec::new(),
91 }
92 }
93}
94
95pub struct NodeMetadataBuilder {
97 kind: String,
98 description: Option<String>,
99 inputs: Vec<InputPin>,
100 outputs: Vec<OutputPin>,
101 param_schema: serde_json::Value,
102 categories: Vec<String>,
103}
104
105impl NodeMetadataBuilder {
106 #[must_use]
108 pub fn description(mut self, description: impl Into<String>) -> Self {
109 self.description = Some(description.into());
110 self
111 }
112
113 #[must_use]
115 pub fn input(mut self, name: &str, accepts_types: &[PacketType]) -> Self {
116 self.inputs.push(InputPin {
117 name: name.to_string(),
118 accepts_types: accepts_types.to_vec(),
119 cardinality: PinCardinality::One,
120 });
121 self
122 }
123
124 #[must_use]
126 pub fn output(mut self, name: &str, produces_type: PacketType) -> Self {
127 self.outputs.push(OutputPin {
128 name: name.to_string(),
129 produces_type,
130 cardinality: PinCardinality::Broadcast,
131 });
132 self
133 }
134
135 #[must_use]
137 pub fn param_schema(mut self, schema: serde_json::Value) -> Self {
138 self.param_schema = schema;
139 self
140 }
141
142 #[must_use]
144 pub fn category(mut self, category: &str) -> Self {
145 self.categories.push(category.to_string());
146 self
147 }
148
149 pub fn build(self) -> NodeMetadata {
151 NodeMetadata {
152 kind: self.kind,
153 description: self.description,
154 inputs: self.inputs,
155 outputs: self.outputs,
156 param_schema: self.param_schema,
157 categories: self.categories,
158 }
159 }
160}
161
162pub struct OutputSender {
164 output_callback: COutputCallback,
165 output_user_data: *mut std::os::raw::c_void,
166 telemetry_callback: types::CTelemetryCallback,
167 telemetry_user_data: *mut std::os::raw::c_void,
168}
169
170impl OutputSender {
171 pub fn from_callback(callback: COutputCallback, user_data: *mut std::os::raw::c_void) -> Self {
173 Self {
174 output_callback: callback,
175 output_user_data: user_data,
176 telemetry_callback: None,
177 telemetry_user_data: std::ptr::null_mut(),
178 }
179 }
180
181 pub fn from_callbacks(
185 output_callback: COutputCallback,
186 output_user_data: *mut std::os::raw::c_void,
187 telemetry_callback: types::CTelemetryCallback,
188 telemetry_user_data: *mut std::os::raw::c_void,
189 ) -> Self {
190 Self { output_callback, output_user_data, telemetry_callback, telemetry_user_data }
191 }
192
193 pub fn send(&self, pin: &str, packet: &Packet) -> Result<(), String> {
201 let pin_c = CString::new(pin).map_err(|e| format!("Invalid pin name: {e}"))?;
202
203 let packet_repr = conversions::packet_to_c(packet);
204 let result = (self.output_callback)(
205 pin_c.as_ptr(),
206 &raw const packet_repr.packet,
207 self.output_user_data,
208 );
209
210 if result.success {
211 Ok(())
212 } else {
213 let error_msg = if result.error_message.is_null() {
214 "Unknown error".to_string()
215 } else {
216 unsafe {
217 conversions::c_str_to_string(result.error_message)
218 .unwrap_or_else(|_| "Unknown error".to_string())
219 }
220 };
221 Err(error_msg)
222 }
223 }
224
225 pub fn emit_telemetry(
238 &self,
239 event_type: &str,
240 data: &serde_json::Value,
241 timestamp_us: Option<u64>,
242 ) -> Result<(), String> {
243 let Some(cb) = self.telemetry_callback else {
244 return Ok(());
245 };
246
247 let event_type_c =
248 CString::new(event_type).map_err(|e| format!("Invalid event_type: {e}"))?;
249 let data_json = serde_json::to_vec(data)
250 .map_err(|e| format!("Failed to serialize telemetry JSON: {e}"))?;
251
252 let meta = timestamp_us.map(|ts| types::CPacketMetadata {
253 timestamp_us: ts,
254 has_timestamp_us: true,
255 duration_us: 0,
256 has_duration_us: false,
257 sequence: 0,
258 has_sequence: false,
259 });
260 let meta_ptr = meta.as_ref().map_or(std::ptr::null(), std::ptr::from_ref);
261
262 let result = cb(
263 event_type_c.as_ptr(),
264 data_json.as_ptr(),
265 data_json.len(),
266 meta_ptr,
267 self.telemetry_user_data,
268 );
269
270 if result.success {
271 Ok(())
272 } else {
273 let error_msg = if result.error_message.is_null() {
274 "Unknown error".to_string()
275 } else {
276 unsafe {
277 conversions::c_str_to_string(result.error_message)
278 .unwrap_or_else(|_| "Unknown error".to_string())
279 }
280 };
281 Err(error_msg)
282 }
283 }
284}
285
286pub trait NativeProcessorNode: Sized + Send + 'static {
289 fn metadata() -> NodeMetadata;
291
292 fn new(params: Option<serde_json::Value>, logger: Logger) -> Result<Self, String>;
298
299 fn process(&mut self, pin: &str, packet: Packet, output: &OutputSender) -> Result<(), String>;
305
306 fn update_params(&mut self, _params: Option<serde_json::Value>) -> Result<(), String> {
312 Ok(())
313 }
314
315 fn flush(&mut self, _output: &OutputSender) -> Result<(), String> {
325 Ok(())
326 }
327
328 fn cleanup(&mut self) {}
330}
331
332pub trait ResourceSupport: NativeProcessorNode {
375 type Resource: Resource + 'static;
377
378 fn compute_resource_key(params: Option<&serde_json::Value>) -> String;
384
385 fn init_resource(params: Option<serde_json::Value>) -> Result<Self::Resource, String>;
400
401 fn deinit_resource(_resource: Self::Resource) {
406 }
408}
409
410#[macro_export]
427macro_rules! native_plugin_entry {
428 ($plugin_type:ty) => {
429 static mut METADATA: std::sync::OnceLock<(
431 $crate::types::CNodeMetadata,
432 Vec<$crate::types::CInputPin>,
433 Vec<$crate::types::COutputPin>,
434 Vec<std::ffi::CString>,
435 Vec<Vec<$crate::types::CPacketTypeInfo>>,
436 Vec<Vec<Option<$crate::types::CAudioFormat>>>,
437 Vec<Vec<Option<std::ffi::CString>>>,
438 Vec<std::ffi::CString>,
439 Vec<Option<$crate::types::CAudioFormat>>,
440 Vec<Option<std::ffi::CString>>,
441 Vec<std::ffi::CString>,
442 Vec<*const std::os::raw::c_char>,
443 std::ffi::CString,
444 Option<std::ffi::CString>,
445 std::ffi::CString,
446 )> = std::sync::OnceLock::new();
447
448 #[no_mangle]
449 pub extern "C" fn streamkit_native_plugin_api() -> *const $crate::types::CNativePluginAPI {
450 static API: $crate::types::CNativePluginAPI = $crate::types::CNativePluginAPI {
451 version: $crate::types::NATIVE_PLUGIN_API_VERSION,
452 get_metadata: __plugin_get_metadata,
453 create_instance: __plugin_create_instance,
454 process_packet: __plugin_process_packet,
455 update_params: __plugin_update_params,
456 flush: __plugin_flush,
457 destroy_instance: __plugin_destroy_instance,
458 };
459 &API
460 }
461
462 extern "C" fn __plugin_get_metadata() -> *const $crate::types::CNodeMetadata {
463 unsafe {
464 let metadata = METADATA.get_or_init(|| {
465 let meta = <$plugin_type as $crate::NativeProcessorNode>::metadata();
466
467 let mut c_inputs = Vec::new();
469 let mut input_names = Vec::new();
470 let mut input_types = Vec::new();
471 let mut input_audio_formats = Vec::new();
472 let mut input_custom_type_ids = Vec::new();
473
474 for input in &meta.inputs {
475 let name = std::ffi::CString::new(input.name.as_str())
476 .expect("Input pin name should not contain null bytes");
477 let mut types_info = Vec::new();
478 let mut formats = Vec::new();
479 let mut custom_type_ids = Vec::new();
480
481 for pt in &input.accepts_types {
483 let (_type_info, audio_format) =
484 $crate::conversions::packet_type_to_c(pt);
485 formats.push(audio_format);
486 let custom_type_id = match pt {
487 $crate::streamkit_core::types::PacketType::Custom { type_id } => {
488 Some(std::ffi::CString::new(type_id.as_str()).expect(
489 "Custom type_id should not contain null bytes",
490 ))
491 }
492 _ => None,
493 };
494 custom_type_ids.push(custom_type_id);
495 }
496
497 for (idx, pt) in input.accepts_types.iter().enumerate() {
499 let type_discriminant = match pt {
500 $crate::streamkit_core::types::PacketType::RawAudio(_) => {
501 $crate::types::CPacketType::RawAudio
502 }
503 $crate::streamkit_core::types::PacketType::OpusAudio => {
504 $crate::types::CPacketType::OpusAudio
505 }
506 $crate::streamkit_core::types::PacketType::Text => {
507 $crate::types::CPacketType::Text
508 }
509 $crate::streamkit_core::types::PacketType::Transcription => {
510 $crate::types::CPacketType::Transcription
511 }
512 $crate::streamkit_core::types::PacketType::Custom { .. } => {
513 $crate::types::CPacketType::Custom
514 }
515 $crate::streamkit_core::types::PacketType::Binary => {
516 $crate::types::CPacketType::Binary
517 }
518 $crate::streamkit_core::types::PacketType::Any => {
519 $crate::types::CPacketType::Any
520 }
521 $crate::streamkit_core::types::PacketType::Passthrough => {
522 $crate::types::CPacketType::Any
523 }
524 };
525
526 let audio_format_ptr = if let Some(ref fmt) = formats[idx] {
527 fmt as *const $crate::types::CAudioFormat
528 } else {
529 std::ptr::null()
530 };
531
532 let custom_type_id_ptr = if let Some(ref s) = custom_type_ids[idx] {
533 s.as_ptr()
534 } else {
535 std::ptr::null()
536 };
537
538 types_info.push($crate::types::CPacketTypeInfo {
539 type_discriminant,
540 audio_format: audio_format_ptr,
541 custom_type_id: custom_type_id_ptr,
542 });
543 }
544
545 c_inputs.push($crate::types::CInputPin {
546 name: name.as_ptr(),
547 accepts_types: types_info.as_ptr(),
548 accepts_types_count: types_info.len(),
549 });
550
551 input_names.push(name);
552 input_types.push(types_info);
553 input_audio_formats.push(formats);
554 input_custom_type_ids.push(custom_type_ids);
555 }
556
557 let mut c_outputs = Vec::new();
559 let mut output_names = Vec::new();
560 let mut output_audio_formats = Vec::new();
561 let mut output_custom_type_ids = Vec::new();
562
563 for output in &meta.outputs {
564 let name = std::ffi::CString::new(output.name.as_str())
565 .expect("Output pin name should not contain null bytes");
566
567 let (_type_info, audio_format) =
569 $crate::conversions::packet_type_to_c(&output.produces_type);
570 output_audio_formats.push(audio_format);
571 let output_custom_type_id = match &output.produces_type {
572 $crate::streamkit_core::types::PacketType::Custom { type_id } => {
573 Some(std::ffi::CString::new(type_id.as_str()).expect(
574 "Custom type_id should not contain null bytes",
575 ))
576 }
577 _ => None,
578 };
579 output_custom_type_ids.push(output_custom_type_id);
580
581 let type_discriminant = match output.produces_type {
583 $crate::streamkit_core::types::PacketType::RawAudio(_) => {
584 $crate::types::CPacketType::RawAudio
585 }
586 $crate::streamkit_core::types::PacketType::OpusAudio => {
587 $crate::types::CPacketType::OpusAudio
588 }
589 $crate::streamkit_core::types::PacketType::Text => {
590 $crate::types::CPacketType::Text
591 }
592 $crate::streamkit_core::types::PacketType::Transcription => {
593 $crate::types::CPacketType::Transcription
594 }
595 $crate::streamkit_core::types::PacketType::Custom { .. } => {
596 $crate::types::CPacketType::Custom
597 }
598 $crate::streamkit_core::types::PacketType::Binary => {
599 $crate::types::CPacketType::Binary
600 }
601 $crate::streamkit_core::types::PacketType::Any => {
602 $crate::types::CPacketType::Any
603 }
604 $crate::streamkit_core::types::PacketType::Passthrough => {
605 $crate::types::CPacketType::Any
606 }
607 };
608
609 #[allow(clippy::unwrap_used)]
611 let audio_format_ptr =
612 if let Some(ref fmt) = output_audio_formats.last().unwrap() {
613 fmt as *const $crate::types::CAudioFormat
614 } else {
615 std::ptr::null()
616 };
617
618 #[allow(clippy::unwrap_used)]
620 let custom_type_id_ptr =
621 if let Some(ref s) = output_custom_type_ids.last().unwrap() {
622 s.as_ptr()
623 } else {
624 std::ptr::null()
625 };
626
627 let type_info = $crate::types::CPacketTypeInfo {
628 type_discriminant,
629 audio_format: audio_format_ptr,
630 custom_type_id: custom_type_id_ptr,
631 };
632
633 c_outputs.push($crate::types::COutputPin {
634 name: name.as_ptr(),
635 produces_type: type_info,
636 });
637 output_names.push(name);
638 }
639
640 let mut category_strings = Vec::new();
642 let mut category_ptrs = Vec::new();
643
644 for cat in &meta.categories {
645 let c_str = std::ffi::CString::new(cat.as_str())
646 .expect("Category name should not contain null bytes");
647 category_ptrs.push(c_str.as_ptr());
648 category_strings.push(c_str);
649 }
650
651 let kind = std::ffi::CString::new(meta.kind.as_str())
652 .expect("Node kind should not contain null bytes");
653 let description = meta.description.as_ref().map(|d| {
654 std::ffi::CString::new(d.as_str())
655 .expect("Description should not contain null bytes")
656 });
657 let param_schema = std::ffi::CString::new(meta.param_schema.to_string())
658 .expect("Param schema JSON should not contain null bytes");
659
660 let c_metadata = $crate::types::CNodeMetadata {
661 kind: kind.as_ptr(),
662 description: description.as_ref().map_or(std::ptr::null(), |d| d.as_ptr()),
663 inputs: c_inputs.as_ptr(),
664 inputs_count: c_inputs.len(),
665 outputs: c_outputs.as_ptr(),
666 outputs_count: c_outputs.len(),
667 param_schema: param_schema.as_ptr(),
668 categories: category_ptrs.as_ptr(),
669 categories_count: category_ptrs.len(),
670 };
671
672 (
673 c_metadata,
674 c_inputs,
675 c_outputs,
676 input_names,
677 input_types,
678 input_audio_formats,
679 input_custom_type_ids,
680 output_names,
681 output_audio_formats,
682 output_custom_type_ids,
683 category_strings,
684 category_ptrs,
685 kind,
686 description,
687 param_schema,
688 )
689 });
690
691 &metadata.0
692 }
693 }
694
695 extern "C" fn __plugin_create_instance(
696 params: *const std::os::raw::c_char,
697 log_callback: $crate::types::CLogCallback,
698 log_user_data: *mut std::os::raw::c_void,
699 ) -> $crate::types::CPluginHandle {
700 let params_json = if params.is_null() {
701 None
702 } else {
703 match unsafe { $crate::conversions::c_str_to_string(params) } {
704 Ok(s) if s.is_empty() => None,
705 Ok(s) => match serde_json::from_str(&s) {
706 Ok(v) => Some(v),
707 Err(_) => return std::ptr::null_mut(),
708 },
709 Err(_) => return std::ptr::null_mut(),
710 }
711 };
712
713 let logger = $crate::logger::Logger::new(log_callback, log_user_data, module_path!());
715
716 match <$plugin_type as $crate::NativeProcessorNode>::new(params_json, logger) {
717 Ok(instance) => Box::into_raw(Box::new(instance)) as $crate::types::CPluginHandle,
718 Err(_) => std::ptr::null_mut(),
719 }
720 }
721
722 extern "C" fn __plugin_process_packet(
723 handle: $crate::types::CPluginHandle,
724 input_pin: *const std::os::raw::c_char,
725 packet: *const $crate::types::CPacket,
726 output_callback: $crate::types::COutputCallback,
727 callback_data: *mut std::os::raw::c_void,
728 telemetry_callback: $crate::types::CTelemetryCallback,
729 telemetry_callback_data: *mut std::os::raw::c_void,
730 ) -> $crate::types::CResult {
731 if handle.is_null() || input_pin.is_null() || packet.is_null() {
732 return $crate::types::CResult::error(std::ptr::null());
733 }
734
735 let instance = unsafe { &mut *(handle as *mut $plugin_type) };
736
737 let pin_name = match unsafe { $crate::conversions::c_str_to_string(input_pin) } {
738 Ok(s) => s,
739 Err(e) => {
740 let err_msg = $crate::conversions::error_to_c(format!("Invalid pin name: {}", e));
741 return $crate::types::CResult::error(err_msg);
742 }
743 };
744
745 let rust_packet = match unsafe { $crate::conversions::packet_from_c(packet) } {
746 Ok(p) => p,
747 Err(e) => {
748 let err_msg = $crate::conversions::error_to_c(format!("Invalid packet: {}", e));
749 return $crate::types::CResult::error(err_msg);
750 }
751 };
752
753 let output = $crate::OutputSender::from_callbacks(
754 output_callback,
755 callback_data,
756 telemetry_callback,
757 telemetry_callback_data,
758 );
759
760 match instance.process(&pin_name, rust_packet, &output) {
761 Ok(()) => $crate::types::CResult::success(),
762 Err(e) => {
763 let err_msg = $crate::conversions::error_to_c(e);
764 $crate::types::CResult::error(err_msg)
765 }
766 }
767 }
768
769 extern "C" fn __plugin_update_params(
770 handle: $crate::types::CPluginHandle,
771 params: *const std::os::raw::c_char,
772 ) -> $crate::types::CResult {
773 if handle.is_null() {
774 let err_msg = $crate::conversions::error_to_c("Invalid handle (null)");
775 return $crate::types::CResult::error(err_msg);
776 }
777
778 let instance = unsafe { &mut *(handle as *mut $plugin_type) };
779
780 let params_json = if params.is_null() {
781 None
782 } else {
783 match unsafe { $crate::conversions::c_str_to_string(params) } {
784 Ok(s) if s.is_empty() => None,
785 Ok(s) => match serde_json::from_str(&s) {
786 Ok(v) => Some(v),
787 Err(e) => {
788 let err_msg =
789 $crate::conversions::error_to_c(format!("Invalid params JSON: {e}"));
790 return $crate::types::CResult::error(err_msg);
791 },
792 },
793 Err(e) => {
794 let err_msg =
795 $crate::conversions::error_to_c(format!("Invalid params string: {e}"));
796 return $crate::types::CResult::error(err_msg);
797 },
798 }
799 };
800
801 match instance.update_params(params_json) {
802 Ok(()) => $crate::types::CResult::success(),
803 Err(e) => {
804 let err_msg = $crate::conversions::error_to_c(e);
805 $crate::types::CResult::error(err_msg)
806 },
807 }
808 }
809
810 extern "C" fn __plugin_flush(
811 handle: $crate::types::CPluginHandle,
812 callback: $crate::types::COutputCallback,
813 callback_data: *mut std::os::raw::c_void,
814 telemetry_callback: $crate::types::CTelemetryCallback,
815 telemetry_callback_data: *mut std::os::raw::c_void,
816 ) -> $crate::types::CResult {
817 tracing::info!("__plugin_flush called");
818 if handle.is_null() {
819 tracing::error!("Handle is null");
820 let err_msg = $crate::conversions::error_to_c("Invalid handle (null)");
821 return $crate::types::CResult::error(err_msg);
822 }
823
824 let instance = unsafe { &mut *(handle as *mut $plugin_type) };
825 tracing::info!("Got instance pointer");
826
827 let output_sender = $crate::OutputSender::from_callbacks(
829 callback,
830 callback_data,
831 telemetry_callback,
832 telemetry_callback_data,
833 );
834 tracing::info!("Created OutputSender, calling instance.flush()");
835
836 match instance.flush(&output_sender) {
837 Ok(()) => {
838 tracing::info!("instance.flush() returned Ok");
839 $crate::types::CResult::success()
840 },
841 Err(e) => {
842 tracing::error!(error = %e, "instance.flush() returned Err");
843 let err_msg = $crate::conversions::error_to_c(e);
844 $crate::types::CResult::error(err_msg)
845 },
846 }
847 }
848
849 extern "C" fn __plugin_destroy_instance(handle: $crate::types::CPluginHandle) {
850 if !handle.is_null() {
851 let mut instance = unsafe { Box::from_raw(handle as *mut $plugin_type) };
852 instance.cleanup();
853 }
854 }
855 };
856}