collectd_plugin/
internal.rs1use crate::api::{
4 empty_to_none, get_default_interval, log_err, CdTime, ConfigItem, LogLevel, ValueList,
5};
6use crate::bindings::{
7 cdtime_t, data_set_t, oconfig_item_t, plugin_register_complex_read, plugin_register_flush,
8 plugin_register_log, plugin_register_write, user_data_t, value_list_t,
9};
10use crate::errors::FfiError;
11use crate::plugins::{Plugin, PluginManager, PluginManagerCapabilities, PluginRegistration};
12use std::ffi::{CStr, CString};
13use std::ops::Deref;
14use std::os::raw::{c_char, c_int, c_void};
15use std::panic::{self, catch_unwind};
16use std::ptr;
17use std::sync::atomic::{AtomicBool, Ordering};
18
19extern "C" fn plugin_read(dt: *mut user_data_t) -> c_int {
20 let plugin = unsafe { &mut *((*dt).data as *mut Box<dyn Plugin>) };
21 let res = catch_unwind(|| plugin.read_values())
22 .map_err(|_| FfiError::Panic)
23 .and_then(|x| x.map_err(FfiError::Plugin));
24
25 if let Err(ref e) = res {
26 log_err("read", e);
27 }
28
29 res.map(|_| 0).unwrap_or(-1)
30}
31
32extern "C" fn plugin_log(severity: c_int, message: *const c_char, dt: *mut user_data_t) {
33 let plugin = unsafe { &mut *((*dt).data as *mut Box<dyn Plugin>) };
34
35 if message.is_null() {
37 return;
38 }
39
40 let msg = unsafe { CStr::from_ptr(message).to_string_lossy() };
44 let res = LogLevel::try_from(severity as u32)
45 .ok_or(FfiError::UnknownSeverity(severity))
46 .and_then(|lvl| {
47 catch_unwind(|| plugin.log(lvl, Deref::deref(&msg)))
48 .map_err(|_| FfiError::Panic)
49 .and_then(|x| x.map_err(FfiError::Plugin))
50 });
51
52 if let Err(ref e) = res {
53 log_err("logging", e);
54 }
55}
56
57extern "C" fn plugin_write(
58 ds: *const data_set_t,
59 vl: *const value_list_t,
60 dt: *mut user_data_t,
61) -> c_int {
62 let plugin = unsafe { &mut *((*dt).data as *mut Box<dyn Plugin>) };
63 let res = unsafe { ValueList::from(&*ds, &*vl) }
64 .map_err(|e| FfiError::Collectd(Box::new(e)))
65 .and_then(|list| {
66 catch_unwind(|| plugin.write_values(list))
67 .map_err(|_| FfiError::Panic)
68 .and_then(|x| x.map_err(FfiError::Plugin))
69 });
70
71 if let Err(ref e) = res {
72 log_err("writing", e);
73 }
74
75 res.map(|_| 0).unwrap_or(-1)
76}
77
78extern "C" fn plugin_flush(
79 timeout: cdtime_t,
80 identifier: *const c_char,
81 dt: *mut user_data_t,
82) -> c_int {
83 let plugin = unsafe { &mut *((*dt).data as *mut Box<dyn crate::Plugin>) };
84
85 let dur = if timeout == 0 {
86 None
87 } else {
88 Some(CdTime::from(timeout).into())
89 };
90
91 let ident = if identifier.is_null() {
92 Ok(None)
93 } else {
94 unsafe { CStr::from_ptr(identifier) }
95 .to_str()
96 .map(empty_to_none)
97 .map_err(|e| FfiError::Utf8("flush identifier", e))
98 };
99
100 let res = ident.and_then(|id| {
101 catch_unwind(|| plugin.flush(dur, id))
102 .map_err(|_| FfiError::Panic)
103 .and_then(|x| x.map_err(FfiError::Plugin))
104 });
105
106 if let Err(ref e) = res {
107 log_err("flush", e);
108 }
109
110 res.map(|_| 0).unwrap_or(-1)
111}
112
113unsafe extern "C" fn plugin_free_user_data(raw: *mut c_void) {
114 let ptr = raw as *mut Box<dyn Plugin>;
115 drop(Box::from_raw(ptr));
116}
117
118fn plugin_registration(name: &str, plugin: Box<dyn Plugin>) {
119 let pl: Box<Box<dyn Plugin>> = Box::new(plugin);
120
121 let should_read = pl.capabilities().has_read();
123 let should_log = pl.capabilities().has_log();
124 let should_write = pl.capabilities().has_write();
125 let should_flush = pl.capabilities().has_flush();
126
127 let s = CString::new(name).expect("Plugin name to not contain nulls");
128
129 unsafe {
130 let plugin_ptr = Box::into_raw(pl) as *mut c_void;
131
132 let mut data = user_data_t {
136 data: plugin_ptr,
137 free_func: Some(plugin_free_user_data),
138 };
139
140 let mut no_free_data = user_data_t {
144 data: plugin_ptr,
145 free_func: None,
146 };
147
148 if should_read {
149 plugin_register_complex_read(
150 ptr::null(),
151 s.as_ptr(),
152 Some(plugin_read),
153 get_default_interval(),
154 &data,
155 );
156 }
157
158 if should_write {
159 let d = if !should_read {
160 &mut data
161 } else {
162 &mut no_free_data
163 };
164
165 plugin_register_write(s.as_ptr(), Some(plugin_write), d);
166 }
167
168 if should_log {
169 let d = if !should_read && !should_write {
170 &mut data
171 } else {
172 &mut no_free_data
173 };
174
175 plugin_register_log(s.as_ptr(), Some(plugin_log), d);
176 }
177
178 if should_flush {
179 let d = if !should_read && !should_write && !should_log {
180 &mut data
181 } else {
182 &mut no_free_data
183 };
184
185 plugin_register_flush(s.as_ptr(), Some(plugin_flush), d);
186 }
187 }
188}
189
190fn register_all_plugins<T: PluginManager>(config: Option<&[ConfigItem<'_>]>) -> c_int {
191 let res = catch_unwind(|| T::plugins(config))
192 .map_err(|_| FfiError::Panic)
193 .and_then(|reged| reged.map_err(FfiError::Plugin))
194 .map(|registration| match registration {
195 PluginRegistration::Single(pl) => plugin_registration(T::name(), pl),
196 PluginRegistration::Multiple(v) => {
197 for (id, pl) in v {
198 let name = format!("{}/{}", T::name(), id);
199
200 plugin_registration(name.as_str(), pl)
201 }
202 }
203 });
204
205 if let Err(ref e) = res {
206 log_err("collectd config", e);
207 }
208 res.map(|_| 0).unwrap_or(-1)
209}
210
211pub fn plugin_init<T: PluginManager>(config_seen: &AtomicBool) -> c_int {
212 let mut result = if !config_seen.swap(true, Ordering::SeqCst) {
213 register_all_plugins::<T>(None)
214 } else {
215 0
216 };
217
218 let capabilities = T::capabilities();
219 if capabilities.intersects(PluginManagerCapabilities::INIT) {
220 let res = catch_unwind(T::initialize)
221 .map_err(|_e| FfiError::Panic)
222 .and_then(|init| init.map_err(FfiError::Plugin));
223
224 if let Err(ref e) = res {
225 result = -1;
226 log_err("init", e);
227 }
228 }
229
230 result
231}
232
233pub fn plugin_shutdown<T: PluginManager>() -> c_int {
234 let mut result = 0;
235
236 let capabilities = T::capabilities();
237 if capabilities.intersects(PluginManagerCapabilities::INIT) {
238 let res = catch_unwind(T::shutdown)
239 .map_err(|_e| FfiError::Panic)
240 .and_then(|r| r.map_err(FfiError::Plugin));
241
242 if let Err(ref e) = res {
243 result = -1;
244 log_err("shutdown", e);
245 }
246 }
247
248 result
249}
250
251pub unsafe fn plugin_complex_config<T: PluginManager>(
255 config_seen: &AtomicBool,
256 config: *mut oconfig_item_t,
257) -> c_int {
258 if config_seen.swap(true, Ordering::SeqCst) {
261 log_err("config", &FfiError::MultipleConfig);
262 return -1;
263 }
264
265 match ConfigItem::from(&*config) {
266 Ok(config) => register_all_plugins::<T>(Some(&config.children)),
267 Err(e) => {
268 log_err(
269 "collectd config conversion",
270 &FfiError::Collectd(Box::new(e)),
271 );
272 -1
273 }
274 }
275}
276
277pub fn register_panic_handler() {
278 panic::set_hook(Box::new(|info| {
279 log_err("panic hook", &FfiError::PanicHook(info));
280 }));
281}