use std::future::Future;
use std::pin::Pin;
use std::process;
use std::time::Duration;
use clap::{Arg, Command};
use futures::stream::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::AsyncRuntime;
use crate::example_utils::setup_logger;
mod example_utils;
pub struct AsyncStdRuntime;
impl AsyncRuntime for AsyncStdRuntime {
type Delay = Pin<Box<dyn Future<Output = ()> + Send>>;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static,
{
async_std::task::spawn(task);
}
fn delay_for(duration: Duration) -> Self::Delay {
Box::pin(async_std::task::sleep(duration))
}
}
#[async_std::main]
async fn main() {
let matches = Command::new("smol runtime example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Demonstrates using rust-rdkafka with a custom async runtime")
.arg(
Arg::new("brokers")
.short('b')
.long("brokers")
.help("Broker list in kafka format")
.default_value("localhost:9092"),
)
.arg(Arg::new("topic").long("topic").help("topic").required(true))
.arg(
Arg::new("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')"),
)
.get_matches();
setup_logger(true, matches.get_one("log-conf"));
let brokers = matches.get_one::<String>("brokers").unwrap();
let topic = matches.get_one::<String>("topic").unwrap().to_owned();
let producer: FutureProducer<_, AsyncStdRuntime> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let delivery_status = producer
.send::<Vec<u8>, _, _>(
FutureRecord::to(&topic).payload("hello from async-std"),
Duration::from_secs(0),
)
.await;
if let Err((e, _)) = delivery_status {
eprintln!("unable to send message: {}", e);
process::exit(1);
}
let consumer: StreamConsumer<_, AsyncStdRuntime> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
.set("group.id", "rust-rdkafka-smol-runtime-example")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&[&topic]).unwrap();
let mut stream = consumer.stream();
let message = stream.next().await;
match message {
Some(Ok(message)) => println!(
"Received message: {}",
match message.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(_)) => "<invalid utf-8>",
}
),
Some(Err(e)) => {
eprintln!("Error receiving message: {}", e);
process::exit(1);
}
None => {
eprintln!("Consumer unexpectedly returned no messages");
process::exit(1);
}
}
}