mlua-kafka 0.1.6

A Rust-native implementation of lua-kafka for mlua.
use mlua::{Lua, Table, Value};
use std::env;
use std::error::Error;

/// Name of the environment variable whose value the tests substitute for
/// `bootstrap.servers` in their Lua scripts.
const KAFKA_BROKERS: &str = "KAFKA_BROKERS";
/// Name of the environment variable whose value the tests use as the topic
/// to produce to or subscribe to.
const KAFKA_TOPIC: &str = "KAFKA_TOPIC";

/// Integration test: produces one message through the Lua `kafka` module.
///
/// The broker list and topic are read from the environment variables named by
/// `KAFKA_BROKERS` and `KAFKA_TOPIC`. If either cannot be read, a warning is
/// logged and the test returns `Ok(())` without running any Lua code.
///
/// # Errors
///
/// Propagates any error from `super::preload` or from executing the Lua chunk.
#[test]
fn produce() -> Result<(), Box<dyn Error>> {
    let bootstrap_servers = if let Ok(value) = env::var(KAFKA_BROKERS) {
        value
    } else {
        log::warn!("Skipping test because no {} is set", KAFKA_BROKERS);
        return Ok(());
    };
    let target_topic = if let Ok(value) = env::var(KAFKA_TOPIC) {
        value
    } else {
        log::warn!("Skipping test because no {} is set", KAFKA_TOPIC);
        return Ok(());
    };

    // Placeholders are substituted textually: brokers first, then the topic.
    let template = r#"
        local settings = {
            ['bootstrap.servers'] = '_brokers_',
        }
        local producer = require('kafka').producer(settings)
        producer:produce('_topic_', 'key', 'value')
        producer:flush(100)
    "#;
    let chunk_source = template
        .replace("_brokers_", &bootstrap_servers)
        .replace("_topic_", &target_topic);

    let lua = Lua::new();
    // NOTE(review): presumably this registers the `kafka` module that the
    // script `require`s — confirm against `preload`.
    super::preload(&lua)?;
    lua.load(chunk_source).exec()?;
    Ok(())
}

/// Integration test: polls one message through the Lua `kafka` consumer.
///
/// The broker list and topic are read from the environment variables named by
/// `KAFKA_BROKERS` and `KAFKA_TOPIC`. If either cannot be read, a warning is
/// logged and the test returns `Ok(())` without running any Lua code.
///
/// The script subscribes with `auto.offset.reset = 'earliest'` and returns the
/// result of a single `poll(1000)`. The test then requires that result to be a
/// table whose `timestamp`, `topic`, `partition`, `offset`, `key` and `payload`
/// fields convert to the Rust types used below, and prints them to stderr.
///
/// # Errors
///
/// Propagates any error from `super::preload`, from evaluating the Lua chunk,
/// or from converting the returned value or any of its fields.
#[test]
fn consume() -> Result<(), Box<dyn Error>> {
    let bootstrap_servers = if let Ok(value) = env::var(KAFKA_BROKERS) {
        value
    } else {
        log::warn!("Skipping test because no {} is set", KAFKA_BROKERS);
        return Ok(());
    };
    let subscribed_topic = if let Ok(value) = env::var(KAFKA_TOPIC) {
        value
    } else {
        log::warn!("Skipping test because no {} is set", KAFKA_TOPIC);
        return Ok(());
    };

    // Placeholders are substituted textually: brokers first, then the topic.
    let template = r#"
        local settings = {
            ['bootstrap.servers'] = '_brokers_',
            ['auto.offset.reset'] = 'earliest',
            ['group.id'] = 'mlua-kafka/src/integration_tests.rs',
        }
        local consumer = require('kafka').consumer(settings)
        consumer:subscribe('_topic_')
        return consumer:poll(1000)
    "#;
    let chunk_source = template
        .replace("_brokers_", &bootstrap_servers)
        .replace("_topic_", &subscribed_topic);

    let lua = Lua::new();
    super::preload(&lua)?;
    let record: Table = lua.load(chunk_source).eval()?;

    // Fields are read in a fixed order; the first failed conversion aborts the test.
    let timestamp: f64 = record.get("timestamp")?;
    let message_topic: String = record.get("topic")?;
    let partition: u32 = record.get("partition")?;
    let offset: u64 = record.get("offset")?;
    let key: Vec<u8> = record.get("key")?;
    let payload: Vec<u8> = record.get("payload")?;

    eprintln!(
        "got: timestamp={} topic={} partition={} offset={} key={:?} payload={:?}",
        timestamp, message_topic, partition, offset, key, payload
    );
    Ok(())
}

/// Integration test: a poll that yields no message returns Lua `nil`.
///
/// The broker list and topic are read from the environment variables named by
/// `KAFKA_BROKERS` and `KAFKA_TOPIC`. If either cannot be read, a warning is
/// logged and the test returns `Ok(())` without running any Lua code.
///
/// The script subscribes with `auto.offset.reset = 'latest'` and returns the
/// result of a single `poll(10)`; the test asserts that the result is `nil`.
///
/// # Errors
///
/// Propagates any error from `super::preload` or from evaluating the Lua chunk.
///
/// # Panics
///
/// Panics if the value returned by the script is not `nil`.
#[test]
fn consuming_empty_topic_returns_nil() -> Result<(), Box<dyn Error>> {
    let bootstrap_servers = if let Ok(value) = env::var(KAFKA_BROKERS) {
        value
    } else {
        log::warn!("Skipping test because no {} is set", KAFKA_BROKERS);
        return Ok(());
    };
    let subscribed_topic = if let Ok(value) = env::var(KAFKA_TOPIC) {
        value
    } else {
        log::warn!("Skipping test because no {} is set", KAFKA_TOPIC);
        return Ok(());
    };

    // Placeholders are substituted textually: brokers first, then the topic.
    let template = r#"
        local settings = {
            ['bootstrap.servers'] = '_brokers_',
            ['auto.offset.reset'] = 'latest',
            ['group.id'] = 'mlua-kafka/src/integration_tests.rs',
        }
        local consumer = require('kafka').consumer(settings)
        consumer:subscribe('_topic_')
        return consumer:poll(10)
    "#;
    let chunk_source = template
        .replace("_brokers_", &bootstrap_servers)
        .replace("_topic_", &subscribed_topic);

    let lua = Lua::new();
    super::preload(&lua)?;
    let polled: Value = lua.load(chunk_source).eval()?;
    assert_eq!(polled, mlua::Nil);
    Ok(())
}