kafkit-client 0.1.7

Kafka 4.0+ pure Rust client.
Documentation
use std::env;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result, bail};
use kafkit_client::{
    AutoOffsetReset, KafkaAdmin, KafkaClient, KafkaMessage, NewTopic, RecordHeader,
};
use tokio::time::Instant;

const DEFAULT_BOOTSTRAP: &str = "localhost:9092";
const DEFAULT_TOPIC: &str = "kafkit.orders";
const EXPECTED_RECORDS: usize = 5;

#[tokio::main]
async fn main() -> Result<()> {
    let bootstrap =
        env::var("KAFKIT_BOOTSTRAP_SERVERS").unwrap_or_else(|_| DEFAULT_BOOTSTRAP.to_owned());
    let topic = env::var("KAFKIT_TOPIC").unwrap_or_else(|_| DEFAULT_TOPIC.to_owned());
    let group_id = env::var("KAFKIT_GROUP_ID").unwrap_or_else(|_| {
        format!(
            "kafkit-order-example-{}",
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system clock is before unix epoch")
                .as_millis()
        )
    });

    println!("bootstrap: {bootstrap}");
    println!("topic: {topic}");
    println!("group: {group_id}");

    ensure_topic(&bootstrap, &topic).await?;

    let client = KafkaClient::new(bootstrap).topic(topic.clone());
    let producer = client
        .producer()
        .with_client_id("kafkit-order-example-producer")
        .connect()
        .await
        .context("failed to connect producer")?;

    for event in sample_order_events() {
        producer
            .send_message(
                KafkaMessage::new(event.payload)
                    .with_key(event.order_id)
                    .with_header(RecordHeader::new("event-type", event.event_type))
                    .with_header(RecordHeader::new("example", "order-workflow")),
            )
            .await
            .with_context(|| format!("failed to produce {}", event.order_id))?;
    }
    producer.flush().await.context("failed to flush producer")?;
    println!("produced {EXPECTED_RECORDS} order events");

    let consumer = client
        .consumer(group_id)
        .with_client_id("kafkit-order-example-consumer")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_poll_timeout(Duration::from_millis(500))
        .connect()
        .await
        .context("failed to connect consumer")?;

    let deadline = Instant::now() + Duration::from_secs(30);
    let mut consumed = 0usize;

    while consumed < EXPECTED_RECORDS {
        if Instant::now() >= deadline {
            consumer.shutdown().await.ok();
            producer.shutdown().await.ok();
            bail!("timed out waiting for {EXPECTED_RECORDS} records; consumed {consumed}");
        }

        let records = consumer.poll().await.context("failed to poll consumer")?;
        if records.is_empty() {
            continue;
        }

        for record in records.iter() {
            let value = record
                .value
                .as_deref()
                .map(String::from_utf8_lossy)
                .map(|value| value.into_owned())
                .unwrap_or_else(|| "<tombstone>".to_owned());
            let key = record
                .key
                .as_deref()
                .map(String::from_utf8_lossy)
                .map(|key| key.into_owned())
                .unwrap_or_else(|| "<none>".to_owned());

            println!(
                "consumed key={key} topic={} partition={} offset={} value={value}",
                record.topic, record.partition, record.offset
            );
            consumed += 1;
        }

        consumer
            .commit(&records)
            .await
            .context("failed to commit consumed records")?;
    }

    consumer
        .shutdown()
        .await
        .context("failed to shut down consumer")?;
    producer
        .shutdown()
        .await
        .context("failed to shut down producer")?;
    println!("done");

    Ok(())
}

async fn ensure_topic(bootstrap: &str, topic: &str) -> Result<()> {
    let admin = KafkaAdmin::connect(kafkit_client::AdminConfig::new(bootstrap))
        .await
        .context("failed to connect admin client")?;

    let topics = admin.list_topics().await.context("failed to list topics")?;
    if topics.iter().any(|listing| listing.name == topic) {
        println!("topic already exists");
        return Ok(());
    }

    admin
        .create_topics([NewTopic::new(topic, 3, 1).with_config("cleanup.policy", "delete")])
        .await
        .with_context(|| format!("failed to create topic {topic}"))?;
    println!("created topic {topic}");

    Ok(())
}

struct OrderEvent {
    order_id: &'static str,
    event_type: &'static str,
    payload: &'static str,
}

fn sample_order_events() -> [OrderEvent; EXPECTED_RECORDS] {
    [
        OrderEvent {
            order_id: "order-1001",
            event_type: "order.created",
            payload: r#"{"order_id":"order-1001","status":"created","total":42.50}"#,
        },
        OrderEvent {
            order_id: "order-1002",
            event_type: "order.created",
            payload: r#"{"order_id":"order-1002","status":"created","total":17.25}"#,
        },
        OrderEvent {
            order_id: "order-1001",
            event_type: "order.paid",
            payload: r#"{"order_id":"order-1001","status":"paid","total":42.50}"#,
        },
        OrderEvent {
            order_id: "order-1003",
            event_type: "order.created",
            payload: r#"{"order_id":"order-1003","status":"created","total":99.00}"#,
        },
        OrderEvent {
            order_id: "order-1001",
            event_type: "order.shipped",
            payload: r#"{"order_id":"order-1001","status":"shipped","carrier":"demo-post"}"#,
        },
    ]
}