camel_component_kafka/
lib.rs1pub mod config;
2pub mod consumer;
3pub mod manual_commit;
4pub mod producer;
5
6pub use config::{KafkaConfig, KafkaEndpointConfig};
7pub use consumer::KafkaConsumer;
8pub use manual_commit::KafkaManualCommit;
9pub use producer::KafkaProducer;
10
11use camel_api::{BoxProcessor, CamelError};
12use camel_component::{Component, Consumer, Endpoint, ProducerContext};
13
14pub struct KafkaComponent {
15 config: Option<KafkaConfig>,
16}
17
18impl KafkaComponent {
19 pub fn new() -> Self {
22 Self { config: None }
23 }
24
25 pub fn with_config(config: KafkaConfig) -> Self {
28 Self {
29 config: Some(config),
30 }
31 }
32
33 pub fn with_optional_config(config: Option<KafkaConfig>) -> Self {
36 Self { config }
37 }
38}
39
40impl Default for KafkaComponent {
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46impl Component for KafkaComponent {
47 fn scheme(&self) -> &str {
48 "kafka"
49 }
50
51 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
52 let mut config = KafkaEndpointConfig::from_uri(uri)?;
53 if let Some(ref global_cfg) = self.config {
55 config.apply_defaults(global_cfg);
56 }
57 config.resolve_defaults();
59 Ok(Box::new(KafkaEndpoint {
60 uri: uri.to_string(),
61 config,
62 }))
63 }
64}
65
66struct KafkaEndpoint {
67 uri: String,
68 config: KafkaEndpointConfig,
69}
70
71impl Endpoint for KafkaEndpoint {
72 fn uri(&self) -> &str {
73 &self.uri
74 }
75
76 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
77 Ok(BoxProcessor::new(KafkaProducer::new(self.config.clone())?))
78 }
79
80 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
81 Ok(Box::new(KafkaConsumer::new(self.config.clone())))
82 }
83}