//! coreon-kafka 0.1.0
//!
//! `kafka:*` component for camel-rs (producer + consumer via rdkafka).
//!
//! # Documentation
//!
//! rdkafka-backed Producer / Consumer / Endpoint. Built only when the
//! `rdkafka-backend` feature is enabled.

use crate::KafkaEndpoint;
use async_trait::async_trait;
use bytes::Bytes;
use coreon_core::{
    message::{Body, Message},
    CamelError, Consumer, Endpoint, Exchange, Processor, Producer, Result,
};
use rdkafka::{
    config::ClientConfig,
    consumer::{Consumer as KConsumer, StreamConsumer},
    producer::{FutureProducer, FutureRecord},
    Message as KMessage,
};
use std::{sync::Arc, time::Duration};
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{debug, warn};

#[async_trait]
impl Endpoint for KafkaEndpoint {
    /// Returns the URI string stored on this endpoint.
    fn uri(&self) -> &str {
        &self.uri
    }

    /// Creates a [`KafkaProducer`] bound to this endpoint's topic.
    ///
    /// The underlying rdkafka `FutureProducer` is configured only with
    /// `bootstrap.servers` taken from the endpoint's broker list.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::Endpoint` when the rdkafka client cannot be built.
    async fn create_producer(&self) -> Result<Arc<dyn Producer>> {
        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", &self.brokers);

        let producer: FutureProducer = config
            .create()
            .map_err(|e| CamelError::Endpoint(format!("kafka producer init: {e}")))?;

        let sender = KafkaProducer {
            topic: self.topic.clone(),
            producer,
        };
        Ok(Arc::new(sender))
    }

    /// Creates a [`KafkaConsumer`] subscribed to this endpoint's topic.
    ///
    /// The rdkafka `StreamConsumer` is configured with the endpoint's brokers,
    /// consumer group and `auto.offset.reset` value, with auto-commit enabled.
    /// The returned consumer holds `pipeline` and no background task yet.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::Endpoint` when the client cannot be built or the
    /// subscription is rejected.
    async fn create_consumer(&self, pipeline: Arc<dyn Processor>) -> Result<Arc<dyn Consumer>> {
        let mut config = ClientConfig::new();
        config
            .set("bootstrap.servers", &self.brokers)
            .set("group.id", &self.group)
            .set("enable.auto.commit", "true")
            .set("auto.offset.reset", &self.auto_offset_reset);

        let consumer: StreamConsumer = config
            .create()
            .map_err(|e| CamelError::Endpoint(format!("kafka consumer init: {e}")))?;

        if let Err(e) = consumer.subscribe(&[self.topic.as_str()]) {
            return Err(CamelError::Endpoint(format!("kafka subscribe: {e}")));
        }

        let receiver = KafkaConsumer {
            topic: self.topic.clone(),
            consumer: Arc::new(consumer),
            pipeline,
            task: Mutex::new(None),
        };
        Ok(Arc::new(receiver))
    }
}

/// Producer that publishes exchange bodies to a single Kafka topic through an
/// rdkafka `FutureProducer`.
pub struct KafkaProducer {
    /// Destination topic used for every record sent by this producer.
    topic: String,
    /// rdkafka client used to deliver records.
    producer: FutureProducer,
}

#[async_trait]
impl Producer for KafkaProducer {
    /// Sends the exchange's inbound body to this producer's topic.
    ///
    /// Text and byte bodies are sent as-is; an empty body is sent as an empty
    /// payload. If the inbound message carries a `CamelKafkaKey` header, its
    /// value is used as the record key. Delivery is awaited with a 10 second
    /// queue timeout.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::Processor` when the body is `Body::Custom` or when
    /// rdkafka reports a delivery failure.
    async fn send(&self, exchange: &mut Exchange) -> Result<()> {
        // Copy everything needed out of the exchange up front so that no
        // borrow of it is held across the `.await` below.
        let payload: Bytes = match &exchange.r#in.body {
            Body::Bytes(raw) => raw.clone(),
            Body::Text(text) => Bytes::from(text.clone().into_bytes()),
            Body::Empty => Bytes::new(),
            Body::Custom(_) => {
                return Err(CamelError::Processor(
                    "kafka: cannot serialize Body::Custom".into(),
                ));
            }
        };
        let key_owned = exchange
            .r#in
            .header("CamelKafkaKey")
            .map(|value| value.to_owned());

        let unkeyed = FutureRecord::to(&self.topic).payload(&payload[..]);
        let record = match key_owned.as_deref() {
            Some(key) => unkeyed.key(key),
            None => unkeyed,
        };

        let delivery = self.producer.send(record, Duration::from_secs(10)).await;
        if let Err((e, _)) = delivery {
            return Err(CamelError::Processor(format!("kafka send: {e}")));
        }

        debug!(topic = %self.topic, bytes = payload.len(), "kafka: sent");
        Ok(())
    }
}

/// Consumer that receives records from an rdkafka `StreamConsumer` on a
/// background task and feeds each one to a processor pipeline.
pub struct KafkaConsumer {
    /// Topic name, used for log fields and the `CamelKafkaTopic` header.
    topic: String,
    /// Shared rdkafka consumer the background task receives from.
    consumer: Arc<StreamConsumer>,
    /// Processor invoked once per received record.
    pipeline: Arc<dyn Processor>,
    /// Handle of the spawned receive loop; `None` until `start` runs and
    /// again after `stop` takes it.
    task: Mutex<Option<JoinHandle<()>>>,
}

#[async_trait]
impl Consumer for KafkaConsumer {
    async fn start(&self) -> Result<()> {
        let consumer = self.consumer.clone();
        let pipeline = self.pipeline.clone();
        let topic = self.topic.clone();
        let handle = tokio::spawn(async move {
            loop {
                match consumer.recv().await {
                    Err(e) => warn!(topic = %topic, error = %e, "kafka: recv error"),
                    Ok(msg) => {
                        let payload = msg.payload().unwrap_or_default().to_vec();
                        let key = msg
                            .key()
                            .and_then(|b| std::str::from_utf8(b).ok())
                            .unwrap_or("")
                            .to_owned();
                        let m = Message {
                            headers: std::iter::once(("CamelKafkaKey".to_owned(), key))
                                .chain(std::iter::once((
                                    "CamelKafkaTopic".to_owned(),
                                    topic.clone(),
                                )))
                                .collect(),
                            body: Body::Bytes(Bytes::from(payload)),
                        };
                        let mut ex = Exchange {
                            r#in: m,
                            ..Exchange::default()
                        };
                        if let Err(e) = pipeline.process(&mut ex).await {
                            warn!(topic = %topic, error = %e, "kafka: pipeline failed");
                        }
                    }
                }
            }
        });
        *self.task.lock().await = Some(handle);
        Ok(())
    }

    async fn stop(&self) -> Result<()> {
        if let Some(h) = self.task.lock().await.take() {
            h.abort();
        }
        Ok(())
    }
}