use mlua::{Lua, Table, Value};
use std::env;
use std::error::Error;
const KAFKA_BROKERS: &str = "KAFKA_BROKERS";
const KAFKA_TOPIC: &str = "KAFKA_TOPIC";
#[test]
fn produce() -> Result<(), Box<dyn Error>> {
let brokers = match env::var(KAFKA_BROKERS) {
Ok(ok) => ok,
Err(_e) => {
log::warn!("Skipping test because no {} is set", KAFKA_BROKERS);
return Ok(());
}
};
let topic = match env::var(KAFKA_TOPIC) {
Ok(ok) => ok,
Err(_e) => {
log::warn!("Skipping test because no {} is set", KAFKA_TOPIC);
return Ok(());
}
};
let lua = Lua::new();
super::preload(&lua)?;
let script = r#"
local settings = {
['bootstrap.servers'] = '_brokers_',
}
local producer = require('kafka').producer(settings)
producer:produce('_topic_', 'key', 'value')
producer:flush(100)
"#
.replace("_brokers_", &brokers)
.replace("_topic_", &topic);
lua.load(script).exec()?;
Ok(())
}
#[test]
fn consume() -> Result<(), Box<dyn Error>> {
let brokers = match env::var(KAFKA_BROKERS) {
Ok(ok) => ok,
Err(_e) => {
log::warn!("Skipping test because no {} is set", KAFKA_BROKERS);
return Ok(());
}
};
let topic = match env::var(KAFKA_TOPIC) {
Ok(ok) => ok,
Err(_e) => {
log::warn!("Skipping test because no {} is set", KAFKA_TOPIC);
return Ok(());
}
};
let lua = Lua::new();
super::preload(&lua)?;
let script = r#"
local settings = {
['bootstrap.servers'] = '_brokers_',
['auto.offset.reset'] = 'earliest',
['group.id'] = 'mlua-kafka/src/integration_tests.rs',
}
local consumer = require('kafka').consumer(settings)
consumer:subscribe('_topic_')
return consumer:poll(1000)
"#
.replace("_brokers_", &brokers)
.replace("_topic_", &topic);
let message: Table = lua.load(script).eval()?;
let timestamp: f64 = message.get("timestamp")?;
let topic: String = message.get("topic")?;
let partition: u32 = message.get("partition")?;
let offset: u64 = message.get("offset")?;
let key: Vec<u8> = message.get("key")?;
let payload: Vec<u8> = message.get("payload")?;
eprintln!(
"got: timestamp={} topic={} partition={} offset={} key={:?} payload={:?}",
timestamp, topic, partition, offset, key, payload
);
Ok(())
}
#[test]
fn consuming_empty_topic_returns_nil() -> Result<(), Box<dyn Error>> {
let brokers = match env::var(KAFKA_BROKERS) {
Ok(ok) => ok,
Err(_e) => {
log::warn!("Skipping test because no {} is set", KAFKA_BROKERS);
return Ok(());
}
};
let topic = match env::var(KAFKA_TOPIC) {
Ok(ok) => ok,
Err(_e) => {
log::warn!("Skipping test because no {} is set", KAFKA_TOPIC);
return Ok(());
}
};
let lua = Lua::new();
super::preload(&lua)?;
let script = r#"
local settings = {
['bootstrap.servers'] = '_brokers_',
['auto.offset.reset'] = 'latest',
['group.id'] = 'mlua-kafka/src/integration_tests.rs',
}
local consumer = require('kafka').consumer(settings)
consumer:subscribe('_topic_')
return consumer:poll(10)
"#
.replace("_brokers_", &brokers)
.replace("_topic_", &topic);
let message: Value = lua.load(script).eval()?;
assert_eq!(message, mlua::Nil);
Ok(())
}