use std::thread;
use std::time::Duration;
use clap::{Arg, Command};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::message::{BorrowedMessage, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use crate::example_utils::setup_logger;
mod example_utils;
/// Records receipt of a message while it is still borrowed from the
/// consumer (before any copy is made), by logging its offset.
async fn record_borrowed_message_receipt(msg: &BorrowedMessage<'_>) {
    // Only the offset is logged; the payload is not touched here.
    let offset = msg.offset();
    info!("Message received: {}", offset);
}
/// Hook for recording receipt of a message after it has been detached
/// (copied) into an `OwnedMessage`. Intentionally a no-op in this example;
/// it marks where owned-message handling (e.g. persistence) would go.
async fn record_owned_message_receipt(_msg: &OwnedMessage) {
}
/// Simulates an expensive computation on a detached message and returns a
/// human-readable summary of its payload.
///
/// Blocks the calling thread (random sleep of up to 5 seconds), so from
/// async code it must be dispatched via `spawn_blocking`.
fn expensive_computation(msg: OwnedMessage) -> String {
    let offset = msg.offset();
    info!("Starting expensive computation on message {}", offset);
    // A random delay in [0, 5000) ms stands in for real work.
    let delay_ms = rand::random::<u64>() % 5000;
    thread::sleep(Duration::from_millis(delay_ms));
    info!("Expensive computation completed on message {}", offset);
    // Summarize the payload: absent, non-UTF-8, or a valid string.
    match msg.payload_view::<str>() {
        None => "No payload".to_owned(),
        Some(Err(_)) => "Message payload is not a string".to_owned(),
        Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()),
    }
}
/// Worker loop: consume messages from `input_topic`, run an expensive
/// blocking computation on each, and produce the result to `output_topic`.
///
/// Creates its own consumer and producer; intended to be spawned once per
/// worker. Runs until the consumer stream yields an error (the `expect`
/// at the bottom then panics the task).
async fn run_async_processor(
    brokers: String,
    group_id: String,
    input_topic: String,
    output_topic: String,
) {
    // Consumer joins `group_id`. Auto-commit is disabled and no explicit
    // commit appears anywhere below, so offsets are never committed for
    // this group. NOTE(review): acceptable for an example; a real service
    // would commit after the output message is successfully produced.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", &group_id)
        .set("bootstrap.servers", &brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed");
    consumer
        .subscribe(&[&input_topic])
        .expect("Can't subscribe to specified topic");
    // Producer for the processed results; a clone is moved into each
    // per-message task below.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", &brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");
    // Per message:
    //  1. log receipt while the message is still borrowed,
    //  2. detach (copy) it so it can move into a spawned task,
    //  3. run the blocking computation on the blocking thread pool,
    //  4. produce the result and print delivery success/failure.
    // The inner `tokio::spawn` is fire-and-forget: the stream advances to
    // the next message immediately, and send errors are only printed.
    let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
        // producer.clone() is presumably a cheap handle clone (rdkafka
        // producers share underlying state) — TODO confirm; the topic name
        // is a fresh String per message so it can be moved into the task.
        let producer = producer.clone();
        let output_topic = output_topic.to_string();
        async move {
            record_borrowed_message_receipt(&borrowed_message).await;
            // Detach to an OwnedMessage so it no longer borrows from the
            // consumer and can cross the task boundary.
            let owned_message = borrowed_message.detach();
            record_owned_message_receipt(&owned_message).await;
            tokio::spawn(async move {
                // Blocking work must not run on the async worker threads.
                let computation_result =
                    tokio::task::spawn_blocking(|| expensive_computation(owned_message))
                        .await
                        .expect("failed to wait for expensive computation");
                let produce_future = producer.send(
                    FutureRecord::to(&output_topic)
                        .key("some key")
                        .payload(&computation_result),
                    Duration::from_secs(0),
                );
                match produce_future.await {
                    Ok(delivery) => println!("Sent: {:?}", delivery),
                    Err((e, _)) => println!("Error: {:?}", e),
                }
            });
            // Ok keeps the stream running; a consumer error terminates it.
            Ok(())
        }
    });
    info!("Starting event loop");
    stream_processor.await.expect("stream processing failed");
    info!("Stream processing terminated");
}
#[tokio::main]
async fn main() {
let matches = Command::new("Async example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Asynchronous computation example")
.arg(
Arg::new("brokers")
.short('b')
.long("brokers")
.help("Broker list in kafka format")
.default_value("localhost:9092"),
)
.arg(
Arg::new("group-id")
.short('g')
.long("group-id")
.help("Consumer group id")
.default_value("example_consumer_group_id"),
)
.arg(
Arg::new("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')"),
)
.arg(
Arg::new("input-topic")
.long("input-topic")
.help("Input topic")
.required(true),
)
.arg(
Arg::new("output-topic")
.long("output-topic")
.help("Output topic")
.required(true),
)
.arg(
Arg::new("num-workers")
.long("num-workers")
.help("Number of workers")
.value_parser(clap::value_parser!(usize))
.default_value("1"),
)
.get_matches();
setup_logger(true, matches.get_one("log-conf"));
let brokers = matches.get_one::<String>("brokers").unwrap();
let group_id = matches.get_one::<String>("group-id").unwrap();
let input_topic = matches.get_one::<String>("input-topic").unwrap();
let output_topic = matches.get_one::<String>("output-topic").unwrap();
let num_workers = *matches.get_one::<usize>("num-workers").unwrap();
(0..num_workers)
.map(|_| {
tokio::spawn(run_async_processor(
brokers.to_owned(),
group_id.to_owned(),
input_topic.to_owned(),
output_topic.to_owned(),
))
})
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.await
}