camel_component_redis/
lib.rs1pub mod bundle;
2pub mod commands;
3pub mod config;
4pub mod consumer;
5pub mod producer;
6
7use camel_component_api::{BoxProcessor, CamelError};
8use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
9
10pub use bundle::RedisBundle;
11pub use config::{RedisCommand, RedisConfig, RedisEndpointConfig};
12pub use consumer::RedisConsumer;
13pub use producer::RedisProducer;
14
15pub struct RedisComponent {
16 config: Option<RedisConfig>,
17}
18
19impl RedisComponent {
20 pub fn new() -> Self {
23 Self { config: None }
24 }
25
26 pub fn with_config(config: RedisConfig) -> Self {
29 Self {
30 config: Some(config),
31 }
32 }
33
34 pub fn with_optional_config(config: Option<RedisConfig>) -> Self {
37 Self { config }
38 }
39}
40
41impl Default for RedisComponent {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl Component for RedisComponent {
48 fn scheme(&self) -> &str {
49 "redis"
50 }
51
52 fn create_endpoint(
53 &self,
54 uri: &str,
55 _ctx: &dyn camel_component_api::ComponentContext,
56 ) -> Result<Box<dyn Endpoint>, CamelError> {
57 let mut config = RedisEndpointConfig::from_uri(uri)?;
58 if let Some(ref global_cfg) = self.config {
60 config.apply_defaults(global_cfg);
61 }
62 config.resolve_defaults();
64 Ok(Box::new(RedisEndpoint {
65 uri: uri.to_string(),
66 config,
67 }))
68 }
69}
70
71struct RedisEndpoint {
72 uri: String,
73 config: RedisEndpointConfig,
74}
75
76impl Endpoint for RedisEndpoint {
77 fn uri(&self) -> &str {
78 &self.uri
79 }
80
81 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
82 Ok(BoxProcessor::new(RedisProducer::new(self.config.clone())))
83 }
84
85 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
86 Ok(Box::new(RedisConsumer::new(self.config.clone())))
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93 use camel_component_api::NoOpComponentContext;
94
95 #[test]
96 fn test_component_scheme() {
97 let component = RedisComponent::new();
98 assert_eq!(component.scheme(), "redis");
99 }
100
101 #[test]
102 fn test_component_creates_endpoint() {
103 let component = RedisComponent::new();
104 let ctx = NoOpComponentContext;
105 let endpoint = component
106 .create_endpoint("redis://localhost:6379?command=GET", &ctx)
107 .expect("endpoint should be created");
108 assert_eq!(endpoint.uri(), "redis://localhost:6379?command=GET");
109 }
110
111 #[test]
112 fn test_component_rejects_wrong_scheme() {
113 let component = RedisComponent::new();
114 let ctx = NoOpComponentContext;
115 let result = component.create_endpoint("kafka:topic?brokers=localhost:9092", &ctx);
116 assert!(result.is_err(), "wrong scheme should fail");
117 let err = result.err().expect("error must exist");
118 assert!(err.to_string().contains("expected scheme 'redis'"));
119 }
120
121 #[test]
122 fn test_component_applies_global_defaults() {
123 let global = RedisConfig::default()
124 .with_host("redis-global")
125 .with_port(6380);
126 let component = RedisComponent::with_config(global);
127 let ctx = NoOpComponentContext;
128
129 let endpoint = component
130 .create_endpoint("redis://?command=GET", &ctx)
131 .expect("endpoint should be created with defaults");
132
133 let _producer = endpoint
134 .create_producer(&ProducerContext::default())
135 .expect("producer should be created");
136 let _consumer = endpoint
137 .create_consumer()
138 .expect("consumer should be created");
139 }
140}