Skip to main content

camel_component_opensearch/
lib.rs

1//! OpenSearch/Elasticsearch component for rust-camel — indexes, searches, and
2//! manages documents with support for CRUD, bulk, and multi-get operations.
3//!
4//! Main types: `OpenSearchBundle`, `OpenSearchComponent`, `OpenSearchConfig`,
5//! `OpenSearchProducer`. Main modules: `bundle`, `config`, `producer`.
6
7pub mod bundle;
8pub mod config;
9pub mod producer;
10
11use camel_component_api::{BoxProcessor, CamelError};
12use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
13
14pub use bundle::OpenSearchBundle;
15pub use config::{OpenSearchConfig, OpenSearchEndpointConfig, OpenSearchOperation};
16pub use producer::OpenSearchProducer;
17
18// ---------------------------------------------------------------------------
19// OpenSearchComponent — handles the `opensearch://` scheme (HTTP)
20// ---------------------------------------------------------------------------
21
22pub struct OpenSearchComponent {
23    config: Option<OpenSearchConfig>,
24}
25
26impl OpenSearchComponent {
27    /// Create a new OpenSearchComponent without global config defaults.
28    /// Endpoint configs will fall back to hardcoded defaults via `merge_with_global()`.
29    pub fn new() -> Self {
30        Self { config: None }
31    }
32
33    /// Create an OpenSearchComponent with global config defaults.
34    /// These will be applied to endpoint configs when specific values aren't provided.
35    pub fn with_config(config: OpenSearchConfig) -> Self {
36        Self {
37            config: Some(config),
38        }
39    }
40
41    /// Create an OpenSearchComponent with optional global config defaults.
42    /// If `None`, behaves like `new()` (uses hardcoded defaults only).
43    pub fn with_optional_config(config: Option<OpenSearchConfig>) -> Self {
44        Self { config }
45    }
46}
47
48impl Default for OpenSearchComponent {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl Component for OpenSearchComponent {
55    fn scheme(&self) -> &str {
56        "opensearch"
57    }
58
59    fn create_endpoint(
60        &self,
61        uri: &str,
62        _ctx: &dyn camel_component_api::ComponentContext,
63    ) -> Result<Box<dyn Endpoint>, CamelError> {
64        let mut config = OpenSearchEndpointConfig::from_uri(uri)?;
65        // Apply global config defaults if available
66        if let Some(ref global_cfg) = self.config {
67            config = config.merge_with_global(global_cfg);
68        }
69        Ok(Box::new(OpenSearchEndpoint {
70            uri: uri.to_string(),
71            config,
72        }))
73    }
74}
75
76// ---------------------------------------------------------------------------
77// OpenSearchsComponent — handles the `opensearchs://` scheme (HTTPS/TLS)
78// Delegates to OpenSearchComponent internally.
79// ---------------------------------------------------------------------------
80
81pub struct OpenSearchsComponent {
82    inner: OpenSearchComponent,
83}
84
85impl OpenSearchsComponent {
86    pub fn new() -> Self {
87        Self {
88            inner: OpenSearchComponent::new(),
89        }
90    }
91
92    pub fn with_config(config: OpenSearchConfig) -> Self {
93        Self {
94            inner: OpenSearchComponent::with_config(config),
95        }
96    }
97}
98
99impl Default for OpenSearchsComponent {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105impl Component for OpenSearchsComponent {
106    fn scheme(&self) -> &str {
107        "opensearchs"
108    }
109
110    fn create_endpoint(
111        &self,
112        uri: &str,
113        ctx: &dyn camel_component_api::ComponentContext,
114    ) -> Result<Box<dyn Endpoint>, CamelError> {
115        self.inner.create_endpoint(uri, ctx)
116    }
117}
118
119// ---------------------------------------------------------------------------
120// OpenSearchEndpoint — producer-only. create_consumer() returns an error.
121// ---------------------------------------------------------------------------
122
123struct OpenSearchEndpoint {
124    uri: String,
125    config: OpenSearchEndpointConfig,
126}
127
128impl Endpoint for OpenSearchEndpoint {
129    fn uri(&self) -> &str {
130        &self.uri
131    }
132
133    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
134        Ok(BoxProcessor::new(OpenSearchProducer::new(
135            self.config.clone(),
136        )))
137    }
138
139    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
140        Err(CamelError::EndpointCreationFailed(
141            "OpenSearch component does not support consumers".to_string(),
142        ))
143    }
144}
145
146// ---------------------------------------------------------------------------
147// Tests
148// ---------------------------------------------------------------------------
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153    use camel_component_api::NoOpComponentContext;
154
155    #[test]
156    fn test_opensearch_component_scheme() {
157        let component = OpenSearchComponent::new();
158        assert_eq!(component.scheme(), "opensearch");
159    }
160
161    #[test]
162    fn test_opensearchs_component_scheme() {
163        let component = OpenSearchsComponent::new();
164        assert_eq!(component.scheme(), "opensearchs");
165    }
166
167    #[test]
168    fn test_opensearch_component_creates_endpoint() {
169        let component = OpenSearchComponent::new();
170        let ctx = NoOpComponentContext;
171        let endpoint = component
172            .create_endpoint("opensearch://localhost:9200/myindex?operation=INDEX", &ctx)
173            .expect("endpoint should be created");
174        assert_eq!(
175            endpoint.uri(),
176            "opensearch://localhost:9200/myindex?operation=INDEX"
177        );
178    }
179
180    #[test]
181    fn test_opensearchs_component_creates_endpoint() {
182        let component = OpenSearchsComponent::new();
183        let ctx = NoOpComponentContext;
184        let endpoint = component
185            .create_endpoint(
186                "opensearchs://localhost:9200/myindex?operation=SEARCH",
187                &ctx,
188            )
189            .expect("endpoint should be created");
190        assert_eq!(
191            endpoint.uri(),
192            "opensearchs://localhost:9200/myindex?operation=SEARCH"
193        );
194    }
195
196    #[test]
197    fn test_opensearch_component_rejects_wrong_scheme() {
198        let component = OpenSearchComponent::new();
199        let ctx = NoOpComponentContext;
200        let result = component.create_endpoint("kafka:topic?brokers=localhost:9092", &ctx);
201        assert!(result.is_err(), "wrong scheme should fail");
202        let err = result.err().expect("error must exist");
203        assert!(err.to_string().contains("expected scheme 'opensearch'"));
204    }
205
206    #[test]
207    fn test_opensearch_component_applies_global_defaults() {
208        let global = OpenSearchConfig::default()
209            .with_host("es-global")
210            .with_port(9300);
211        let component = OpenSearchComponent::with_config(global);
212        let ctx = NoOpComponentContext;
213
214        let endpoint = component
215            .create_endpoint("opensearch:///myindex?operation=SEARCH", &ctx)
216            .expect("endpoint should be created with defaults");
217
218        // Verify the endpoint was created — producer creation will fail until
219        // Task 2 (producer) is complete, but the endpoint config should have defaults
220        assert_eq!(endpoint.uri(), "opensearch:///myindex?operation=SEARCH");
221    }
222
223    #[test]
224    fn test_endpoint_create_consumer_returns_error() {
225        let component = OpenSearchComponent::new();
226        let ctx = NoOpComponentContext;
227        let endpoint = component
228            .create_endpoint("opensearch://localhost:9200/myindex?operation=INDEX", &ctx)
229            .expect("endpoint should be created");
230
231        let result = endpoint.create_consumer();
232        assert!(result.is_err(), "create_consumer should return an error");
233        let err = result.err().expect("error must exist");
234        assert!(
235            err.to_string().contains("does not support consumers"),
236            "expected consumer-not-supported error, got: {err}"
237        );
238    }
239}