flb_plugin/
output.rs

1//! Output Plugin bindings
2//!
3//! ```no_run
4//! struct Hello;
5//! impl flb_plugin::output::Plugin for Hello {
6//!     const NAME: &'static CStr = const_cstr!("hello");
7//!     const DESCRIPTION: &'static CStr = const_cstr!("hello plugin");
8//!
9//!     fn new(config: &output::Config) -> Self {
10//!         let param = config.get_property(const_cstr!("param"));
11//!         println!("[new] param: {:?}", param);
12//!         Hello
13//!     }
14//!
15//!     fn flush(&mut self, tag: &str, mut data: &[u8]) -> Result<(), flb_plugin::Error> {
16//!         let value = rmpv::decode::read_value_ref(&mut data).unwrap();
17//!         println!("[flush] tag: {tag}, data: {:?}", value);
18//!         Ok(())
19//!     }
20//!
21//!     fn exit(self) -> Result<(), flb_plugin::Error> {
22//!         println!("[exit]");
23//!         Ok(())
24//!     }
25//! }
26//! flb_plugin::output_plugin_proxy!(Hello);
27//! ```
28
29use std::{
30    ffi::{c_void, CStr},
31    marker::PhantomData,
32    os::raw::c_int,
33    slice,
34};
35
36use flb_plugin_sys::{
37    flb_plugin_proxy_def, flbgo_output_plugin, FLB_ERROR, FLB_OK, FLB_PROXY_GOLANG,
38    FLB_PROXY_OUTPUT_PLUGIN, FLB_RETRY,
39};
40
41use crate::{instance_from_ctx, Error};
42
43/// An accessor for plugin configurations
44pub struct Config {
45    plugin: *const flbgo_output_plugin,
46}
47
48impl Config {
49    /// Returns the configuration property value.
50    ///
51    /// If the key is absent, this returns `None`.
52    pub fn get_property(&self, key: &CStr) -> Option<&str> {
53        unsafe {
54            let plugin = self.plugin.as_ref()?;
55            let output_get_property = (*(*plugin).api).output_get_property?;
56            let value = output_get_property(key.as_ptr() as *mut i8, plugin.o_ins as *mut _);
57            if value.is_null() {
58                None
59            } else {
60                let cstr = CStr::from_ptr(value);
61                cstr.to_str().ok()
62            }
63        }
64    }
65}
66
67/// A trait for Fluent Bit output plugin
68pub trait Plugin {
69    /// The plugin name.
70    const NAME: &'static CStr;
71
72    /// The plugin description.
73    const DESCRIPTION: &'static CStr;
74
75    /// Creates a new plugin instance.
76    fn new(config: &Config) -> Self;
77
78    /// Handles data passed from Fluent Bit.
79    ///
80    /// `data` is a MessagePack byte buffer.
81    fn flush(&mut self, tag: &str, data: &[u8]) -> Result<(), Error>;
82
83    /// Cleans up the plugin instance.
84    fn exit(self) -> Result<(), Error>;
85}
86
87/// A proxy object for [Plugin]
88///
89/// Use [crate::output_plugin_proxy] instead of using this directly.
90pub struct Proxy<P> {
91    plugin: PhantomData<P>,
92}
93
94impl<P> Proxy<P>
95where
96    P: Plugin,
97{
98    #[allow(clippy::new_without_default)]
99    pub const fn new() -> Self {
100        Self {
101            plugin: PhantomData,
102        }
103    }
104
105    #[allow(clippy::missing_safety_doc)]
106    pub unsafe fn register(&self, def: *mut flb_plugin_proxy_def) -> c_int {
107        let def = def.as_mut().unwrap();
108        def.type_ = FLB_PROXY_OUTPUT_PLUGIN;
109        def.proxy = FLB_PROXY_GOLANG;
110        def.flags = 0;
111        def.name = P::NAME.as_ptr() as *mut _;
112        def.description = P::NAME.as_ptr() as *mut _;
113        0
114    }
115
116    #[allow(clippy::missing_safety_doc)]
117    pub unsafe fn unregister(&self, _def: *mut flb_plugin_proxy_def) -> c_int {
118        0
119    }
120
121    #[allow(clippy::missing_safety_doc)]
122    pub unsafe fn init(&self, plugin: *mut flbgo_output_plugin) -> c_int {
123        let config = Config { plugin };
124        let instance = P::new(&config);
125        let ctx = Box::new(instance);
126        let ctx = Box::into_raw(ctx);
127        (*(*plugin).context).remote_context = ctx as *mut _;
128        FLB_OK
129    }
130
131    #[allow(clippy::missing_safety_doc)]
132    pub unsafe fn flush(
133        &self,
134        ctx: *mut c_void,
135        data: *const u8,
136        len: c_int,
137        tag: *const i8,
138    ) -> c_int {
139        let mut instance = match instance_from_ctx::<P>(ctx) {
140            Some(instance) => instance,
141            None => return FLB_ERROR,
142        };
143        let tag = CStr::from_ptr(tag);
144        let tag = match tag.to_str() {
145            Ok(s) => s,
146            Err(_) => return FLB_ERROR,
147        };
148        let data = slice::from_raw_parts(data, len as usize);
149        let ret = instance.flush(tag, data);
150        Box::leak(instance);
151        match ret {
152            Ok(_) => FLB_OK,
153            Err(Error::Error) => FLB_ERROR,
154            Err(Error::Retry) => FLB_RETRY,
155        }
156    }
157
158    #[allow(clippy::missing_safety_doc)]
159    pub unsafe fn exit(&self, ctx: *mut c_void) -> c_int {
160        let instance = match instance_from_ctx::<P>(ctx) {
161            Some(instance) => instance,
162            None => return FLB_ERROR,
163        };
164        let ret = instance.exit();
165        match ret {
166            Ok(_) => FLB_OK,
167            Err(Error::Error) => FLB_ERROR,
168            Err(Error::Retry) => FLB_RETRY,
169        }
170    }
171}
172
173/// Defines proxy functions (`FLBPluginRegister`, `FLBPluginInit`, ...)
174#[macro_export]
175macro_rules! output_plugin_proxy {
176    ($proxy:path) => {
177        const PROXY: $crate::output::Proxy<$proxy> = $crate::output::Proxy::new();
178
179        #[allow(clippy::missing_safety_doc)]
180        #[no_mangle]
181        pub unsafe extern "C" fn FLBPluginRegister(
182            def: *mut $crate::sys::flb_plugin_proxy_def,
183        ) -> c_int {
184            PROXY.register(def)
185        }
186
187        #[allow(clippy::missing_safety_doc)]
188        #[no_mangle]
189        pub unsafe extern "C" fn FLBPluginUnregister(
190            def: *mut $crate::sys::flb_plugin_proxy_def,
191        ) -> c_int {
192            PROXY.unregister(def)
193        }
194
195        #[allow(clippy::missing_safety_doc)]
196        #[no_mangle]
197        pub unsafe extern "C" fn FLBPluginInit(
198            plugin: *mut $crate::sys::flbgo_output_plugin,
199        ) -> c_int {
200            PROXY.init(plugin)
201        }
202
203        #[allow(clippy::missing_safety_doc)]
204        #[no_mangle]
205        pub unsafe extern "C" fn FLBPluginFlushCtx(
206            ctx: *mut c_void,
207            data: *const u8,
208            len: c_int,
209            tag: *const i8,
210        ) -> c_int {
211            PROXY.flush(ctx, data, len, tag)
212        }
213
214        #[allow(clippy::missing_safety_doc)]
215        #[no_mangle]
216        pub unsafe extern "C" fn FLBPluginExitCtx(ctx: *mut c_void) -> c_int {
217            PROXY.exit(ctx)
218        }
219    };
220}