1use std::collections::HashMap;
2use std::fmt;
3use std::str::FromStr;
4use std::sync::Arc;
5
6use camel_api::error::CamelError;
7use camel_auth::oauth2::TokenProvider;
8use camel_component_api::NetworkRetryPolicy;
9use serde::Deserialize;
10use serde::de::{self, MapAccess, Visitor};
11
12#[derive(Debug, Clone, Default, Deserialize, PartialEq)]
21#[non_exhaustive]
22pub struct TlsConfig {
23 #[serde(default)]
24 pub tls_enabled: bool,
25 pub ca_cert_path: Option<String>,
26 pub client_cert_path: Option<String>,
27 pub client_key_path: Option<String>,
28 #[serde(default)]
29 pub insecure_skip_verify: bool,
30}
31
32#[derive(Default)]
36#[non_exhaustive]
37pub enum AuthConfig {
38 #[default]
40 None,
41 Bearer { token: String },
43 GoogleServiceAccount { json_path: String },
47 OAuth2 {
49 token_provider: Arc<dyn TokenProvider>,
50 },
51}
52
53impl<'de> Deserialize<'de> for AuthConfig {
54 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
55 where
56 D: de::Deserializer<'de>,
57 {
58 struct AuthConfigVisitor;
59
60 impl<'de> Visitor<'de> for AuthConfigVisitor {
61 type Value = AuthConfig;
62
63 fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
64 formatter.write_str("an AuthConfig variant")
65 }
66
67 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
68 where
69 E: de::Error,
70 {
71 match v {
72 "None" => Ok(AuthConfig::None),
73 "OAuth2" => Err(de::Error::custom(
74 "OAuth2 variant cannot be deserialized; construct programmatically",
75 )),
76 other => Err(de::Error::unknown_variant(
77 other,
78 &["None", "Bearer", "GoogleServiceAccount", "OAuth2"],
79 )),
80 }
81 }
82
83 fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
84 where
85 M: MapAccess<'de>,
86 {
87 let variant = map
88 .next_key::<String>()?
89 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
90 match variant.as_str() {
91 "None" => Ok(AuthConfig::None),
92 "Bearer" => {
93 let mut token = None;
94 while let Some(key) = map.next_key::<String>()? {
95 if key == "token" {
96 token = Some(map.next_value()?);
97 } else {
98 let _: de::IgnoredAny = map.next_value()?;
99 }
100 }
101 let token = token.ok_or_else(|| de::Error::missing_field("token"))?;
102 Ok(AuthConfig::Bearer { token })
103 }
104 "GoogleServiceAccount" => {
105 let mut json_path = None;
106 while let Some(key) = map.next_key::<String>()? {
107 if key == "json_path" {
108 json_path = Some(map.next_value()?);
109 } else {
110 let _: de::IgnoredAny = map.next_value()?;
111 }
112 }
113 let json_path =
114 json_path.ok_or_else(|| de::Error::missing_field("json_path"))?;
115 Ok(AuthConfig::GoogleServiceAccount { json_path })
116 }
117 "OAuth2" => Err(de::Error::custom(
118 "OAuth2 variant cannot be deserialized; construct programmatically",
119 )),
120 other => Err(de::Error::unknown_variant(
121 other,
122 &["None", "Bearer", "GoogleServiceAccount"],
123 )),
124 }
125 }
126 }
127
128 deserializer.deserialize_any(AuthConfigVisitor)
129 }
130}
131
132impl fmt::Debug for AuthConfig {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 match self {
135 AuthConfig::None => write!(f, "None"), AuthConfig::Bearer { .. } => write!(f, "Bearer {{ token: \"[REDACTED]\" }}"), AuthConfig::GoogleServiceAccount { .. } => {
138 write!(f, "GoogleServiceAccount {{ json_path: \"[REDACTED]\" }}") }
140 AuthConfig::OAuth2 { .. } => write!(f, "OAuth2 {{ token_provider: \"[REDACTED]\" }}"), }
142 }
143}
144
145impl Clone for AuthConfig {
146 fn clone(&self) -> Self {
147 match self {
148 AuthConfig::None => AuthConfig::None,
149 AuthConfig::Bearer { token } => AuthConfig::Bearer {
150 token: token.clone(),
151 },
152 AuthConfig::GoogleServiceAccount { json_path } => AuthConfig::GoogleServiceAccount {
153 json_path: json_path.clone(),
154 },
155 AuthConfig::OAuth2 { token_provider } => AuthConfig::OAuth2 {
156 token_provider: Arc::clone(token_provider),
157 },
158 }
159 }
160}
161
162#[derive(Debug, Clone, Default, Deserialize)]
171#[non_exhaustive]
172pub struct InterceptorConfig {
173 #[serde(default)]
174 pub interceptors: Vec<String>,
175}
176
177#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
181#[serde(rename_all = "camelCase")]
182pub enum ConsumerStrategy {
183 #[default]
184 RoundRobin,
185 First,
186 Last,
187}
188
189impl fmt::Display for ConsumerStrategy {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 match self {
192 ConsumerStrategy::RoundRobin => write!(f, "roundRobin"),
193 ConsumerStrategy::First => write!(f, "first"),
194 ConsumerStrategy::Last => write!(f, "last"),
195 }
196 }
197}
198
199impl FromStr for ConsumerStrategy {
200 type Err = CamelError;
201
202 fn from_str(s: &str) -> Result<Self, Self::Err> {
203 match s {
204 "roundRobin" | "round_robin" => Ok(Self::RoundRobin),
205 "first" => Ok(Self::First),
206 "last" => Ok(Self::Last),
207 _ => Err(CamelError::Config(format!("invalid ConsumerStrategy: {s}"))),
208 }
209 }
210}
211
212#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
214#[serde(rename_all = "camelCase")]
215pub enum ProducerStrategy {
216 FireAndForget,
217 #[default]
218 RequestReply,
219}
220
221impl fmt::Display for ProducerStrategy {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 match self {
224 ProducerStrategy::FireAndForget => write!(f, "fireAndForget"),
225 ProducerStrategy::RequestReply => write!(f, "requestReply"),
226 }
227 }
228}
229
230impl FromStr for ProducerStrategy {
231 type Err = CamelError;
232
233 fn from_str(s: &str) -> Result<Self, Self::Err> {
234 match s {
235 "fireAndForget" | "fire_and_forget" => Ok(Self::FireAndForget),
236 "requestReply" | "request_reply" => Ok(Self::RequestReply),
237 _ => Err(CamelError::Config(format!("invalid ProducerStrategy: {s}"))),
238 }
239 }
240}
241
242#[derive(Clone, Deserialize)]
245pub struct GrpcConfig {
246 #[serde(rename = "protoFile")]
247 pub proto_file: Option<String>,
248 pub service: Option<String>,
249 pub method: Option<String>,
250 #[serde(default)]
251 pub reflection: bool,
252 #[serde(default)]
253 pub tls: bool,
254 #[serde(default = "default_max_msg_len")]
255 pub max_receive_message_length: usize,
256 pub deadline_ms: Option<u64>,
257 pub metadata: Option<String>,
258
259 #[serde(default)]
261 pub tls_config: Option<TlsConfig>,
262
263 #[serde(default)]
265 pub auth: AuthConfig,
266
267 #[serde(default)]
269 pub interceptors: InterceptorConfig,
270
271 #[serde(default)]
273 pub consumer_strategy: ConsumerStrategy,
274 #[serde(default)]
275 pub producer_strategy: ProducerStrategy,
276
277 #[serde(default)]
284 pub retry: NetworkRetryPolicy,
285}
286
287impl fmt::Debug for GrpcConfig {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 f.debug_struct("GrpcConfig")
291 .field("proto_file", &self.proto_file)
292 .field("service", &self.service)
293 .field("method", &self.method)
294 .field("reflection", &self.reflection)
295 .field("tls", &self.tls)
296 .field(
297 "max_receive_message_length",
298 &self.max_receive_message_length,
299 )
300 .field("deadline_ms", &self.deadline_ms)
301 .field("metadata", &self.metadata.as_ref().map(|_| "[REDACTED]"))
302 .field("tls_config", &self.tls_config)
303 .field("auth", &self.auth)
304 .field("interceptors", &self.interceptors)
305 .field("consumer_strategy", &self.consumer_strategy)
306 .field("producer_strategy", &self.producer_strategy)
307 .field("retry", &self.retry)
308 .finish()
309 }
310}
311
/// Default for `max_receive_message_length`: 4 MiB, expressed in bytes.
fn default_max_msg_len() -> usize {
    const MIB: usize = 1024 * 1024;
    4 * MIB
}
315
/// Server-side settings.
#[derive(Clone, Debug, Default)]
pub struct GrpcServerConfig {
    /// Optional receive-size limit; `None` by default.
    pub max_receive_message_len: Option<usize>,
}
322
323fn parse_bool_param(val: &str) -> Result<bool, CamelError> {
326 match val.to_ascii_lowercase().as_str() {
327 "true" | "1" | "yes" => Ok(true),
328 "false" | "0" | "no" => Ok(false),
329 _ => Err(CamelError::Config(format!("invalid bool value: {val}"))),
330 }
331}
332
333fn parse_numeric_param<T: std::str::FromStr>(val: &str, field: &str) -> Result<T, CamelError>
334where
335 T::Err: std::fmt::Display,
336{
337 val.parse::<T>()
338 .map_err(|e| CamelError::Config(format!("invalid numeric value for {field}: {val} ({e})")))
339}
340
341fn parse_grpc_query_params(
344 pairs: impl Iterator<Item = (String, String)>,
345) -> Result<GrpcConfig, CamelError> {
346 let mut map: HashMap<String, String> = HashMap::new();
347 for (k, v) in pairs {
348 map.insert(k, v);
349 }
350
351 let proto_file = map.remove("protoFile");
352 let service = map.remove("service");
353 let method = map.remove("method");
354 let metadata = map.remove("metadata");
355
356 let reflection = map
357 .remove("reflection")
358 .map(|v| parse_bool_param(&v))
359 .transpose()?
360 .unwrap_or(false);
361
362 let tls = map
363 .remove("tls")
364 .map(|v| parse_bool_param(&v))
365 .transpose()?
366 .unwrap_or(false);
367
368 let max_receive_message_length = map
369 .remove("max_receive_message_length")
370 .map(|v| parse_numeric_param(&v, "max_receive_message_length"))
371 .transpose()?
372 .unwrap_or_else(default_max_msg_len);
373
374 let deadline_ms = map
375 .remove("deadline_ms")
376 .map(|v| parse_numeric_param(&v, "deadline_ms"))
377 .transpose()?;
378
379 let auth = if let Some(token) = map.remove("bearerToken") {
381 AuthConfig::Bearer { token }
382 } else if let Some(json_path) = map.remove("googleServiceAccount") {
383 AuthConfig::GoogleServiceAccount { json_path }
384 } else {
385 AuthConfig::None
386 };
387
388 let consumer_strategy = map
390 .remove("consumerStrategy")
391 .map(|v| ConsumerStrategy::from_str(&v))
392 .transpose()?
393 .unwrap_or_default();
394
395 let producer_strategy = map
396 .remove("producerStrategy")
397 .map(|v| ProducerStrategy::from_str(&v))
398 .transpose()?
399 .unwrap_or_default();
400
401 for (k, v) in &map {
403 tracing::warn!("unrecognized gRPC URI parameter '{k}={v}' — ignored");
404 }
405
406 Ok(GrpcConfig {
407 proto_file,
408 service,
409 method,
410 reflection,
411 tls,
412 max_receive_message_length,
413 deadline_ms,
414 metadata,
415 tls_config: None,
416 auth,
417 interceptors: InterceptorConfig::default(),
418 consumer_strategy,
419 producer_strategy,
420 retry: NetworkRetryPolicy::default(),
421 })
422}
423
424pub fn parse_grpc_uri(uri: &str) -> Result<(String, u16, String, String, GrpcConfig), CamelError> {
425 let parsed = url::Url::parse(uri).map_err(|e| CamelError::RouteError(e.to_string()))?;
426 let host = parsed
427 .host_str()
428 .ok_or_else(|| CamelError::RouteError("missing host".to_string()))?
429 .to_string();
430 let port = parsed
431 .port()
432 .ok_or_else(|| CamelError::RouteError("missing port".to_string()))?;
433 let path = parsed.path().trim_start_matches('/');
434 let (service, method) = path.rsplit_once('/').ok_or_else(|| {
435 CamelError::RouteError("URI path must be package.Service/Method".to_string())
436 })?;
437 let config = parse_grpc_query_params(
438 parsed
439 .query_pairs()
440 .map(|(k, v)| (k.to_string(), v.to_string())),
441 )?;
442 if let Some(ref proto) = config.proto_file
443 && (proto.starts_with('/') || proto.contains(".."))
444 {
445 return Err(CamelError::RouteError(format!(
446 "proto path '{}' must be relative and cannot contain '..'",
447 proto
448 )));
449 }
450 if config.reflection {
451 tracing::warn!("gRPC reflection is not supported in v1 — parameter ignored");
452 }
453 if config.tls {
454 tracing::warn!("gRPC TLS is not supported in v1 — parameter ignored");
455 }
456 Ok((host, port, service.to_string(), method.to_string(), config))
457}
458
459pub fn apply_config_metadata<T>(config: &GrpcConfig, request: &mut tonic::Request<T>) {
464 if let Some(ref metadata_str) = config.metadata {
465 for pair in metadata_str.split(',') {
466 let pair = pair.trim();
467 if let Some((key, value)) = pair.split_once('=') {
468 let key = key.trim();
469 let value = value.trim();
470 if let Ok(name) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes())
471 && let Ok(meta_val) = tonic::metadata::MetadataValue::try_from(value)
472 {
473 request.metadata_mut().insert(name, meta_val);
474 tracing::debug!(key = key, "applied config metadata to gRPC request");
475 }
476 }
477 }
478 }
479}
480
481pub async fn apply_auth_metadata<T>(
487 auth: &AuthConfig,
488 request: &mut tonic::Request<T>,
489) -> Result<(), camel_api::CamelError> {
490 match auth {
491 AuthConfig::Bearer { token } => {
492 if let Ok(name) = tonic::metadata::MetadataKey::from_bytes("authorization".as_bytes()) {
493 let value = format!("Bearer {token}"); if let Ok(meta_val) = tonic::metadata::MetadataValue::try_from(value.as_str()) {
495 request.metadata_mut().insert(name, meta_val);
496 tracing::debug!("applied bearer auth to gRPC request");
497 } else {
498 return Err(camel_api::CamelError::ProcessorError(
499 "bearer token contains invalid characters".into(),
500 ));
501 }
502 }
503 }
504 AuthConfig::OAuth2 { token_provider } => {
505 let token = token_provider.get_token().await.map_err(|e| {
506 let message = format!("failed to acquire OAuth2 token for gRPC producer: {e}"); camel_api::CamelError::ProcessorError(message)
508 })?;
509 if let Ok(name) = tonic::metadata::MetadataKey::from_bytes("authorization".as_bytes()) {
510 let value = format!("Bearer {token}"); if let Ok(meta_val) = tonic::metadata::MetadataValue::try_from(value.as_str()) {
512 request.metadata_mut().insert(name, meta_val);
513 }
514 }
515 }
516 AuthConfig::None | AuthConfig::GoogleServiceAccount { .. } => {}
517 }
518 Ok(())
519}
520
#[cfg(test)]
mod tests {
    use super::*;

