use crate::KafkaEndpoint;
use async_trait::async_trait;
use bytes::Bytes;
use coreon_core::{
message::{Body, Message},
CamelError, Consumer, Endpoint, Exchange, Processor, Producer, Result,
};
use rdkafka::{
config::ClientConfig,
consumer::{Consumer as KConsumer, StreamConsumer},
producer::{FutureProducer, FutureRecord},
Message as KMessage,
};
use std::{sync::Arc, time::Duration};
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{debug, warn};
#[async_trait]
impl Endpoint for KafkaEndpoint {
fn uri(&self) -> &str {
&self.uri
}
async fn create_producer(&self) -> Result<Arc<dyn Producer>> {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &self.brokers)
.create()
.map_err(|e| CamelError::Endpoint(format!("kafka producer init: {e}")))?;
Ok(Arc::new(KafkaProducer {
topic: self.topic.clone(),
producer,
}))
}
async fn create_consumer(&self, pipeline: Arc<dyn Processor>) -> Result<Arc<dyn Consumer>> {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", &self.brokers)
.set("group.id", &self.group)
.set("enable.auto.commit", "true")
.set("auto.offset.reset", &self.auto_offset_reset)
.create()
.map_err(|e| CamelError::Endpoint(format!("kafka consumer init: {e}")))?;
consumer
.subscribe(&[&self.topic])
.map_err(|e| CamelError::Endpoint(format!("kafka subscribe: {e}")))?;
Ok(Arc::new(KafkaConsumer {
topic: self.topic.clone(),
consumer: Arc::new(consumer),
pipeline,
task: Mutex::new(None),
}))
}
}
pub struct KafkaProducer {
topic: String,
producer: FutureProducer,
}
#[async_trait]
impl Producer for KafkaProducer {
async fn send(&self, exchange: &mut Exchange) -> Result<()> {
let payload: Bytes = match &exchange.r#in.body {
Body::Empty => Bytes::new(),
Body::Text(s) => Bytes::from(s.clone().into_bytes()),
Body::Bytes(b) => b.clone(),
Body::Custom(_) => {
return Err(CamelError::Processor(
"kafka: cannot serialize Body::Custom".into(),
))
}
};
let key_owned = exchange.r#in.header("CamelKafkaKey").map(str::to_owned);
let mut record = FutureRecord::to(&self.topic).payload(payload.as_ref());
if let Some(k) = key_owned.as_deref() {
record = record.key(k);
}
self.producer
.send(record, Duration::from_secs(10))
.await
.map_err(|(e, _)| CamelError::Processor(format!("kafka send: {e}")))?;
debug!(topic = %self.topic, bytes = payload.len(), "kafka: sent");
Ok(())
}
}
pub struct KafkaConsumer {
topic: String,
consumer: Arc<StreamConsumer>,
pipeline: Arc<dyn Processor>,
task: Mutex<Option<JoinHandle<()>>>,
}
#[async_trait]
impl Consumer for KafkaConsumer {
async fn start(&self) -> Result<()> {
let consumer = self.consumer.clone();
let pipeline = self.pipeline.clone();
let topic = self.topic.clone();
let handle = tokio::spawn(async move {
loop {
match consumer.recv().await {
Err(e) => warn!(topic = %topic, error = %e, "kafka: recv error"),
Ok(msg) => {
let payload = msg.payload().unwrap_or_default().to_vec();
let key = msg
.key()
.and_then(|b| std::str::from_utf8(b).ok())
.unwrap_or("")
.to_owned();
let m = Message {
headers: std::iter::once(("CamelKafkaKey".to_owned(), key))
.chain(std::iter::once((
"CamelKafkaTopic".to_owned(),
topic.clone(),
)))
.collect(),
body: Body::Bytes(Bytes::from(payload)),
};
let mut ex = Exchange {
r#in: m,
..Exchange::default()
};
if let Err(e) = pipeline.process(&mut ex).await {
warn!(topic = %topic, error = %e, "kafka: pipeline failed");
}
}
}
}
});
*self.task.lock().await = Some(handle);
Ok(())
}
async fn stop(&self) -> Result<()> {
if let Some(h) = self.task.lock().await.take() {
h.abort();
}
Ok(())
}
}