1use camel_component_api::CamelError;
2use camel_component_api::parse_uri;
3use std::convert::TryFrom;
4use std::fmt;
5use std::str::FromStr;
6
7#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize)]
11#[serde(try_from = "String")]
12pub enum OpenSearchOperation {
13 INDEX,
14 SEARCH,
15 GET,
16 DELETE,
17 UPDATE,
18 BULK,
19 MULTIGET,
20 UNKNOWN(String),
21}
22
23impl FromStr for OpenSearchOperation {
24 type Err = CamelError;
25
26 fn from_str(s: &str) -> Result<Self, Self::Err> {
27 match s.to_uppercase().as_str() {
28 "INDEX" => Ok(OpenSearchOperation::INDEX),
29 "SEARCH" => Ok(OpenSearchOperation::SEARCH),
30 "GET" => Ok(OpenSearchOperation::GET),
31 "DELETE" => Ok(OpenSearchOperation::DELETE),
32 "UPDATE" => Ok(OpenSearchOperation::UPDATE),
33 "BULK" => Ok(OpenSearchOperation::BULK),
34 "MULTIGET" => Ok(OpenSearchOperation::MULTIGET),
35 other => Ok(OpenSearchOperation::UNKNOWN(other.to_string())),
36 }
37 }
38}
39
40impl fmt::Display for OpenSearchOperation {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 OpenSearchOperation::INDEX => write!(f, "INDEX"),
44 OpenSearchOperation::SEARCH => write!(f, "SEARCH"),
45 OpenSearchOperation::GET => write!(f, "GET"),
46 OpenSearchOperation::DELETE => write!(f, "DELETE"),
47 OpenSearchOperation::UPDATE => write!(f, "UPDATE"),
48 OpenSearchOperation::BULK => write!(f, "BULK"),
49 OpenSearchOperation::MULTIGET => write!(f, "MULTIGET"),
50 OpenSearchOperation::UNKNOWN(s) => write!(f, "{}", s),
51 }
52 }
53}
54
55impl TryFrom<String> for OpenSearchOperation {
56 type Error = String;
57
58 fn try_from(s: String) -> Result<Self, Self::Error> {
59 OpenSearchOperation::from_str(&s).map_err(|e| e.to_string())
60 }
61}
62
63#[derive(Clone, serde::Deserialize)]
70pub struct OpenSearchConfig {
71 #[serde(default = "OpenSearchConfig::default_host")]
72 pub host: String,
73 #[serde(default = "OpenSearchConfig::default_port")]
74 pub port: u16,
75 #[serde(default)]
76 pub username: Option<String>,
77 #[serde(default)]
78 pub password: Option<String>,
79 #[serde(default)]
80 pub default_operation: Option<OpenSearchOperation>,
81 #[serde(default)]
82 pub index_name: Option<String>,
83}
84
85impl OpenSearchConfig {
86 fn default_host() -> String {
87 "localhost".to_string()
88 }
89
90 fn default_port() -> u16 {
91 9200
92 }
93
94 pub fn with_host(mut self, v: impl Into<String>) -> Self {
95 self.host = v.into();
96 self
97 }
98
99 pub fn with_port(mut self, v: u16) -> Self {
100 self.port = v;
101 self
102 }
103
104 pub fn with_default_operation(mut self, v: OpenSearchOperation) -> Self {
105 self.default_operation = Some(v);
106 self
107 }
108
109 pub fn with_username(mut self, v: impl Into<String>) -> Self {
110 self.username = Some(v.into());
111 self
112 }
113
114 pub fn with_password(mut self, v: impl Into<String>) -> Self {
115 self.password = Some(v.into());
116 self
117 }
118}
119
120impl Default for OpenSearchConfig {
121 fn default() -> Self {
122 Self {
123 host: Self::default_host(),
124 port: Self::default_port(),
125 username: None,
126 password: None,
127 default_operation: None,
128 index_name: None,
129 }
130 }
131}
132
133impl fmt::Debug for OpenSearchConfig {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 f.debug_struct("OpenSearchConfig")
136 .field("host", &self.host)
137 .field("port", &self.port)
138 .field("username", &self.username)
139 .field("password", &self.password.as_ref().map(|_| "<redacted>"))
140 .field("default_operation", &self.default_operation)
141 .field("index_name", &self.index_name)
142 .finish()
143 }
144}
145
146#[derive(Clone)]
172pub struct OpenSearchEndpointConfig {
173 pub host: Option<String>,
175
176 pub port: Option<u16>,
178
179 pub username: Option<String>,
181
182 pub password: Option<String>,
184
185 pub index_name: String,
187
188 pub operation: OpenSearchOperation,
190
191 pub is_tls: bool,
193}
194
195impl fmt::Debug for OpenSearchEndpointConfig {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 f.debug_struct("OpenSearchEndpointConfig")
198 .field("host", &self.host)
199 .field("port", &self.port)
200 .field("username", &self.username)
201 .field("password", &self.password.as_ref().map(|_| "<redacted>"))
202 .field("index_name", &self.index_name)
203 .field("operation", &self.operation)
204 .field("is_tls", &self.is_tls)
205 .finish()
206 }
207}
208
209impl OpenSearchEndpointConfig {
210 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
211 let parts = parse_uri(uri)?;
212
213 let is_tls = parts.scheme == "opensearchs";
214
215 if parts.scheme != "opensearch" && parts.scheme != "opensearchs" {
216 return Err(CamelError::InvalidUri(format!(
217 "expected scheme 'opensearch' or 'opensearchs', got '{}'",
218 parts.scheme
219 )));
220 }
221
222 let (host, port, path_remainder) = if parts.path.starts_with("//") {
224 let path = &parts.path[2..]; if path.is_empty() {
226 (None, None, "")
228 } else {
229 let (authority, remainder) = match path.split_once('/') {
231 Some((auth, rem)) => (auth, rem),
232 None => (path, ""),
233 };
234
235 let (host_part, port_part) = match authority.split_once(':') {
236 Some((h, p)) => (h, Some(p)),
237 None => (authority, None),
238 };
239
240 let host = if host_part.is_empty() {
241 None
242 } else {
243 Some(host_part.to_string())
244 };
245 let port = port_part.and_then(|p| p.parse().ok());
246 (host, port, remainder)
247 }
248 } else {
249 (None, None, parts.path.as_str())
250 };
251
252 let index_name = path_remainder
254 .split('/')
255 .find(|s| !s.is_empty())
256 .ok_or_else(|| CamelError::InvalidUri("missing index name in URI path".to_string()))?
257 .to_string();
258
259 if index_name.contains('\0') {
261 return Err(CamelError::InvalidUri(
262 "index name must not contain null bytes".into(),
263 ));
264 }
265 if index_name.contains("..") {
266 return Err(CamelError::InvalidUri(
267 "index name must not contain '..'".into(),
268 ));
269 }
270 if !index_name
271 .chars()
272 .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_')
273 {
274 return Err(CamelError::InvalidUri(
275 "index name must contain only lowercase letters, digits, hyphens, and underscores"
276 .into(),
277 ));
278 }
279 if index_name.len() > 255 {
280 return Err(CamelError::InvalidUri(
281 "index name must be at most 255 bytes".into(),
282 ));
283 }
284
285 let operation = parts
287 .params
288 .get("operation")
289 .map(|s| OpenSearchOperation::from_str(s))
290 .transpose()?
291 .unwrap_or(OpenSearchOperation::SEARCH);
292
293 let username = parts.params.get("username").cloned();
295 let password = parts.params.get("password").cloned();
296
297 Ok(Self {
298 host,
299 port,
300 username,
301 password,
302 index_name,
303 operation,
304 is_tls,
305 })
306 }
307
308 pub fn merge_with_global(&self, global: &OpenSearchConfig) -> Self {
314 let operation = match &self.operation {
315 OpenSearchOperation::UNKNOWN(_) => global
316 .default_operation
317 .clone()
318 .unwrap_or(OpenSearchOperation::SEARCH),
319 op => op.clone(),
320 };
321 Self {
322 host: self.host.clone().or_else(|| Some(global.host.clone())),
323 port: self.port.or(Some(global.port)),
324 username: self.username.clone().or_else(|| global.username.clone()),
325 password: self.password.clone().or_else(|| global.password.clone()),
326 index_name: if self.index_name.is_empty() {
327 global.index_name.clone().unwrap_or_default()
328 } else {
329 self.index_name.clone()
330 },
331 operation,
332 is_tls: self.is_tls,
333 }
334 }
335
336 pub fn base_url(&self) -> String {
341 let scheme = if self.is_tls { "https" } else { "http" };
342 let host = self.host.as_deref().unwrap_or("localhost");
343 let port = self.port.unwrap_or(9200);
344 format!("{}://{}:{}", scheme, host, port)
345 }
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `uri` into an endpoint config, failing the test if it is rejected.
    fn endpoint(uri: &str) -> OpenSearchEndpointConfig {
        match OpenSearchEndpointConfig::from_uri(uri) {
            Ok(cfg) => cfg,
            Err(err) => panic!("failed to parse {:?}: {:?}", uri, err),
        }
    }

    /// Asserts that `uri` is not accepted as an endpoint URI.
    fn assert_rejected(uri: &str) {
        assert!(OpenSearchEndpointConfig::from_uri(uri).is_err());
    }

    /// Asserts that the password `hunter2` was replaced by the placeholder.
    fn assert_password_redacted(debug_output: &str) {
        assert!(
            !debug_output.contains("hunter2"),
            "debug output must not contain the real password: {}",
            debug_output
        );
        assert!(
            debug_output.contains("<redacted>"),
            "debug output must contain <redacted>: {}",
            debug_output
        );
    }

    /// Asserts that no placeholder is printed when there is no password.
    fn assert_no_redaction_marker(debug_output: &str) {
        assert!(
            !debug_output.contains("<redacted>"),
            "debug output must not contain <redacted> when password is None: {}",
            debug_output
        );
    }

    // --- OpenSearchOperation ---------------------------------------------

    #[test]
    fn test_operation_from_str() {
        let cases = vec![
            ("INDEX", OpenSearchOperation::INDEX),
            ("SEARCH", OpenSearchOperation::SEARCH),
            ("GET", OpenSearchOperation::GET),
            ("DELETE", OpenSearchOperation::DELETE),
            ("UPDATE", OpenSearchOperation::UPDATE),
            ("BULK", OpenSearchOperation::BULK),
            ("MULTIGET", OpenSearchOperation::MULTIGET),
            // Matching ignores letter case.
            ("index", OpenSearchOperation::INDEX),
            ("Search", OpenSearchOperation::SEARCH),
        ];
        for (text, expected) in cases {
            assert_eq!(OpenSearchOperation::from_str(text).unwrap(), expected);
        }

        // Unrecognized names are kept rather than rejected.
        let custom = OpenSearchOperation::from_str("CUSTOM_OP").unwrap();
        assert_eq!(
            custom,
            OpenSearchOperation::UNKNOWN("CUSTOM_OP".to_string())
        );
    }

    #[test]
    fn test_operation_display() {
        let expectations = vec![
            (OpenSearchOperation::INDEX, "INDEX"),
            (OpenSearchOperation::SEARCH, "SEARCH"),
            (OpenSearchOperation::GET, "GET"),
            (OpenSearchOperation::DELETE, "DELETE"),
            (OpenSearchOperation::UPDATE, "UPDATE"),
            (OpenSearchOperation::BULK, "BULK"),
            (OpenSearchOperation::MULTIGET, "MULTIGET"),
            (OpenSearchOperation::UNKNOWN("CUSTOM".to_string()), "CUSTOM"),
        ];
        for (operation, rendered) in expectations {
            assert_eq!(operation.to_string(), rendered);
        }

        // Parsing a known name and printing it again gives the same text.
        let names = [
            "INDEX", "SEARCH", "GET", "DELETE", "UPDATE", "BULK", "MULTIGET",
        ];
        for name in names.iter() {
            let parsed: OpenSearchOperation = name.parse().unwrap();
            assert_eq!(parsed.to_string(), *name);
        }
    }

    // --- OpenSearchEndpointConfig::from_uri -------------------------------

    #[test]
    fn test_endpoint_config_basic() {
        let parsed = endpoint("opensearch://localhost:9200/myindex?operation=INDEX");
        assert_eq!(parsed.host, Some("localhost".to_string()));
        assert_eq!(parsed.port, Some(9200));
        assert_eq!(parsed.index_name, "myindex");
        assert_eq!(parsed.operation, OpenSearchOperation::INDEX);
        assert!(!parsed.is_tls);
    }

    #[test]
    fn test_endpoint_config_https() {
        let parsed = endpoint("opensearchs://host:443/idx?operation=SEARCH");
        assert_eq!(parsed.host, Some("host".to_string()));
        assert_eq!(parsed.port, Some(443));
        assert_eq!(parsed.index_name, "idx");
        assert_eq!(parsed.operation, OpenSearchOperation::SEARCH);
        assert!(parsed.is_tls);
    }

    #[test]
    fn test_endpoint_config_defaults() {
        let parsed = endpoint("opensearch://localhost:9200/myindex");
        assert_eq!(parsed.operation, OpenSearchOperation::SEARCH);
    }

    #[test]
    fn test_endpoint_config_with_auth() {
        let parsed = endpoint(
            "opensearch://localhost:9200/myindex?operation=GET&username=admin&password=secret",
        );
        assert_eq!(parsed.username, Some("admin".to_string()));
        assert_eq!(parsed.password, Some("secret".to_string()));
        assert_eq!(parsed.operation, OpenSearchOperation::GET);
    }

    #[test]
    fn test_endpoint_config_host_only_no_port() {
        let parsed = endpoint("opensearch://localhost/myindex?operation=GET");
        assert_eq!(parsed.host, Some("localhost".to_string()));
        assert_eq!(parsed.port, None);
        assert_eq!(parsed.operation, OpenSearchOperation::GET);
    }

    #[test]
    fn test_endpoint_config_wrong_scheme() {
        assert_rejected("http://localhost:9200/myindex");
    }

    #[test]
    fn test_endpoint_config_missing_index() {
        assert_rejected("opensearch://localhost:9200");
    }

    // --- OpenSearchEndpointConfig::merge_with_global ----------------------

    #[test]
    fn test_merge_with_global() {
        let local = endpoint("opensearch://localhost/myindex?operation=GET");
        assert_eq!(local.host, Some("localhost".to_string()));
        assert_eq!(local.port, None);
        assert_eq!(local.username, None);
        assert_eq!(local.password, None);

        let shared = OpenSearchConfig::default()
            .with_port(9200)
            .with_host("global-host")
            .with_default_operation(OpenSearchOperation::SEARCH);

        let combined = local.merge_with_global(&shared);

        // Values present on the endpoint win; only the missing port is filled in.
        assert_eq!(combined.host, Some("localhost".to_string()));
        assert_eq!(combined.port, Some(9200));
        assert_eq!(combined.username, None);
        assert_eq!(combined.password, None);
        assert_eq!(combined.operation, OpenSearchOperation::GET);
    }

    #[test]
    fn test_merge_with_global_fills_all_nones() {
        let local = endpoint("opensearch:///myindex?operation=SEARCH");
        assert_eq!(local.host, None);
        assert_eq!(local.port, None);
        assert_eq!(local.index_name, "myindex");

        let shared = OpenSearchConfig::default()
            .with_host("es-server")
            .with_port(9300)
            .with_username("admin")
            .with_password("secret");

        let combined = local.merge_with_global(&shared);
        assert_eq!(combined.host, Some("es-server".to_string()));
        assert_eq!(combined.port, Some(9300));
        assert_eq!(combined.username, Some("admin".to_string()));
        assert_eq!(combined.password, Some("secret".to_string()));
    }

    #[test]
    fn test_merge_with_global_default_operation_fallback() {
        let local = endpoint("opensearch://localhost:9200/myindex?operation=UNKNOWN_OP");
        assert!(matches!(local.operation, OpenSearchOperation::UNKNOWN(_)));

        let shared =
            OpenSearchConfig::default().with_default_operation(OpenSearchOperation::INDEX);

        let combined = local.merge_with_global(&shared);
        assert_eq!(combined.operation, OpenSearchOperation::INDEX);
    }

    // --- OpenSearchEndpointConfig::base_url -------------------------------

    #[test]
    fn test_base_url_http() {
        let parsed = endpoint("opensearch://es-host:9200/myindex?operation=SEARCH");
        assert_eq!(parsed.base_url(), "http://es-host:9200");
    }

    #[test]
    fn test_base_url_https() {
        let parsed = endpoint("opensearchs://es-host:443/myindex?operation=SEARCH");
        assert_eq!(parsed.base_url(), "https://es-host:443");
    }

    #[test]
    fn test_base_url_defaults() {
        let parsed = endpoint("opensearch:///myindex?operation=SEARCH");
        assert_eq!(parsed.host, None);
        assert_eq!(parsed.port, None);
        assert_eq!(parsed.base_url(), "http://localhost:9200");
    }

    // --- OpenSearchConfig --------------------------------------------------

    #[test]
    fn test_config_defaults() {
        let defaults = OpenSearchConfig::default();
        assert_eq!(defaults.host, "localhost");
        assert_eq!(defaults.port, 9200);
        assert!(defaults.username.is_none());
        assert!(defaults.password.is_none());
        assert!(defaults.default_operation.is_none());
        assert!(defaults.index_name.is_none());
    }

    #[test]
    fn test_config_builder() {
        let built = OpenSearchConfig::default()
            .with_host("es-prod")
            .with_port(9200)
            .with_default_operation(OpenSearchOperation::BULK)
            .with_username("admin")
            .with_password("secret");
        assert_eq!(built.host, "es-prod");
        assert_eq!(built.port, 9200);
        assert_eq!(built.default_operation, Some(OpenSearchOperation::BULK));
        assert_eq!(built.username, Some("admin".to_string()));
        assert_eq!(built.password, Some("secret".to_string()));
    }

    // --- Debug redaction ---------------------------------------------------

    #[test]
    fn test_opensearch_config_debug_redacts_password() {
        let with_secret = OpenSearchConfig::default()
            .with_host("es-prod")
            .with_password("hunter2");
        assert_password_redacted(&format!("{:?}", with_secret));
    }

    #[test]
    fn test_opensearch_endpoint_config_debug_redacts_password() {
        let with_secret = endpoint(
            "opensearch://localhost:9200/myindex?operation=GET&username=admin&password=hunter2",
        );
        assert_password_redacted(&format!("{:?}", with_secret));
    }

    #[test]
    fn test_opensearch_config_debug_no_password_shows_none() {
        let without_secret = OpenSearchConfig::default();
        assert_no_redaction_marker(&format!("{:?}", without_secret));
    }

    #[test]
    fn test_opensearch_endpoint_config_debug_no_password_shows_none() {
        let without_secret = endpoint("opensearch://localhost:9200/myindex?operation=GET");
        assert_no_redaction_marker(&format!("{:?}", without_secret));
    }

    // --- Index name validation ---------------------------------------------

    #[test]
    fn test_index_name_null_bytes_rejected() {
        assert_rejected("opensearch://localhost:9200/my%00index?operation=SEARCH");
    }

    #[test]
    fn test_index_name_dotdot_rejected() {
        assert_rejected("opensearch://localhost:9200/my..index?operation=SEARCH");
    }

    #[test]
    fn test_index_name_uppercase_rejected() {
        assert_rejected("opensearch://localhost:9200/MyIndex?operation=SEARCH");
    }

    #[test]
    fn test_index_name_special_chars_rejected() {
        assert_rejected("opensearch://localhost:9200/my@index?operation=SEARCH");
    }

    #[test]
    fn test_index_name_valid_lowercase_with_digits_hyphens_underscores() {
        let parsed = endpoint("opensearch://localhost:9200/my-index_01?operation=SEARCH");
        assert_eq!(parsed.index_name, "my-index_01");
    }
}