Skip to main content

shape_runtime/plugins/
output_sink.rs

1//! Plugin Output Sink Wrapper
2//!
3//! Provides a Rust-friendly wrapper around the C ABI output sink interface.
4
5use std::ffi::c_void;
6use std::ptr;
7
8use serde_json::Value;
9use shape_abi_v1::OutputSinkVTable;
10
11use shape_ast::error::{Result, ShapeError};
12
13/// Wrapper around a plugin output sink
14///
15/// Used for sending alerts and events to external systems.
16pub struct PluginOutputSink {
17    /// Plugin name
18    name: String,
19    /// Vtable pointer (static lifetime)
20    vtable: &'static OutputSinkVTable,
21    /// Instance pointer (owned by this struct)
22    instance: *mut c_void,
23    /// Tags this sink handles (empty = all)
24    handled_tags: Vec<String>,
25}
26
27impl PluginOutputSink {
28    /// Create a new plugin output sink instance
29    ///
30    /// # Arguments
31    /// * `name` - Plugin name
32    /// * `vtable` - Output sink vtable (must be static)
33    /// * `config` - Configuration value (will be MessagePack encoded)
34    pub fn new(name: String, vtable: &'static OutputSinkVTable, config: &Value) -> Result<Self> {
35        // Serialize config to MessagePack
36        let config_bytes = rmp_serde::to_vec(config).map_err(|e| ShapeError::RuntimeError {
37            message: format!("Failed to serialize plugin config: {}", e),
38            location: None,
39        })?;
40
41        // Initialize the plugin instance
42        let init_fn = vtable.init.ok_or_else(|| ShapeError::RuntimeError {
43            message: format!("Plugin '{}' has no init function", name),
44            location: None,
45        })?;
46
47        let instance = unsafe { init_fn(config_bytes.as_ptr(), config_bytes.len()) };
48        if instance.is_null() {
49            return Err(ShapeError::RuntimeError {
50                message: format!("Plugin '{}' init returned null", name),
51                location: None,
52            });
53        }
54
55        // Get handled tags
56        let handled_tags = Self::get_handled_tags_from_vtable(vtable, instance)?;
57
58        Ok(Self {
59            name,
60            vtable,
61            instance,
62            handled_tags,
63        })
64    }
65
66    /// Get the plugin name
67    pub fn name(&self) -> &str {
68        &self.name
69    }
70
71    /// Get the tags this sink handles (empty = all)
72    pub fn handled_tags(&self) -> &[String] {
73        &self.handled_tags
74    }
75
76    /// Send an alert to the sink
77    ///
78    /// # Arguments
79    /// * `alert` - Alert value to send (will be MessagePack encoded)
80    pub fn send(&self, alert: &Value) -> Result<()> {
81        let send_fn = self.vtable.send.ok_or_else(|| ShapeError::RuntimeError {
82            message: format!("Plugin '{}' has no send function", self.name),
83            location: None,
84        })?;
85
86        let alert_bytes = rmp_serde::to_vec(alert).map_err(|e| ShapeError::RuntimeError {
87            message: format!("Failed to serialize alert: {}", e),
88            location: None,
89        })?;
90
91        let result = unsafe { send_fn(self.instance, alert_bytes.as_ptr(), alert_bytes.len()) };
92
93        if result != 0 {
94            return Err(ShapeError::RuntimeError {
95                message: format!("Plugin '{}' send failed with code {}", self.name, result),
96                location: None,
97            });
98        }
99
100        Ok(())
101    }
102
103    /// Flush any pending alerts
104    pub fn flush(&self) -> Result<()> {
105        let flush_fn = match self.vtable.flush {
106            Some(f) => f,
107            None => return Ok(()), // No flush function, nothing to do
108        };
109
110        let result = unsafe { flush_fn(self.instance) };
111
112        if result != 0 {
113            return Err(ShapeError::RuntimeError {
114                message: format!("Plugin '{}' flush failed with code {}", self.name, result),
115                location: None,
116            });
117        }
118
119        Ok(())
120    }
121
122    // ========================================================================
123    // Private Helpers
124    // ========================================================================
125
126    fn get_handled_tags_from_vtable(
127        vtable: &OutputSinkVTable,
128        instance: *mut c_void,
129    ) -> Result<Vec<String>> {
130        let get_tags_fn = match vtable.get_handled_tags {
131            Some(f) => f,
132            None => return Ok(Vec::new()), // No tag filtering, handles all
133        };
134
135        let mut out_ptr: *mut u8 = ptr::null_mut();
136        let mut out_len: usize = 0;
137
138        unsafe { get_tags_fn(instance, &mut out_ptr, &mut out_len) };
139
140        if out_ptr.is_null() || out_len == 0 {
141            return Ok(Vec::new());
142        }
143
144        let data_slice = unsafe { std::slice::from_raw_parts(out_ptr, out_len) };
145        let tags: Vec<String> = rmp_serde::from_slice(data_slice).unwrap_or_else(|_| Vec::new());
146
147        // Free the buffer
148        if let Some(free_fn) = vtable.free_buffer {
149            unsafe { free_fn(out_ptr, out_len) };
150        }
151
152        Ok(tags)
153    }
154}
155
156impl Drop for PluginOutputSink {
157    fn drop(&mut self) {
158        // Flush before dropping
159        let _ = self.flush();
160
161        if let Some(drop_fn) = self.vtable.drop {
162            unsafe { drop_fn(self.instance) };
163        }
164    }
165}
166
167// SAFETY: The instance pointer is only accessed through the vtable functions
168// which are required to be thread-safe by the plugin contract.
169unsafe impl Send for PluginOutputSink {}
170unsafe impl Sync for PluginOutputSink {}
171
#[cfg(test)]
mod tests {
    /// Placeholder check: an empty tag list is the default state.
    /// Exercising the sink itself needs a real plugin (integration test).
    #[test]
    fn test_handled_tags_default() {
        let tags = Vec::<String>::new();
        assert!(tags.is_empty());
    }
}