#[cfg(test)]
pub mod integration_tests;
mod consumer;
mod producer;
use consumer::Consumer;
use mlua::{Error, Lua, Table};
use producer::Producer;
use rdkafka::consumer::BaseConsumer;
use rdkafka::producer::BaseProducer;
use rdkafka::ClientConfig;
use std::sync::Arc;
fn new_config(_lua: &Lua, settings: Table) -> Result<ClientConfig, Error> {
let mut config = ClientConfig::new();
for pair in settings.pairs::<String, String>() {
let (k, v) = pair?;
config.set(k, v);
}
Ok(config)
}
fn new_consumer(lua: &Lua, settings: Table) -> Result<Consumer, Error> {
let config = new_config(lua, settings)?;
let consumer: Arc<BaseConsumer> = Arc::new(config.create().map_err(|err| Error::RuntimeError(err.to_string()))?);
Ok(Consumer { consumer })
}
fn new_producer(lua: &Lua, settings: Table) -> Result<Producer, Error> {
let config = new_config(lua, settings)?;
let producer: Arc<BaseProducer> = Arc::new(config.create().map_err(|err| Error::RuntimeError(err.to_string()))?);
Ok(Producer { producer })
}
/// Registers the `kafka` module in the given Lua state.
///
/// A table exposing the `consumer` and `producer` constructor functions is
/// stored under `package.loaded["kafka"]`, so Lua code can obtain it with
/// `require('kafka')`.
///
/// # Errors
///
/// Returns an error if the table or functions cannot be created, if
/// `package` or `package.loaded` cannot be read as tables, or if any of the
/// table assignments fails.
pub fn preload(lua: &Lua) -> Result<(), Box<dyn std::error::Error>> {
    // Build the module table with its two constructors.
    let module = lua.create_table()?;
    let consumer_ctor = lua.create_function(new_consumer)?;
    module.set("consumer", consumer_ctor)?;
    let producer_ctor = lua.create_function(new_producer)?;
    module.set("producer", producer_ctor)?;

    // Publish it under `package.loaded`.
    let package: Table = lua.globals().get("package")?;
    let loaded: Table = package.get("loaded")?;
    loaded.set("kafka", module)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use mlua::{Lua, Table};
    use std::error::Error;

    /// After `preload`, `require('kafka')` must yield a table exposing both
    /// constructors that `preload` registers.
    #[test]
    fn load() -> Result<(), Box<dyn Error>> {
        let lua = Lua::new();
        super::preload(&lua)?;
        let module: Table = lua.load("return require('kafka')").eval()?;
        // Check both entries so that dropping either registration is caught.
        assert!(module.contains_key("consumer")?);
        assert!(module.contains_key("producer")?);
        Ok(())
    }
}