1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
//! Module used exclusively to setup the `collectd_plugin!` macro. No public functions from here
//! should be used.
use api::{empty_to_none, get_default_interval, log_err, CdTime, ConfigItem, LogLevel, ValueList};
use bindings::{
    cdtime_t, data_set_t, oconfig_item_t, plugin_register_complex_read, plugin_register_flush,
    plugin_register_log, plugin_register_write, user_data_t, value_list_t,
};
use errors::FfiError;
use plugins::{Plugin, PluginManager, PluginManagerCapabilities, PluginRegistration};
use std::ffi::{CStr, CString};
use std::ops::Deref;
use std::os::raw::{c_char, c_int, c_void};
use std::panic::{self, catch_unwind};
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};

extern "C" fn plugin_read(dt: *mut user_data_t) -> c_int {
    let plugin = unsafe { &mut *((*dt).data as *mut Box<Plugin>) };
    let res = catch_unwind(|| plugin.read_values())
        .map_err(|_| FfiError::Panic)
        .and_then(|x| x.map_err(FfiError::Plugin));

    if let Err(ref e) = res {
        log_err("read", e);
    }

    res.map(|_| 0).unwrap_or(-1)
}

extern "C" fn plugin_log(severity: c_int, message: *const c_char, dt: *mut user_data_t) {
    let plugin = unsafe { &mut *((*dt).data as *mut Box<Plugin>) };

    // Guard against potential null messages even if they are not supposed to happen.
    if message.is_null() {
        return;
    }

    // Here we allow the potential allocation of a string that contains replacement
    // characters as it wouldn't be right if collectd-plugin stopped the logging of an
    // important message when a small portion of the message may be illegible.
    let msg = unsafe { CStr::from_ptr(message).to_string_lossy() };
    let res = LogLevel::try_from(severity as u32)
        .ok_or_else(|| FfiError::UnknownSeverity(severity))
        .and_then(|lvl| {
            catch_unwind(|| plugin.log(lvl, Deref::deref(&msg)))
                .map_err(|_| FfiError::Panic)
                .and_then(|x| x.map_err(FfiError::Plugin))
        });

    if let Err(ref e) = res {
        log_err("logging", e);
    }
}

extern "C" fn plugin_write(
    ds: *const data_set_t,
    vl: *const value_list_t,
    dt: *mut user_data_t,
) -> c_int {
    let plugin = unsafe { &mut *((*dt).data as *mut Box<Plugin>) };
    let res = unsafe { ValueList::from(&*ds, &*vl) }
        .map_err(|e| FfiError::Collectd(Box::new(e)))
        .and_then(|list| {
            catch_unwind(|| plugin.write_values(list))
                .map_err(|_| FfiError::Panic)
                .and_then(|x| x.map_err(FfiError::Plugin))
        });

    if let Err(ref e) = res {
        log_err("writing", e);
    }

    res.map(|_| 0).unwrap_or(-1)
}

extern "C" fn plugin_flush(
    timeout: cdtime_t,
    identifier: *const c_char,
    dt: *mut user_data_t,
) -> c_int {
    let plugin = unsafe { &mut *((*dt).data as *mut Box<::Plugin>) };

    let dur = if timeout == 0 {
        None
    } else {
        Some(CdTime::from(timeout).into())
    };

    let ident = if identifier.is_null() {
        Ok(None)
    } else {
        unsafe { CStr::from_ptr(identifier) }
            .to_str()
            .map(empty_to_none)
            .map_err(|e| FfiError::Utf8("flush identifier", e))
    };

    let res = ident.and_then(|id| {
        catch_unwind(|| plugin.flush(dur, id))
            .map_err(|_| FfiError::Panic)
            .and_then(|x| x.map_err(FfiError::Plugin))
    });

    if let Err(ref e) = res {
        log_err("flush", e);
    }

    res.map(|_| 0).unwrap_or(-1)
}

unsafe extern "C" fn plugin_free_user_data(raw: *mut c_void) {
    let ptr = raw as *mut Box<Plugin>;
    drop(Box::from_raw(ptr));
}

