//! mlua-kafka 0.1.4
//!
//! A Rust-native implementation of lua-kafka for mlua.
// Test-only module: the `cfg(test)` gate means it is compiled only for test builds.
#[cfg(test)]
pub mod integration_tests;

// Private submodules providing the `Consumer` and `Producer` types that the
// constructor functions in this file return to Lua.
mod consumer;
mod producer;

use consumer::Consumer;
use mlua::{Error, Lua, Table};
use producer::Producer;
use rdkafka::consumer::BaseConsumer;
use rdkafka::producer::BaseProducer;
use rdkafka::ClientConfig;
use std::sync::Arc;

fn new_config(_lua: &Lua, settings: Table) -> Result<ClientConfig, Error> {
    let mut config = ClientConfig::new();
    for pair in settings.pairs::<String, String>() {
        let (k, v) = pair?;
        config.set(k, v);
    }
    Ok(config)
}

fn new_consumer(lua: &Lua, settings: Table) -> Result<Consumer, Error> {
    let config = new_config(lua, settings)?;
    let consumer: Arc<BaseConsumer> = Arc::new(config.create().map_err(|err| Error::RuntimeError(err.to_string()))?);
    Ok(Consumer { consumer })
}

fn new_producer(lua: &Lua, settings: Table) -> Result<Producer, Error> {
    let config = new_config(lua, settings)?;
    let producer: Arc<BaseProducer> = Arc::new(config.create().map_err(|err| Error::RuntimeError(err.to_string()))?);
    Ok(Producer { producer })
}

/// Registers the `kafka` module in the given Lua state.
///
/// A module table exposing two functions, `consumer` and `producer`, is
/// created and stored under the key `"kafka"` in `package.loaded`, so that
/// `require('kafka')` in Lua returns it.
///
/// # Errors
///
/// Returns an error if creating the table or functions fails, or if
/// `package` / `package.loaded` cannot be read as tables from the globals.
pub fn preload(lua: &Lua) -> Result<(), Box<dyn std::error::Error>> {
    // Build the module table with its two constructor functions.
    let module = lua.create_table()?;
    let consumer_ctor = lua.create_function(new_consumer)?;
    module.set("consumer", consumer_ctor)?;
    let producer_ctor = lua.create_function(new_producer)?;
    module.set("producer", producer_ctor)?;

    // Place the module directly into `package.loaded` so `require` finds it.
    let package: Table = lua.globals().get("package")?;
    let loaded: Table = package.get("loaded")?;
    loaded.set("kafka", module)?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use mlua::{Lua, Table};
    use std::error::Error;

    /// After `preload`, `require('kafka')` must return a module table that
    /// exposes both constructor functions registered by `preload`.
    #[test]
    fn load() -> Result<(), Box<dyn Error>> {
        let lua = Lua::new();
        super::preload(&lua)?;
        let module: Table = lua.load("return require('kafka')").eval()?;
        // Check every export, not just one, so a dropped registration is caught.
        assert!(module.contains_key("consumer")?);
        assert!(module.contains_key("producer")?);
        Ok(())
    }
}