tiny_kafka 1.0.6

A tiny Kafka client library with producer and consumer functionalities.
Documentation
use std::time::Duration;
use tiny_kafka::{KafkaConsumer as Consumer, KafkaProducer as Producer};
use tokio;
use tracing_test::traced_test;

async fn setup_kafka() {
    // Wait for Kafka to be ready
    tokio::time::sleep(Duration::from_secs(2)).await;
}

#[tokio::test]
#[traced_test]
async fn test_producer_consumer() {
    setup_kafka().await;

    let topic = "test_topic";
    let mut producer = Producer::new("localhost:9092".to_string())
        .await
        .expect("Failed to create producer");
    let mut consumer = Consumer::new(
        "localhost:9092".to_string(),
        "test_group".to_string(),
        topic.to_string(),
    )
    .await
    .expect("Failed to create consumer");

    let test_message = "test message";

    // Produce message
    producer
        .send_message(topic, 0, test_message)
        .await
        .expect("Failed to send message");

    // Consume message
    let messages = consumer
        .consume()
        .await
        .expect("Failed to consume messages");

    // Verify message was received
    assert!(!messages.is_empty());
    assert_eq!(String::from_utf8_lossy(&messages[0]), test_message);

    // Clean up
    consumer.close().await.expect("Failed to close consumer");
}

#[tokio::test]
#[traced_test]
async fn test_multiple_messages() {
    setup_kafka().await;

    let topic = "test_topic_multiple";
    let mut producer = Producer::new("localhost:9092".to_string())
        .await
        .expect("Failed to create producer");
    let mut consumer = Consumer::new(
        "localhost:9092".to_string(),
        "test_group_multiple".to_string(),
        topic.to_string(),
    )
    .await
    .expect("Failed to create consumer");

    let test_messages = vec!["message1", "message2", "message3"];

    // Produce messages
    for msg in &test_messages {
        producer
            .send_message(topic, 0, msg)
            .await
            .expect("Failed to send message");
    }

    // Consume messages
    let messages = consumer
        .consume()
        .await
        .expect("Failed to consume messages");

    assert_eq!(messages.len(), test_messages.len());
    for (received, original) in messages.iter().zip(test_messages.iter()) {
        assert_eq!(String::from_utf8_lossy(&received), original.to_string());
    }

    // Clean up
    consumer.close().await.expect("Failed to close consumer");
}

#[tokio::test]
#[traced_test]
async fn test_empty_topic() {
    setup_kafka().await;

    let topic = "empty_topic";
    let mut consumer = Consumer::new(
        "localhost:9092".to_string(),
        "test_group_empty".to_string(),
        topic.to_string(),
    )
    .await
    .expect("Failed to create consumer");

    // Try to consume from empty topic
    let messages = consumer
        .consume()
        .await
        .expect("Failed to consume messages");

    assert!(messages.is_empty());

    // Clean up
    consumer.close().await.expect("Failed to close consumer");
}