use std::env;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result, bail};
use kafkit_client::{
AutoOffsetReset, KafkaAdmin, KafkaClient, KafkaMessage, NewTopic, RecordHeader,
};
use tokio::time::Instant;
const DEFAULT_BOOTSTRAP: &str = "localhost:9092";
const DEFAULT_TOPIC: &str = "kafkit.orders";
const EXPECTED_RECORDS: usize = 5;
#[tokio::main]
async fn main() -> Result<()> {
let bootstrap =
env::var("KAFKIT_BOOTSTRAP_SERVERS").unwrap_or_else(|_| DEFAULT_BOOTSTRAP.to_owned());
let topic = env::var("KAFKIT_TOPIC").unwrap_or_else(|_| DEFAULT_TOPIC.to_owned());
let group_id = env::var("KAFKIT_GROUP_ID").unwrap_or_else(|_| {
format!(
"kafkit-order-example-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock is before unix epoch")
.as_millis()
)
});
println!("bootstrap: {bootstrap}");
println!("topic: {topic}");
println!("group: {group_id}");
ensure_topic(&bootstrap, &topic).await?;
let client = KafkaClient::new(bootstrap).topic(topic.clone());
let producer = client
.producer()
.with_client_id("kafkit-order-example-producer")
.connect()
.await
.context("failed to connect producer")?;
for event in sample_order_events() {
producer
.send_message(
KafkaMessage::new(event.payload)
.with_key(event.order_id)
.with_header(RecordHeader::new("event-type", event.event_type))
.with_header(RecordHeader::new("example", "order-workflow")),
)
.await
.with_context(|| format!("failed to produce {}", event.order_id))?;
}
producer.flush().await.context("failed to flush producer")?;
println!("produced {EXPECTED_RECORDS} order events");
let consumer = client
.consumer(group_id)
.with_client_id("kafkit-order-example-consumer")
.with_auto_offset_reset(AutoOffsetReset::Earliest)
.with_poll_timeout(Duration::from_millis(500))
.connect()
.await
.context("failed to connect consumer")?;
let deadline = Instant::now() + Duration::from_secs(30);
let mut consumed = 0usize;
while consumed < EXPECTED_RECORDS {
if Instant::now() >= deadline {
consumer.shutdown().await.ok();
producer.shutdown().await.ok();
bail!("timed out waiting for {EXPECTED_RECORDS} records; consumed {consumed}");
}
let records = consumer.poll().await.context("failed to poll consumer")?;
if records.is_empty() {
continue;
}
for record in records.iter() {
let value = record
.value
.as_deref()
.map(String::from_utf8_lossy)
.map(|value| value.into_owned())
.unwrap_or_else(|| "<tombstone>".to_owned());
let key = record
.key
.as_deref()
.map(String::from_utf8_lossy)
.map(|key| key.into_owned())
.unwrap_or_else(|| "<none>".to_owned());
println!(
"consumed key={key} topic={} partition={} offset={} value={value}",
record.topic, record.partition, record.offset
);
consumed += 1;
}
consumer
.commit(&records)
.await
.context("failed to commit consumed records")?;
}
consumer
.shutdown()
.await
.context("failed to shut down consumer")?;
producer
.shutdown()
.await
.context("failed to shut down producer")?;
println!("done");
Ok(())
}
async fn ensure_topic(bootstrap: &str, topic: &str) -> Result<()> {
let admin = KafkaAdmin::connect(kafkit_client::AdminConfig::new(bootstrap))
.await
.context("failed to connect admin client")?;
let topics = admin.list_topics().await.context("failed to list topics")?;
if topics.iter().any(|listing| listing.name == topic) {
println!("topic already exists");
return Ok(());
}
admin
.create_topics([NewTopic::new(topic, 3, 1).with_config("cleanup.policy", "delete")])
.await
.with_context(|| format!("failed to create topic {topic}"))?;
println!("created topic {topic}");
Ok(())
}
struct OrderEvent {
order_id: &'static str,
event_type: &'static str,
payload: &'static str,
}
fn sample_order_events() -> [OrderEvent; EXPECTED_RECORDS] {
[
OrderEvent {
order_id: "order-1001",
event_type: "order.created",
payload: r#"{"order_id":"order-1001","status":"created","total":42.50}"#,
},
OrderEvent {
order_id: "order-1002",
event_type: "order.created",
payload: r#"{"order_id":"order-1002","status":"created","total":17.25}"#,
},
OrderEvent {
order_id: "order-1001",
event_type: "order.paid",
payload: r#"{"order_id":"order-1001","status":"paid","total":42.50}"#,
},
OrderEvent {
order_id: "order-1003",
event_type: "order.created",
payload: r#"{"order_id":"order-1003","status":"created","total":99.00}"#,
},
OrderEvent {
order_id: "order-1001",
event_type: "order.shipped",
payload: r#"{"order_id":"order-1001","status":"shipped","carrier":"demo-post"}"#,
},
]
}