//! Producer and consumer configuration.
//!
//! ## C library configuration
//!
//! The Rust library forwards all configuration parameters to the C library. The
//! most frequently used parameters are listed below.
//!
//! ### Frequently used parameters
//!
//! For producer-specific and consumer-specific parameters, check the
//! documentation of the producer and consumer modules. The full list of
//! available parameters is available in the [librdkafka documentation][librdkafka-config].
//!
//! - `client.id`: Client identifier. Default: `rdkafka`.
//! - `bootstrap.servers`: Initial list of brokers as a CSV list of broker host
//!    or host:port. Default: empty.
//! - `message.max.bytes`: Maximum message size. Default: 1000000.
//! - `debug`: A comma-separated list of debug contexts to enable. Use 'all' to
//!    print all the debugging information. Default: empty (off).
//! - `statistics.interval.ms`: How often the statistics callback
//!    specified in the [`ClientContext`] will be called. Default: 0 (disabled).
//!
//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
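//!
//! ### Example
//!
//! A minimal sketch of setting a few of the parameters above on a
//! [`ClientConfig`]; the broker address and client id below are placeholders:
//!
//! ```
//! use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
//!
//! let mut config = ClientConfig::new();
//! config
//!     .set("bootstrap.servers", "localhost:9092") // placeholder broker
//!     .set("client.id", "example-client")
//!     .set_log_level(RDKafkaLogLevel::Debug);
//! ```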

use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::iter::FromIterator;
use std::os::raw::c_char;
use std::ptr;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::ClientContext;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{log_enabled, DEBUG, INFO, WARN};
use crate::util::{ErrBuf, KafkaDrop, NativePtr};

/// The log levels supported by librdkafka.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Emerg = 0,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Alert = 1,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Critical = 2,
    /// Equivalent to [`Level::Error`](log::Level::Error) from the log crate.
    Error = 3,
    /// Equivalent to [`Level::Warn`](log::Level::Warn) from the log crate.
    Warning = 4,
    /// Higher priority than [`Level::Info`](log::Level::Info) from the log
    /// crate.
    Notice = 5,
    /// Equivalent to [`Level::Info`](log::Level::Info) from the log crate.
    Info = 6,
    /// Equivalent to [`Level::Debug`](log::Level::Debug) from the log crate.
    Debug = 7,
}

impl RDKafkaLogLevel {
    pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
        match level {
            0 => RDKafkaLogLevel::Emerg,
            1 => RDKafkaLogLevel::Alert,
            2 => RDKafkaLogLevel::Critical,
            3 => RDKafkaLogLevel::Error,
            4 => RDKafkaLogLevel::Warning,
            5 => RDKafkaLogLevel::Notice,
            6 => RDKafkaLogLevel::Info,
            _ => RDKafkaLogLevel::Debug,
        }
    }
}

//
// ********** CLIENT CONFIG **********
//

/// A native rdkafka-sys client config.
pub struct NativeClientConfig {
    ptr: NativePtr<RDKafkaConf>,
}

unsafe impl KafkaDrop for RDKafkaConf {
    const TYPE: &'static str = "client config";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
}

impl NativeClientConfig {
    /// Wraps a pointer to an `RDKafkaConf` object and returns a new `NativeClientConfig`.
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
        NativeClientConfig {
            ptr: NativePtr::from_ptr(ptr).unwrap(),
        }
    }

    /// Returns the pointer to the librdkafka `RDKafkaConf` structure.
    pub fn ptr(&self) -> *mut RDKafkaConf {
        self.ptr.ptr()
    }

    /// Gets the value of a parameter in the configuration.
    ///
    /// This method reflects librdkafka's view of the current value of the
    /// parameter. If the parameter was overridden by the user, it returns the
    /// user-specified value. Otherwise, it returns librdkafka's default value
    /// for the parameter.
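    ///
    /// # Example
    ///
    /// A rough sketch (error handling elided); `"client.id"` is a real
    /// librdkafka parameter, the value is arbitrary:
    ///
    /// ```
    /// use rdkafka::config::ClientConfig;
    ///
    /// let native = ClientConfig::new()
    ///     .set("client.id", "example")
    ///     .create_native_config()
    ///     .unwrap();
    /// assert_eq!(native.get("client.id").unwrap(), "example");
    /// ```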
    pub fn get(&self, key: &str) -> KafkaResult<String> {
        let make_err = |res| {
            KafkaError::ClientConfig(
                res,
                match res {
                    RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
                    RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
                    RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
                }
                .into(),
                key.into(),
                "".into(),
            )
        };
        let key_c = CString::new(key.to_string())?;

        // Call with a `NULL` buffer to determine the size of the string.
        let mut size = 0_usize;
        let res = unsafe {
            rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // Allocate a buffer of that size and call again to get the actual
        // string.
        let mut buf = vec![0_u8; size];
        let res = unsafe {
            rdsys::rd_kafka_conf_get(
                self.ptr(),
                key_c.as_ptr(),
                buf.as_mut_ptr() as *mut c_char,
                &mut size,
            )
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // Convert the C string to a Rust string.
        Ok(CStr::from_bytes_with_nul(&buf)
            .unwrap()
            .to_string_lossy()
            .into())
    }
}

/// Client configuration.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    conf_map: HashMap<String, String>,
    /// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list
    /// of available levels.
    pub log_level: RDKafkaLogLevel,
}

impl Default for ClientConfig {
    fn default() -> Self {
        Self::new()
    }
}

impl ClientConfig {
    /// Creates a new empty configuration.
    pub fn new() -> ClientConfig {
        ClientConfig {
            conf_map: HashMap::new(),
            log_level: log_level_from_global_config(),
        }
    }

    /// Gets the value of a parameter in the configuration.
    ///
    /// Returns the current value set for `key`, or `None` if no value for `key`
    /// exists.
    ///
    /// Note that this method will only ever return values that were installed
    /// by a call to [`ClientConfig::set`]. To retrieve librdkafka's default
    /// value for a parameter, build a [`NativeClientConfig`] and then call
    /// [`NativeClientConfig::get`] on the resulting object.
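    ///
    /// # Example
    ///
    /// A small sketch; `"client.id"` is a real librdkafka parameter and the
    /// value is arbitrary:
    ///
    /// ```
    /// use rdkafka::config::ClientConfig;
    ///
    /// let mut config = ClientConfig::new();
    /// assert_eq!(config.get("client.id"), None); // nothing set by the user yet
    /// config.set("client.id", "example");
    /// assert_eq!(config.get("client.id"), Some("example"));
    /// ```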
    pub fn get(&self, key: &str) -> Option<&str> {
        self.conf_map.get(key).map(|val| val.as_str())
    }

    /// Sets a parameter in the configuration.
    ///
    /// If there is an existing value for `key` in the configuration, it is
    /// overridden with the new `value`.
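    ///
    /// # Example
    ///
    /// A small sketch showing that a later `set` for the same key replaces the
    /// earlier value (the key is a real librdkafka parameter, the values are
    /// arbitrary):
    ///
    /// ```
    /// use rdkafka::config::ClientConfig;
    ///
    /// let mut config = ClientConfig::new();
    /// config.set("client.id", "first").set("client.id", "second");
    /// assert_eq!(config.get("client.id"), Some("second"));
    /// ```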
    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
    where
        K: Into<String>,
        V: Into<String>,
    {
        self.conf_map.insert(key.into(), value.into());
        self
    }

    /// Removes a parameter from the configuration.
    pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
        self.conf_map.remove(key);
        self
    }

    /// Sets the log level of the client. If not specified, the log level will be
    /// calculated based on the global log level of the `log` crate.
    pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
        self.log_level = log_level;
        self
    }

    /// Builds a native librdkafka configuration.
    pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
        let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
        let mut err_buf = ErrBuf::new();
        for (key, value) in &self.conf_map {
            let key_c = CString::new(key.to_string())?;
            let value_c = CString::new(value.to_string())?;
            let ret = unsafe {
                rdsys::rd_kafka_conf_set(
                    conf.ptr(),
                    key_c.as_ptr(),
                    value_c.as_ptr(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            if ret.is_error() {
                return Err(KafkaError::ClientConfig(
                    ret,
                    err_buf.to_string(),
                    key.to_string(),
                    value.to_string(),
                ));
            }
        }
        Ok(conf)
    }

    /// Uses the current configuration to create a new Consumer or Producer.
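    ///
    /// # Example
    ///
    /// A sketch that builds a [`FutureProducer`](crate::producer::FutureProducer);
    /// the broker address below is a placeholder:
    ///
    /// ```no_run
    /// use rdkafka::config::ClientConfig;
    /// use rdkafka::producer::FutureProducer;
    ///
    /// let producer: FutureProducer = ClientConfig::new()
    ///     .set("bootstrap.servers", "localhost:9092") // placeholder
    ///     .create()
    ///     .expect("producer creation failed");
    /// ```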
    pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
        T::from_config(self)
    }

    /// Uses the current configuration and the provided context to create a new Consumer or Producer.
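    ///
    /// # Example
    ///
    /// A sketch using the stock [`DefaultConsumerContext`](crate::consumer::DefaultConsumerContext)
    /// to build a [`BaseConsumer`](crate::consumer::BaseConsumer); the broker
    /// address and group id below are placeholders:
    ///
    /// ```no_run
    /// use rdkafka::config::ClientConfig;
    /// use rdkafka::consumer::{BaseConsumer, DefaultConsumerContext};
    ///
    /// let consumer: BaseConsumer = ClientConfig::new()
    ///     .set("bootstrap.servers", "localhost:9092") // placeholder
    ///     .set("group.id", "example-group")           // placeholder
    ///     .create_with_context(DefaultConsumerContext)
    ///     .expect("consumer creation failed");
    /// ```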
    pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
    where
        C: ClientContext,
        T: FromClientConfigAndContext<C>,
    {
        T::from_config_and_context(self, context)
    }
}

impl FromIterator<(String, String)> for ClientConfig {
    fn from_iter<I>(iter: I) -> ClientConfig
    where
        I: IntoIterator<Item = (String, String)>,
    {
        let mut config = ClientConfig::new();
        config.extend(iter);
        config
    }
}

impl Extend<(String, String)> for ClientConfig {
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = (String, String)>,
    {
        self.conf_map.extend(iter)
    }
}

/// Returns the librdkafka log level that corresponds to the global log level of
/// the `log` crate.
fn log_level_from_global_config() -> RDKafkaLogLevel {
    if log_enabled!(target: "librdkafka", DEBUG) {
        RDKafkaLogLevel::Debug
    } else if log_enabled!(target: "librdkafka", INFO) {
        RDKafkaLogLevel::Info
    } else if log_enabled!(target: "librdkafka", WARN) {
        RDKafkaLogLevel::Warning
    } else {
        RDKafkaLogLevel::Error
    }
}

/// Create a new client based on the provided configuration.
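/// # Example
///
/// A sketch of a hypothetical helper that is generic over the client type
/// (`make_client` is not part of this crate):
///
/// ```
/// use rdkafka::config::{ClientConfig, FromClientConfig};
/// use rdkafka::error::KafkaResult;
///
/// fn make_client<T: FromClientConfig>(config: &ClientConfig) -> KafkaResult<T> {
///     config.create()
/// }
/// ```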
pub trait FromClientConfig: Sized {
    /// Creates a client from a client configuration. The default client context
    /// will be used.
    fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
}

/// Create a new client based on the provided configuration and context.
pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
    /// Creates a client from a client configuration and a client context.
    fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
}

#[cfg(test)]
mod tests {
    use super::ClientConfig;

    #[test]
    fn test_client_config_set_map() {
        let mut config: ClientConfig = vec![("a".into(), "1".into()), ("b".into(), "1".into())]
            .into_iter()
            .collect();
        config.extend([("b".into(), "2".into()), ("c".into(), "3".into())]);

        assert_eq!(config.get("a").unwrap(), "1");
        assert_eq!(config.get("b").unwrap(), "2");
        assert_eq!(config.get("c").unwrap(), "3");
    }
}