mlua-kafka 0.1.7

A Rust-native implementation of lua-kafka for mlua.
mod subscribe;

use mlua::{Error, IntoLua, UserData, UserDataMethods, Value};
use rdkafka::consumer::BaseConsumer;
use rdkafka::Message;
use std::sync::Arc;
use std::time::Duration;

/// Wrapper around a shared rdkafka [`BaseConsumer`] that is exposed to Lua as
/// userdata (see the `UserData` impl in this file).
///
/// Cloning copies only the [`Arc`] handle, so every clone refers to the same
/// underlying consumer.
#[derive(Clone)]
pub(super) struct Consumer {
    /// Reference-counted handle to the underlying consumer.
    pub consumer: Arc<BaseConsumer>,
}

impl UserData for Consumer {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        methods.add_method("close", |_lua, _consumer, _args: Value| Ok(()));
        methods.add_method("poll", |lua, consumer, timeout_ms: u64| -> Result<Value, Error> {
            let dur = Duration::from_millis(timeout_ms);
            match consumer.consumer.poll(dur) {
                Some(Err(err)) => Err(Error::RuntimeError(err.to_string())),
                Some(Ok(message)) => {
                    let table = lua.create_table()?;
                    table.set("topic", message.topic())?;
                    table.set("partition", message.partition())?;
                    table.set("offset", message.offset())?;
                    if let Some(timestamp_ms) = message.timestamp().to_millis() {
                        table.set("timestamp", timestamp_ms as f64 / 1e3_f64)?;
                    }
                    if let Some(key) = message.key() {
                        table.set("key", key)?;
                    }
                    if let Some(payload) = message.payload() {
                        table.set("payload", payload)?;
                    }
                    Ok(table.into_lua(lua)?)
                }
                None => Ok(mlua::Nil),
            }
        });
        methods.add_method("subscribe", subscribe::handle);
    }
}