    // ---- shared helpers -------------------------------------------------

    /// Parses `uri` and returns only the configuration part.
    fn config_from_uri(uri: &str) -> GrpcConfig {
        parse_grpc_uri(uri).unwrap().4
    }

    /// Parses `uri`, expecting failure, and returns the error text.
    fn uri_error(uri: &str) -> String {
        let outcome = parse_grpc_uri(uri);
        assert!(outcome.is_err());
        outcome.unwrap_err().to_string()
    }

    /// Deserializes a `GrpcConfig` from a JSON value.
    fn config_from_json(value: serde_json::Value) -> GrpcConfig {
        serde_json::from_value(value).unwrap()
    }

    /// Reads an ASCII metadata entry of `request` as a string slice.
    fn metadata_str<'a, T>(request: &'a tonic::Request<T>, key: &str) -> &'a str {
        request.metadata().get(key).unwrap().to_str().unwrap()
    }

    // ---- URI parsing ----------------------------------------------------

    #[test]
    fn test_parse_grpc_uri_valid() {
        let (host, port, service, method, config) =
            parse_grpc_uri("grpc://localhost:50051/com.example.MyService/MyMethod").unwrap();
        assert_eq!(host, "localhost");
        assert_eq!(port, 50051);
        assert_eq!(service, "com.example.MyService");
        assert_eq!(method, "MyMethod");
        assert_eq!(config.max_receive_message_length, 4 * 1024 * 1024);
        assert!(!config.reflection);
        assert!(!config.tls);
    }

    #[test]
    fn test_parse_grpc_uri_bool_query_params_case_insensitive() {
        let cases = [
            ("true", true),
            ("TRUE", true),
            ("1", true),
            ("yes", true),
            ("false", false),
            ("0", false),
            ("no", false),
        ];
        for (raw, expected) in cases {
            let uri = format!("grpc://localhost:50051/pkg.Svc/Method?reflection={raw}");
            assert_eq!(
                config_from_uri(&uri).reflection,
                expected,
                "reflection={raw}"
            );
        }
    }

    #[test]
    fn test_parse_grpc_uri_bool_query_params_invalid() {
        let message = uri_error("grpc://localhost:50051/pkg.Svc/Method?reflection=maybe");
        assert!(message.contains("invalid bool value"));
    }

    #[test]
    fn test_parse_grpc_uri_with_proto_file() {
        let config = config_from_uri("grpc://localhost:50051/pkg.Svc/Method?protoFile=my.proto");
        assert_eq!(config.proto_file, Some("my.proto".to_string()));
    }

    #[test]
    fn test_parse_grpc_uri_numeric_query_params_work() {
        let sized = config_from_uri(
            "grpc://localhost:50051/pkg.Svc/Method?max_receive_message_length=8388608",
        );
        assert_eq!(sized.max_receive_message_length, 8388608);

        let timed = config_from_uri("grpc://localhost:50051/pkg.Svc/Method?deadline_ms=5000");
        assert_eq!(timed.deadline_ms, Some(5000));
    }

    #[test]
    fn test_parse_grpc_uri_numeric_query_params_invalid() {
        let message = uri_error("grpc://localhost:50051/pkg.Svc/Method?deadline_ms=notanumber");
        assert!(message.contains("invalid numeric value"));
    }

    #[test]
    fn test_parse_grpc_uri_with_metadata() {
        let config = config_from_uri("grpc://localhost:50051/pkg.Svc/Method?metadata=some-value");
        assert_eq!(config.metadata, Some("some-value".to_string()));
    }

    #[test]
    fn test_parse_grpc_uri_invalid_uri() {
        assert!(uri_error("not-a-valid-uri").contains("relative URL"));
    }

    #[test]
    fn test_parse_grpc_uri_missing_host() {
        let err = uri_error("grpc:/pkg.Svc/Method");
        assert!(err.contains("missing host") || err.contains("empty host"));
    }

    #[test]
    fn test_parse_grpc_uri_missing_port() {
        assert!(uri_error("grpc://localhost/pkg.Svc/Method").contains("missing port"));
    }

    #[test]
    fn test_parse_grpc_uri_missing_method_separator() {
        let message = uri_error("grpc://localhost:50051/NoSlashHere");
        assert!(message.contains("package.Service/Method"));
    }

    #[test]
    fn test_parse_grpc_uri_proto_absolute_path_rejected() {
        let message = uri_error("grpc://localhost:50051/pkg.Svc/Method?protoFile=/etc/passwd");
        assert!(message.contains("proto path"));
    }

    #[test]
    fn test_parse_grpc_uri_proto_traversal_rejected() {
        let message =
            uri_error("grpc://localhost:50051/pkg.Svc/Method?protoFile=../secret.proto");
        assert!(message.contains(".."));
    }

    #[test]
    fn test_parse_grpc_uri_tls_bool_param() {
        assert!(config_from_uri("grpc://localhost:50051/pkg.Svc/Method?tls=true").tls);
    }

    // ---- GrpcConfig deserialization ------------------------------------

    #[test]
    fn test_grpc_config_defaults_via_deserialize() {
        let config = config_from_json(serde_json::json!({}));
        assert_eq!(config.max_receive_message_length, 4 * 1024 * 1024);
        assert!(!config.reflection);
        assert!(!config.tls);
        assert!(config.proto_file.is_none());
        assert!(config.service.is_none());
        assert!(config.method.is_none());
        assert!(config.deadline_ms.is_none());
        assert!(config.metadata.is_none());
    }

    #[test]
    fn test_grpc_config_deserialize_all_fields() {
        let config = config_from_json(serde_json::json!({
            "protoFile": "test.proto",
            "service": "MyService",
            "method": "MyMethod",
            "reflection": true,
            "tls": true,
            "max_receive_message_length": 1024,
            "deadline_ms": 3000,
            "metadata": "auth-token"
        }));
        assert_eq!(config.proto_file, Some("test.proto".to_string()));
        assert_eq!(config.service, Some("MyService".to_string()));
        assert_eq!(config.method, Some("MyMethod".to_string()));
        assert!(config.reflection);
        assert!(config.tls);
        assert_eq!(config.max_receive_message_length, 1024);
        assert_eq!(config.deadline_ms, Some(3000));
        assert_eq!(config.metadata, Some("auth-token".to_string()));
    }

    #[test]
    fn test_grpc_config_clone_and_debug() {
        let config = config_from_json(serde_json::json!({
            "protoFile": "test.proto"
        }));
        let copy = config.clone();
        assert_eq!(config.proto_file, copy.proto_file);
        assert!(format!("{config:?}").contains("GrpcConfig"));
    }

    // ---- GrpcServerConfig ----------------------------------------------

    #[test]
    fn test_server_config_default() {
        assert!(
            GrpcServerConfig::default()
                .max_receive_message_len
                .is_none()
        );
    }

    #[test]
    fn test_server_config_max_receive_message_len_applied() {
        let cfg = GrpcServerConfig {
            max_receive_message_len: Some(4096),
        };
        assert_eq!(cfg.max_receive_message_len, Some(4096));
    }

    #[test]
    fn test_server_config_clone_and_debug() {
        let cfg = GrpcServerConfig {
            max_receive_message_len: Some(8192),
        };
        let copy = cfg.clone();
        assert_eq!(cfg.max_receive_message_len, copy.max_receive_message_len);
        assert!(format!("{cfg:?}").contains("GrpcServerConfig"));
    }

    // ---- parse_bool_param ----------------------------------------------

    #[test]
    fn test_bool_param_case_insensitive() {
        for truthy in ["True", "1", "yes", "YES"] {
            assert!(parse_bool_param(truthy).unwrap(), "{truthy}");
        }
        for falsy in ["FALSE", "0", "no", "NO"] {
            assert!(!parse_bool_param(falsy).unwrap(), "{falsy}");
        }
    }

    #[test]
    fn test_bool_param_invalid_values() {
        for rejected in ["maybe", "", "2", "-1"] {
            assert!(parse_bool_param(rejected).is_err(), "{rejected:?}");
        }
    }

    // ---- apply_config_metadata -----------------------------------------

    #[test]
    fn test_apply_config_metadata_single_pair() {
        let config = config_from_json(serde_json::json!({
            "metadata": "x-custom=hello"
        }));
        let mut request = tonic::Request::new(());
        apply_config_metadata(&config, &mut request);
        assert_eq!(metadata_str(&request, "x-custom"), "hello");
    }

    #[test]
    fn test_apply_config_metadata_multiple_pairs() {
        let config = config_from_json(serde_json::json!({
            "metadata": "x-a=1, x-b=2"
        }));
        let mut request = tonic::Request::new(());
        apply_config_metadata(&config, &mut request);
        assert_eq!(metadata_str(&request, "x-a"), "1");
        assert_eq!(metadata_str(&request, "x-b"), "2");
    }

    #[test]
    fn test_apply_config_metadata_empty_metadata() {
        let config = config_from_json(serde_json::json!({"metadata": ""}));
        let mut request = tonic::Request::new(());
        apply_config_metadata(&config, &mut request);
        assert!(request.metadata().is_empty());
    }

    // ---- TlsConfig ------------------------------------------------------

    #[test]
    fn test_tls_config_default() {
        let tls = TlsConfig::default();
        assert!(!tls.tls_enabled);
        assert!(tls.ca_cert_path.is_none());
        assert!(tls.client_cert_path.is_none());
        assert!(tls.client_key_path.is_none());
        assert!(!tls.insecure_skip_verify);
    }

    #[test]
    fn test_tls_config_deserialize_enabled() {
        let raw = serde_json::json!({
            "tls_enabled": true,
            "ca_cert_path": "/path/to/ca.pem",
            "client_cert_path": "/path/to/client.pem",
            "client_key_path": "/path/to/client.key",
            "insecure_skip_verify": true
        });
        let tls: TlsConfig = serde_json::from_value(raw).unwrap();
        assert!(tls.tls_enabled);
        assert_eq!(tls.ca_cert_path, Some("/path/to/ca.pem".to_string()));
        assert_eq!(
            tls.client_cert_path,
            Some("/path/to/client.pem".to_string())
        );
        assert_eq!(tls.client_key_path, Some("/path/to/client.key".to_string()));
        assert!(tls.insecure_skip_verify);
    }

    #[test]
    fn test_tls_config_clone_and_debug() {
        let tls = TlsConfig {
            tls_enabled: true,
            ca_cert_path: Some("/ca.pem".to_string()),
            client_cert_path: None,
            client_key_path: None,
            insecure_skip_verify: false,
        };
        let copy = tls.clone();
        assert_eq!(tls.tls_enabled, copy.tls_enabled);
        assert!(format!("{tls:?}").contains("TlsConfig"));
    }

    // ---- AuthConfig and apply_auth_metadata ----------------------------

    #[test]
    fn test_auth_config_default_is_none() {
        assert!(matches!(AuthConfig::default(), AuthConfig::None));
    }

    #[tokio::test]
    async fn test_auth_config_bearer_applies_metadata() {
        let auth = AuthConfig::Bearer {
            token: "my-secret-token".to_string(),
        };
        let mut request = tonic::Request::new(());
        apply_auth_metadata(&auth, &mut request).await.unwrap();
        assert_eq!(
            metadata_str(&request, "authorization"),
            "Bearer my-secret-token"
        );
    }

    #[tokio::test]
    async fn test_auth_config_none_no_metadata() {
        let mut request = tonic::Request::new(());
        apply_auth_metadata(&AuthConfig::None, &mut request)
            .await
            .unwrap();
        assert!(request.metadata().get("authorization").is_none());
    }

    #[tokio::test]
    async fn test_auth_config_google_scaffold_no_metadata() {
        let auth = AuthConfig::GoogleServiceAccount {
            json_path: "/path/to/sa.json".to_string(),
        };
        let mut request = tonic::Request::new(());
        apply_auth_metadata(&auth, &mut request).await.unwrap();
        assert!(request.metadata().get("authorization").is_none());
    }

    #[tokio::test]
    async fn test_auth_config_oauth2_sets_bearer() {
        #[derive(Debug)]
        struct MockProvider;

        #[async_trait::async_trait]
        impl camel_auth::TokenProvider for MockProvider {
            async fn get_token(&self) -> Result<String, camel_auth::AuthError> {
                Ok("mock-oauth2-token".to_string())
            }
        }

        let auth = AuthConfig::OAuth2 {
            token_provider: std::sync::Arc::new(MockProvider),
        };
        let mut request = tonic::Request::new(());
        apply_auth_metadata(&auth, &mut request).await.unwrap();
        let auth_header = request.metadata().get("authorization").unwrap();
        assert_eq!(auth_header, "Bearer mock-oauth2-token");
    }

    #[tokio::test]
    async fn test_auth_config_oauth2_failure_returns_error() {
        #[derive(Debug)]
        struct FailingProvider;

        #[async_trait::async_trait]
        impl camel_auth::TokenProvider for FailingProvider {
            async fn get_token(&self) -> Result<String, camel_auth::AuthError> {
                Err(camel_auth::AuthError::ProviderUnavailable(
                    "mock failure".into(),
                ))
            }
        }

        let auth = AuthConfig::OAuth2 {
            token_provider: std::sync::Arc::new(FailingProvider),
        };
        let mut request = tonic::Request::new(());
        let outcome = apply_auth_metadata(&auth, &mut request).await;
        assert!(outcome.is_err());
        assert!(request.metadata().get("authorization").is_none());
    }

    // ---- InterceptorConfig ---------------------------------------------

    #[test]
    fn test_interceptor_config_default_empty() {
        assert!(InterceptorConfig::default().interceptors.is_empty());
    }

    #[test]
    fn test_interceptor_config_deserialize() {
        let raw = serde_json::json!({
            "interceptors": ["logging", "auth"]
        });
        let ic: InterceptorConfig = serde_json::from_value(raw).unwrap();
        assert_eq!(ic.interceptors.len(), 2);
        assert_eq!(ic.interceptors[0], "logging");
        assert_eq!(ic.interceptors[1], "auth");
    }

    // ---- ConsumerStrategy ----------------------------------------------

    #[test]
    fn test_consumer_strategy_default() {
        assert_eq!(ConsumerStrategy::default(), ConsumerStrategy::RoundRobin);
    }

    #[test]
    fn test_consumer_strategy_from_str() {
        let expectations = [
            ("roundRobin", ConsumerStrategy::RoundRobin),
            ("first", ConsumerStrategy::First),
            ("last", ConsumerStrategy::Last),
        ];
        for (text, expected) in expectations {
            assert_eq!(ConsumerStrategy::from_str(text).unwrap(), expected);
        }
    }

    #[test]
    fn test_consumer_strategy_display() {
        assert_eq!(ConsumerStrategy::RoundRobin.to_string(), "roundRobin");
        assert_eq!(ConsumerStrategy::First.to_string(), "first");
        assert_eq!(ConsumerStrategy::Last.to_string(), "last");
    }

    #[test]
    fn test_consumer_strategy_invalid() {
        assert!(ConsumerStrategy::from_str("invalid").is_err());
    }

    // ---- ProducerStrategy ----------------------------------------------

    #[test]
    fn test_producer_strategy_default() {
        assert_eq!(ProducerStrategy::default(), ProducerStrategy::RequestReply);
    }

    #[test]
    fn test_producer_strategy_from_str() {
        let expectations = [
            ("fireAndForget", ProducerStrategy::FireAndForget),
            ("requestReply", ProducerStrategy::RequestReply),
        ];
        for (text, expected) in expectations {
            assert_eq!(ProducerStrategy::from_str(text).unwrap(), expected);
        }
    }

    #[test]
    fn test_producer_strategy_display() {
        assert_eq!(ProducerStrategy::FireAndForget.to_string(), "fireAndForget");
        assert_eq!(ProducerStrategy::RequestReply.to_string(), "requestReply");
    }

    #[test]
    fn test_producer_strategy_invalid() {
        assert!(ProducerStrategy::from_str("invalid").is_err());
    }

    // ---- Debug redaction -----------------------------------------------

    #[test]
    fn test_grpc_config_debug_redacts_metadata() {
        let config = config_from_json(serde_json::json!({
            "metadata": "secret-key=value"
        }));
        let rendered = format!("{config:?}");
        assert!(rendered.contains("[REDACTED]"));
        assert!(!rendered.contains("secret-key=value"));
    }

    #[test]
    fn test_grpc_config_debug_no_redaction_without_secrets() {
        let config = config_from_json(serde_json::json!({
            "protoFile": "test.proto"
        }));
        assert!(format!("{config:?}").contains("test.proto"));
    }

    #[test]
    fn test_auth_config_debug_redacts_bearer_token() {
        let auth = AuthConfig::Bearer {
            token: "super-secret-token".to_string(),
        };
        let rendered = format!("{auth:?}");
        assert!(rendered.contains("[REDACTED]"));
        assert!(!rendered.contains("super-secret-token"));
    }

    #[test]
    fn test_auth_config_debug_redacts_google_json_path() {
        let auth = AuthConfig::GoogleServiceAccount {
            json_path: "/secret/sa.json".to_string(),
        };
        let rendered = format!("{auth:?}");
        assert!(rendered.contains("[REDACTED]"));
        assert!(!rendered.contains("/secret/sa.json"));
    }

    #[test]
    fn test_auth_config_debug_none_is_clean() {
        assert_eq!(format!("{:?}", AuthConfig::None), "None");
    }

    // ---- auth, retry and strategies through config sources -------------

    #[test]
    fn test_parse_grpc_uri_bearer_token() {
        let config = config_from_uri("grpc://localhost:50051/pkg.Svc/Method?bearerToken=my-token");
        match config.auth {
            AuthConfig::Bearer { ref token } => assert_eq!(token, "my-token"),
            _ => panic!("expected Bearer auth"),
        }
    }

    #[test]
    fn grpc_config_has_retry_policy_toml() {
        let toml_str = r#"
            protoFile = "helloworld.proto"
            service = "Greeter"
            method = "SayHello"
            [retry]
            max_attempts = 3
            initial_delay_ms = 500
        "#;
        let cfg: GrpcConfig = toml::from_str(toml_str).expect("parse");
        assert_eq!(cfg.retry.max_attempts, 3);
        assert_eq!(
            cfg.retry.initial_delay,
            std::time::Duration::from_millis(500)
        );
    }

    #[test]
    fn grpc_config_retry_defaults_when_not_specified() {
        let toml_str = r#"
            protoFile = "helloworld.proto"
        "#;
        let cfg: GrpcConfig = toml::from_str(toml_str).expect("parse");
        assert_eq!(
            cfg.retry,
            camel_component_api::NetworkRetryPolicy::default()
        );
    }

    #[test]
    fn test_parse_grpc_uri_consumer_strategy() {
        let config =
            config_from_uri("grpc://localhost:50051/pkg.Svc/Method?consumerStrategy=first");
        assert_eq!(config.consumer_strategy, ConsumerStrategy::First);
    }

    #[test]
    fn test_parse_grpc_uri_producer_strategy() {
        let config = config_from_uri(
            "grpc://localhost:50051/pkg.Svc/Method?producerStrategy=fireAndForget",
        );
        assert_eq!(config.producer_strategy, ProducerStrategy::FireAndForget);
    }
}