fn plugin_registration(name: &str, plugin: Box<Plugin>) {
    let pl: Box<Box<Plugin>> = Box::new(plugin);

    // Grab all the properties we need until `into_raw` away
    let should_read = pl.capabilities().has_read();
    let should_log = pl.capabilities().has_log();
    let should_write = pl.capabilities().has_write();
    let should_flush = pl.capabilities().has_flush();

    let s = CString::new(name).expect("Plugin name to not contain nulls");

    // Plugin registration differs only a tiny bit between collectd-57 and older
    // versions. The one difference is that user_data_t went from mutable to not
    // mutable. The code duplication is annoying, but it's better to have it
    // encapsulated in a single crate instead of many others.
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::unnecessary_mut_passed))]
    unsafe {
        let plugin_ptr = Box::into_raw(pl) as *mut c_void;

        // The user data that is passed to read, writes, logs, etc. It is not passed to
        // config or init. Since user_data_t implements copy, we don't need to forget about
        // it. See clippy suggestion (forget_copy)
        let mut data = user_data_t {
            data: plugin_ptr,
            free_func: Some(plugin_free_user_data),
        };

        // If a plugin registers more than one callback, we make sure to deregister the
        // free function to avoid data being freed twice:
        // https://collectd.org/wiki/index.php/User_data_t
        let mut no_free_data = user_data_t {
            data: plugin_ptr,
            free_func: None,
        };

        if should_read {
            plugin_register_complex_read(
                ptr::null(),
                s.as_ptr(),
                Some(plugin_read),
                get_default_interval(),
                &mut data,
            );
        }

        if should_write {
            let d = if !should_read {
                &mut data
            } else {
                &mut no_free_data
            };

            plugin_register_write(s.as_ptr(), Some(plugin_write), d);
        }

        if should_log {
            let d = if !should_read && !should_write {
                &mut data
            } else {
                &mut no_free_data
            };

            plugin_register_log(s.as_ptr(), Some(plugin_log), d);
        }

        if should_flush {
            let d = if !should_read && !should_write && !should_log {
                &mut data
            } else {
                &mut no_free_data
            };

            plugin_register_flush(s.as_ptr(), Some(plugin_flush), d);
        }
    }
}

fn register_all_plugins<T: PluginManager>(config: Option<&[ConfigItem]>) -> c_int {
    let res = catch_unwind(|| T::plugins(config))
        .map_err(|_| FfiError::Panic)
        .and_then(|reged| reged.map_err(FfiError::Plugin))
        .and_then(|registration| {
            match registration {
                PluginRegistration::Single(pl) => {
                    plugin_registration(T::name(), pl);
                }
                PluginRegistration::Multiple(v) => {
                    for (id, pl) in v {
                        let name = format!("{}/{}", T::name(), id);

                        plugin_registration(name.as_str(), pl);
                    }
                }
            }

            Ok(())
        });

    if let Err(ref e) = res {
        log_err("collectd config", e);
    }
    res.map(|_| 0).unwrap_or(-1)
}

pub fn plugin_init<T: PluginManager>(config_seen: &AtomicBool) -> c_int {
    let mut result = if !config_seen.swap(true, Ordering::Relaxed) {
        register_all_plugins::<T>(None)
    } else {
        0
    };

    let capabilities = T::capabilities();
    if capabilities.intersects(PluginManagerCapabilities::INIT) {
        let res = catch_unwind(T::initialize)
            .map_err(|_e| FfiError::Panic)
            .and_then(|init| init.map_err(FfiError::Plugin));

        if let Err(ref e) = res {
            result = -1;
            log_err("init", e);
        }
    }

    result
}

pub unsafe fn plugin_complex_config<T: PluginManager>(
    config_seen: &AtomicBool,
    config: *mut oconfig_item_t,
) -> c_int {
    // If we've already seen the config, let's error out as one shouldn't use multiple
    // sections of configuration (group them under nodes like write_graphite)
    if config_seen.swap(true, Ordering::Relaxed) {
        log_err("config", &FfiError::MultipleConfig);
        return -1;
    }

    match ConfigItem::from(&*config) {
        Ok(config) => register_all_plugins::<T>(Some(&config.children)),
        Err(e) => {
            log_err(
                "collectd config conversion",
                &FfiError::Collectd(Box::new(e)),
            );
            -1
        }
    }
}

pub fn register_panic_handler() {
    panic::set_hook(Box::new(|info| {
        log_err("panic hook", &FfiError::PanicHook(info));
    }));
}