camel_component_opensearch/
lib.rs1pub mod bundle;
8pub mod config;
9pub mod producer;
10
11use camel_component_api::{BoxProcessor, CamelError};
12use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
13
14pub use bundle::OpenSearchBundle;
15pub use config::{OpenSearchConfig, OpenSearchEndpointConfig, OpenSearchOperation};
16pub use producer::OpenSearchProducer;
17
18pub struct OpenSearchComponent {
23 config: Option<OpenSearchConfig>,
24}
25
26impl OpenSearchComponent {
27 pub fn new() -> Self {
30 Self { config: None }
31 }
32
33 pub fn with_config(config: OpenSearchConfig) -> Self {
36 Self {
37 config: Some(config),
38 }
39 }
40
41 pub fn with_optional_config(config: Option<OpenSearchConfig>) -> Self {
44 Self { config }
45 }
46}
47
48impl Default for OpenSearchComponent {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl Component for OpenSearchComponent {
55 fn scheme(&self) -> &str {
56 "opensearch"
57 }
58
59 fn create_endpoint(
60 &self,
61 uri: &str,
62 _ctx: &dyn camel_component_api::ComponentContext,
63 ) -> Result<Box<dyn Endpoint>, CamelError> {
64 let mut config = OpenSearchEndpointConfig::from_uri(uri)?;
65 if let Some(ref global_cfg) = self.config {
67 config = config.merge_with_global(global_cfg);
68 }
69 Ok(Box::new(OpenSearchEndpoint {
70 uri: uri.to_string(),
71 config,
72 }))
73 }
74}
75
76pub struct OpenSearchsComponent {
82 inner: OpenSearchComponent,
83}
84
85impl OpenSearchsComponent {
86 pub fn new() -> Self {
87 Self {
88 inner: OpenSearchComponent::new(),
89 }
90 }
91
92 pub fn with_config(config: OpenSearchConfig) -> Self {
93 Self {
94 inner: OpenSearchComponent::with_config(config),
95 }
96 }
97}
98
99impl Default for OpenSearchsComponent {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl Component for OpenSearchsComponent {
106 fn scheme(&self) -> &str {
107 "opensearchs"
108 }
109
110 fn create_endpoint(
111 &self,
112 uri: &str,
113 ctx: &dyn camel_component_api::ComponentContext,
114 ) -> Result<Box<dyn Endpoint>, CamelError> {
115 self.inner.create_endpoint(uri, ctx)
116 }
117}
118
119struct OpenSearchEndpoint {
124 uri: String,
125 config: OpenSearchEndpointConfig,
126}
127
128impl Endpoint for OpenSearchEndpoint {
129 fn uri(&self) -> &str {
130 &self.uri
131 }
132
133 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
134 Ok(BoxProcessor::new(OpenSearchProducer::new(
135 self.config.clone(),
136 )))
137 }
138
139 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
140 Err(CamelError::EndpointCreationFailed(
141 "OpenSearch component does not support consumers".to_string(),
142 ))
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::NoOpComponentContext;

    /// URI used by the tests that only need a valid `opensearch` endpoint.
    const INDEX_URI: &str = "opensearch://localhost:9200/myindex?operation=INDEX";
    /// URI used to exercise the `opensearchs` component.
    const SECURE_SEARCH_URI: &str = "opensearchs://localhost:9200/myindex?operation=SEARCH";
    /// URI with an empty authority, so host and port are not given in the URI.
    const NO_AUTHORITY_URI: &str = "opensearch:///myindex?operation=SEARCH";

    #[test]
    fn test_opensearch_component_scheme() {
        assert_eq!(OpenSearchComponent::new().scheme(), "opensearch");
    }

    #[test]
    fn test_opensearchs_component_scheme() {
        assert_eq!(OpenSearchsComponent::new().scheme(), "opensearchs");
    }

    #[test]
    fn test_opensearch_component_creates_endpoint() {
        let ctx = NoOpComponentContext;
        let created = OpenSearchComponent::new()
            .create_endpoint(INDEX_URI, &ctx)
            .expect("endpoint should be created");

        // The endpoint must report exactly the URI it was created from.
        assert_eq!(created.uri(), INDEX_URI);
    }

    #[test]
    fn test_opensearchs_component_creates_endpoint() {
        let ctx = NoOpComponentContext;
        let created = OpenSearchsComponent::new()
            .create_endpoint(SECURE_SEARCH_URI, &ctx)
            .expect("endpoint should be created");

        assert_eq!(created.uri(), SECURE_SEARCH_URI);
    }

    #[test]
    fn test_opensearch_component_rejects_wrong_scheme() {
        let ctx = NoOpComponentContext;
        let outcome =
            OpenSearchComponent::new().create_endpoint("kafka:topic?brokers=localhost:9092", &ctx);

        assert!(outcome.is_err(), "wrong scheme should fail");
        let message = outcome.err().expect("error must exist").to_string();
        assert!(message.contains("expected scheme 'opensearch'"));
    }

    #[test]
    fn test_opensearch_component_applies_global_defaults() {
        let global_cfg = OpenSearchConfig::default()
            .with_host("es-global")
            .with_port(9300);
        let ctx = NoOpComponentContext;

        let created = OpenSearchComponent::with_config(global_cfg)
            .create_endpoint(NO_AUTHORITY_URI, &ctx)
            .expect("endpoint should be created with defaults");

        // NOTE(review): only successful creation and URI preservation are
        // asserted here; the merged host/port values are not inspected.
        assert_eq!(created.uri(), NO_AUTHORITY_URI);
    }

    #[test]
    fn test_endpoint_create_consumer_returns_error() {
        let ctx = NoOpComponentContext;
        let created = OpenSearchComponent::new()
            .create_endpoint(INDEX_URI, &ctx)
            .expect("endpoint should be created");

        let outcome = created.create_consumer();
        assert!(outcome.is_err(), "create_consumer should return an error");

        let err = outcome.err().expect("error must exist");
        let mentions_consumers = err.to_string().contains("does not support consumers");
        assert!(
            mentions_consumers,
            "expected consumer-not-supported error, got: {err}"
        );
    }
}