use dsh_sdk::DshKafkaConfig;
use dsh_sdk::utils::dlq::{self, DlqChannel, ErrorToDlq};
use dsh_sdk::utils::graceful_shutdown::Shutdown;
use rdkafka::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::{BorrowedMessage, Message, OwnedMessage};
use std::backtrace::Backtrace;
use thiserror::Error;
const DLQ_DEAD_TOPIC: &str = "scratch.dlq.local-tenant"; const DLQ_RETRY_TOPIC: &str = "scratch.dlq.local-tenant"; const TOPIC: &str = "scratch.topic-name.local-tenant";
#[derive(Error, Debug)]
enum ConsumerError {
#[error("Deserialization error: {0}")]
DeserializeError(#[from] std::string::FromUtf8Error),
}
impl ErrorToDlq for ConsumerError {
fn to_dlq(&self, kafka_message: OwnedMessage) -> dlq::SendToDlq {
let backtrace = Backtrace::force_capture(); dlq::SendToDlq::new(
kafka_message,
self.retryable(),
self.to_string(),
Some(backtrace.to_string()),
)
}
fn retryable(&self) -> dlq::Retryable {
match self {
ConsumerError::DeserializeError(_) => dlq::Retryable::NonRetryable,
}
}
}
fn deserialize(msg: &BorrowedMessage) -> Result<String, ConsumerError> {
match msg.payload() {
Some(payload) => Ok(String::from_utf8(payload.to_vec())?),
None => Ok("".to_string()),
}
}
async fn consume(
consumer: StreamConsumer,
topic: &str,
mut dlq_channel: DlqChannel,
shutdown: Shutdown,
) {
consumer
.subscribe(&[topic])
.expect("Can't subscribe to topic");
loop {
tokio::select! {
msg = consumer.recv() => match msg {
Ok(msg) => {
match deserialize(&msg) {
Err(e) => e.to_dlq(msg.detach()).send(&mut dlq_channel).await,
Ok(payload) => {
println!("Payload: {}", payload)
}
}
}
Err(e) => {
eprintln!("Error while receiving message: {}", e);
}
},
_ = shutdown.signal_listener() => {
println!("Shutting down consumer");
break;
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
unsafe {
std::env::set_var("DLQ_DEAD_TOPIC", DLQ_DEAD_TOPIC);
std::env::set_var("DLQ_RETRY_TOPIC", DLQ_RETRY_TOPIC);
}
let shutdown = Shutdown::new();
let consumer: StreamConsumer = ClientConfig::new().set_dsh_consumer_config().create()?;
let dlq_channel = dlq::Dlq::start(shutdown.clone())?;
let shutdown_clone = shutdown.clone();
let consumer_handle = tokio::spawn(async move {
consume(consumer, TOPIC, dlq_channel, shutdown_clone).await;
});
tokio::select! {
_ = consumer_handle => {
println!("Consumer finished");
}
_ = shutdown.signal_listener() => {
println!("Shutting down");
}
}
shutdown.complete().await;
Ok(())
}