mod subscribe;
use mlua::{Error, IntoLua, UserData, UserDataMethods, Value};
use rdkafka::consumer::BaseConsumer;
use rdkafka::Message;
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone)]
pub(super) struct Consumer {
pub consumer: Arc<BaseConsumer>,
}
impl UserData for Consumer {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_method("close", |_lua, _consumer, _args: Value| Ok(()));
methods.add_method("poll", |lua, consumer, timeout_ms: u64| -> Result<Value, Error> {
let dur = Duration::from_millis(timeout_ms);
match consumer.consumer.poll(dur) {
Some(Err(err)) => Err(Error::RuntimeError(err.to_string())),
Some(Ok(message)) => {
let table = lua.create_table()?;
table.set("topic", message.topic())?;
table.set("partition", message.partition())?;
table.set("offset", message.offset())?;
if let Some(timestamp_ms) = message.timestamp().to_millis() {
table.set("timestamp", timestamp_ms as f64 / 1e3_f64)?;
}
if let Some(key) = message.key() {
table.set("key", key)?;
}
if let Some(payload) = message.payload() {
table.set("payload", payload)?;
}
Ok(table.into_lua(lua)?)
}
None => Ok(mlua::Nil),
}
});
methods.add_method("subscribe", subscribe::handle);
}
}