Skip to main content

camel_component_kafka/
lib.rs

1pub mod config;
2pub mod consumer;
3pub mod manual_commit;
4pub mod producer;
5
6pub use config::{KafkaConfig, KafkaEndpointConfig};
7pub use consumer::KafkaConsumer;
8pub use manual_commit::KafkaManualCommit;
9pub use producer::KafkaProducer;
10
11use camel_api::{BoxProcessor, CamelError};
12use camel_component::{Component, Consumer, Endpoint, ProducerContext};
13
14pub struct KafkaComponent {
15    config: Option<KafkaConfig>,
16}
17
18impl KafkaComponent {
19    /// Create a new KafkaComponent without global config defaults.
20    /// Endpoint configs will fall back to hardcoded defaults via `resolve_defaults()`.
21    pub fn new() -> Self {
22        Self { config: None }
23    }
24
25    /// Create a KafkaComponent with global config defaults.
26    /// These will be applied to endpoint configs before `resolve_defaults()`.
27    pub fn with_config(config: KafkaConfig) -> Self {
28        Self {
29            config: Some(config),
30        }
31    }
32
33    /// Create a KafkaComponent with optional global config defaults.
34    /// If `None`, behaves like `new()` (uses hardcoded defaults only).
35    pub fn with_optional_config(config: Option<KafkaConfig>) -> Self {
36        Self { config }
37    }
38}
39
40impl Default for KafkaComponent {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl Component for KafkaComponent {
47    fn scheme(&self) -> &str {
48        "kafka"
49    }
50
51    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
52        let mut config = KafkaEndpointConfig::from_uri(uri)?;
53        // Apply global config defaults if available
54        if let Some(ref global_cfg) = self.config {
55            config.apply_defaults(global_cfg);
56        }
57        // Resolve any remaining None fields to hardcoded defaults
58        config.resolve_defaults();
59        Ok(Box::new(KafkaEndpoint {
60            uri: uri.to_string(),
61            config,
62        }))
63    }
64}
65
66struct KafkaEndpoint {
67    uri: String,
68    config: KafkaEndpointConfig,
69}
70
71impl Endpoint for KafkaEndpoint {
72    fn uri(&self) -> &str {
73        &self.uri
74    }
75
76    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
77        Ok(BoxProcessor::new(KafkaProducer::new(self.config.clone())?))
78    }
79
80    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
81        Ok(Box::new(KafkaConsumer::new(self.config.clone())))
82    }
83}