Skip to main content

camel_component_grpc/
config.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::str::FromStr;
4use std::sync::Arc;
5
6use camel_api::error::CamelError;
7use camel_auth::oauth2::TokenProvider;
8use camel_component_api::NetworkRetryPolicy;
9use serde::Deserialize;
10use serde::de::{self, MapAccess, Visitor};
11
12// ── TLS configuration (GRPC-006) ──────────────────────────────────────────
13
/// TLS/mTLS configuration for gRPC channels.
///
/// When `tls_enabled` is `true`, the producer will attempt to establish a
/// TLS-secured connection using the provided certificate paths.
///
/// All fields may be omitted when deserializing: the two booleans default to
/// `false` and the optional paths default to `None`.
///
/// TODO(GRPC-006): load cert files at runtime
#[derive(Debug, Clone, Default, Deserialize, PartialEq)]
#[non_exhaustive]
pub struct TlsConfig {
    /// Master switch for TLS; `false` when omitted.
    #[serde(default)]
    pub tls_enabled: bool,
    /// Path to the CA certificate file, if any.
    // NOTE(review): file format (presumably PEM) is not established in this file — confirm.
    pub ca_cert_path: Option<String>,
    /// Path to the client certificate file, if any (mTLS).
    pub client_cert_path: Option<String>,
    /// Path to the client private key file, if any (mTLS).
    pub client_key_path: Option<String>,
    /// `false` when omitted.
    // NOTE(review): presumably disables server certificate verification;
    // how this flag is consumed is not visible here — confirm.
    #[serde(default)]
    pub insecure_skip_verify: bool,
}
31
32// ── Auth configuration (GRPC-007) ─────────────────────────────────────────
33
/// Authentication configuration for gRPC channels.
///
/// `Deserialize`, `Debug` and `Clone` are implemented by hand in this file:
/// `Debug` prints `[REDACTED]` instead of field contents, and the `OAuth2`
/// variant is rejected by the deserializer because it must be constructed
/// programmatically.
#[derive(Default)]
#[non_exhaustive]
pub enum AuthConfig {
    /// No authentication.
    #[default]
    None,
    /// Bearer token authentication. Adds `Authorization: Bearer <token>` to request metadata.
    Bearer { token: String },
    /// Google service account authentication (scaffold only).
    ///
    /// `apply_auth_metadata` currently adds no header for this variant.
    ///
    /// TODO(GRPC-007): Google service account token refresh not yet implemented
    GoogleServiceAccount { json_path: String },
    /// OAuth2 token provider for dynamic token injection.
    ///
    /// The provider is shared behind an `Arc`, so cloning the config shares
    /// the same provider instance.
    OAuth2 {
        token_provider: Arc<dyn TokenProvider>,
    },
}
52
53impl<'de> Deserialize<'de> for AuthConfig {
54    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
55    where
56        D: de::Deserializer<'de>,
57    {
58        struct AuthConfigVisitor;
59
60        impl<'de> Visitor<'de> for AuthConfigVisitor {
61            type Value = AuthConfig;
62
63            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
64                formatter.write_str("an AuthConfig variant")
65            }
66
67            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
68            where
69                E: de::Error,
70            {
71                match v {
72                    "None" => Ok(AuthConfig::None),
73                    "OAuth2" => Err(de::Error::custom(
74                        "OAuth2 variant cannot be deserialized; construct programmatically",
75                    )),
76                    other => Err(de::Error::unknown_variant(
77                        other,
78                        &["None", "Bearer", "GoogleServiceAccount", "OAuth2"],
79                    )),
80                }
81            }
82
83            fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
84            where
85                M: MapAccess<'de>,
86            {
87                let variant = map
88                    .next_key::<String>()?
89                    .ok_or_else(|| de::Error::invalid_length(0, &self))?;
90                match variant.as_str() {
91                    "None" => Ok(AuthConfig::None),
92                    "Bearer" => {
93                        let mut token = None;
94                        while let Some(key) = map.next_key::<String>()? {
95                            if key == "token" {
96                                token = Some(map.next_value()?);
97                            } else {
98                                let _: de::IgnoredAny = map.next_value()?;
99                            }
100                        }
101                        let token = token.ok_or_else(|| de::Error::missing_field("token"))?;
102                        Ok(AuthConfig::Bearer { token })
103                    }
104                    "GoogleServiceAccount" => {
105                        let mut json_path = None;
106                        while let Some(key) = map.next_key::<String>()? {
107                            if key == "json_path" {
108                                json_path = Some(map.next_value()?);
109                            } else {
110                                let _: de::IgnoredAny = map.next_value()?;
111                            }
112                        }
113                        let json_path =
114                            json_path.ok_or_else(|| de::Error::missing_field("json_path"))?;
115                        Ok(AuthConfig::GoogleServiceAccount { json_path })
116                    }
117                    "OAuth2" => Err(de::Error::custom(
118                        "OAuth2 variant cannot be deserialized; construct programmatically",
119                    )),
120                    other => Err(de::Error::unknown_variant(
121                        other,
122                        &["None", "Bearer", "GoogleServiceAccount"],
123                    )),
124                }
125            }
126        }
127
128        deserializer.deserialize_any(AuthConfigVisitor)
129    }
130}
131
132impl fmt::Debug for AuthConfig {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        match self {
135            AuthConfig::None => write!(f, "None"), // allow-secret
136            AuthConfig::Bearer { .. } => write!(f, "Bearer {{ token: \"[REDACTED]\" }}"), // allow-secret
137            AuthConfig::GoogleServiceAccount { .. } => {
138                write!(f, "GoogleServiceAccount {{ json_path: \"[REDACTED]\" }}") // allow-secret
139            }
140            AuthConfig::OAuth2 { .. } => write!(f, "OAuth2 {{ token_provider: \"[REDACTED]\" }}"), // allow-secret
141        }
142    }
143}
144
145impl Clone for AuthConfig {
146    fn clone(&self) -> Self {
147        match self {
148            AuthConfig::None => AuthConfig::None,
149            AuthConfig::Bearer { token } => AuthConfig::Bearer {
150                token: token.clone(),
151            },
152            AuthConfig::GoogleServiceAccount { json_path } => AuthConfig::GoogleServiceAccount {
153                json_path: json_path.clone(),
154            },
155            AuthConfig::OAuth2 { token_provider } => AuthConfig::OAuth2 {
156                token_provider: Arc::clone(token_provider),
157            },
158        }
159    }
160}
161
162// ── Interceptor placeholder (GRPC-008) ────────────────────────────────────
163
/// Placeholder for future gRPC interceptor registration.
///
/// This struct stores interceptor class/type names as strings. Actual wiring
/// into the tonic service stack is not yet implemented.
///
/// TODO(GRPC-008): interceptor registry not yet implemented
#[derive(Debug, Clone, Default, Deserialize)]
#[non_exhaustive]
pub struct InterceptorConfig {
    /// Interceptor names; an empty list when omitted from the input.
    #[serde(default)]
    pub interceptors: Vec<String>,
}
176
177// ── Consumer / Producer strategies (GRPC-009) ────────────────────────────
178
/// Strategy for selecting among multiple gRPC consumers.
///
/// Serde uses camelCase variant names (`roundRobin`, `first`, `last`), which
/// are also the strings produced by the `Display` impl in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ConsumerStrategy {
    /// Default strategy (`roundRobin`).
    #[default]
    RoundRobin,
    /// `first`.
    // NOTE(review): selection semantics are implemented outside this file — confirm.
    First,
    /// `last`.
    Last,
}
188
189impl fmt::Display for ConsumerStrategy {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        match self {
192            ConsumerStrategy::RoundRobin => write!(f, "roundRobin"),
193            ConsumerStrategy::First => write!(f, "first"),
194            ConsumerStrategy::Last => write!(f, "last"),
195        }
196    }
197}
198
199impl FromStr for ConsumerStrategy {
200    type Err = CamelError;
201
202    fn from_str(s: &str) -> Result<Self, Self::Err> {
203        match s {
204            "roundRobin" | "round_robin" => Ok(Self::RoundRobin),
205            "first" => Ok(Self::First),
206            "last" => Ok(Self::Last),
207            _ => Err(CamelError::Config(format!("invalid ConsumerStrategy: {s}"))),
208        }
209    }
210}
211
/// Strategy for gRPC producer invocation mode.
///
/// Serde uses camelCase variant names (`fireAndForget`, `requestReply`),
/// which are also the strings produced by the `Display` impl in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ProducerStrategy {
    /// `fireAndForget`.
    // NOTE(review): invocation semantics are implemented outside this file — confirm.
    FireAndForget,
    /// Default strategy (`requestReply`).
    #[default]
    RequestReply,
}
220
221impl fmt::Display for ProducerStrategy {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        match self {
224            ProducerStrategy::FireAndForget => write!(f, "fireAndForget"),
225            ProducerStrategy::RequestReply => write!(f, "requestReply"),
226        }
227    }
228}
229
230impl FromStr for ProducerStrategy {
231    type Err = CamelError;
232
233    fn from_str(s: &str) -> Result<Self, Self::Err> {
234        match s {
235            "fireAndForget" | "fire_and_forget" => Ok(Self::FireAndForget),
236            "requestReply" | "request_reply" => Ok(Self::RequestReply),
237            _ => Err(CamelError::Config(format!("invalid ProducerStrategy: {s}"))),
238        }
239    }
240}
241
242// ── Main config ───────────────────────────────────────────────────────────
243
/// Endpoint configuration for the gRPC component.
///
/// Instances are produced either by serde deserialization or from URI query
/// parameters via `parse_grpc_query_params`. `Debug` is implemented by hand
/// so that `metadata` is printed as `[REDACTED]`.
#[derive(Clone, Deserialize)]
pub struct GrpcConfig {
    /// Optional proto file path (serde key `protoFile`). `parse_grpc_uri`
    /// rejects values that start with `/` or contain `..`.
    #[serde(rename = "protoFile")]
    pub proto_file: Option<String>,
    /// Optional service name taken from configuration. Note that
    /// `parse_grpc_uri` returns the path-derived service separately.
    pub service: Option<String>,
    /// Optional method name taken from configuration. Note that
    /// `parse_grpc_uri` returns the path-derived method separately.
    pub method: Option<String>,
    /// Reflection flag; `false` when omitted. `parse_grpc_uri` logs a warning
    /// that reflection is not supported in v1 when this is set.
    #[serde(default)]
    pub reflection: bool,
    /// TLS flag; `false` when omitted. `parse_grpc_uri` logs a warning that
    /// TLS is not supported in v1 when this is set.
    #[serde(default)]
    pub tls: bool,
    /// Maximum receive message length; defaults to `4 * 1024 * 1024` when omitted.
    // NOTE(review): unit is presumably bytes — confirm against the channel setup.
    #[serde(default = "default_max_msg_len")]
    pub max_receive_message_length: usize,
    /// Optional call deadline.
    // NOTE(review): the name suggests milliseconds; how it is applied is not visible here.
    pub deadline_ms: Option<u64>,
    /// Optional request metadata in `key1=value1,key2=value2` form, applied
    /// by `apply_config_metadata`. Redacted in `Debug` output.
    pub metadata: Option<String>,

    // GRPC-006: TLS/mTLS support
    /// Detailed TLS settings. Not settable through URI query parameters:
    /// `parse_grpc_query_params` always leaves this as `None`.
    #[serde(default)]
    pub tls_config: Option<TlsConfig>,

    // GRPC-007: Auth support
    /// Authentication mode; `AuthConfig::None` when omitted.
    #[serde(default)]
    pub auth: AuthConfig,

    // GRPC-008: Interceptor placeholder
    /// Interceptor names (placeholder; empty when omitted).
    #[serde(default)]
    pub interceptors: InterceptorConfig,

    // GRPC-009: Strategy configuration
    /// Consumer selection strategy; `RoundRobin` when omitted.
    #[serde(default)]
    pub consumer_strategy: ConsumerStrategy,
    /// Producer invocation strategy; `RequestReply` when omitted.
    #[serde(default)]
    pub producer_strategy: ProducerStrategy,

    /// Transient error retry policy for gRPC producer RPC calls.
    ///
    /// Controls how the producer retries `Unavailable`, `DeadlineExceeded`,
    /// `ResourceExhausted`, `Aborted`, and transport-level errors. Permanent
    /// codes (`InvalidArgument`, `NotFound`, `PermissionDenied`, etc.) are
    /// never retried.
    #[serde(default)]
    pub retry: NetworkRetryPolicy,
}
286
287/// Custom Debug that redacts sensitive fields (GRPC-013).
288impl fmt::Debug for GrpcConfig {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        f.debug_struct("GrpcConfig")
291            .field("proto_file", &self.proto_file)
292            .field("service", &self.service)
293            .field("method", &self.method)
294            .field("reflection", &self.reflection)
295            .field("tls", &self.tls)
296            .field(
297                "max_receive_message_length",
298                &self.max_receive_message_length,
299            )
300            .field("deadline_ms", &self.deadline_ms)
301            .field("metadata", &self.metadata.as_ref().map(|_| "[REDACTED]"))
302            .field("tls_config", &self.tls_config)
303            .field("auth", &self.auth)
304            .field("interceptors", &self.interceptors)
305            .field("consumer_strategy", &self.consumer_strategy)
306            .field("producer_strategy", &self.producer_strategy)
307            .field("retry", &self.retry)
308            .finish()
309    }
310}
311
/// Serde default for `GrpcConfig::max_receive_message_length`: 4 MiB (4 194 304).
fn default_max_msg_len() -> usize {
    const MIB: usize = 1024 * 1024;
    4 * MIB
}
315
/// Server-side configuration for the gRPC transport layer.
///
/// `Default` leaves every option unset.
#[derive(Debug, Clone, Default)]
pub struct GrpcServerConfig {
    /// Maximum incoming message size in bytes. None means use tonic/hyper default.
    pub max_receive_message_len: Option<usize>,
}
322
323// ── URI param parsing helpers ──────────────────────────────────────────────
324
325fn parse_bool_param(val: &str) -> Result<bool, CamelError> {
326    match val.to_ascii_lowercase().as_str() {
327        "true" | "1" | "yes" => Ok(true),
328        "false" | "0" | "no" => Ok(false),
329        _ => Err(CamelError::Config(format!("invalid bool value: {val}"))),
330    }
331}
332
333fn parse_numeric_param<T: std::str::FromStr>(val: &str, field: &str) -> Result<T, CamelError>
334where
335    T::Err: std::fmt::Display,
336{
337    val.parse::<T>()
338        .map_err(|e| CamelError::Config(format!("invalid numeric value for {field}: {val} ({e})")))
339}
340
341/// Parse query pairs into a typed `GrpcConfig`, handling bool and numeric
342/// params natively instead of relying on serde string→bool/number coercion.
343fn parse_grpc_query_params(
344    pairs: impl Iterator<Item = (String, String)>,
345) -> Result<GrpcConfig, CamelError> {
346    let mut map: HashMap<String, String> = HashMap::new();
347    for (k, v) in pairs {
348        map.insert(k, v);
349    }
350
351    let proto_file = map.remove("protoFile");
352    let service = map.remove("service");
353    let method = map.remove("method");
354    let metadata = map.remove("metadata");
355
356    let reflection = map
357        .remove("reflection")
358        .map(|v| parse_bool_param(&v))
359        .transpose()?
360        .unwrap_or(false);
361
362    let tls = map
363        .remove("tls")
364        .map(|v| parse_bool_param(&v))
365        .transpose()?
366        .unwrap_or(false);
367
368    let max_receive_message_length = map
369        .remove("max_receive_message_length")
370        .map(|v| parse_numeric_param(&v, "max_receive_message_length"))
371        .transpose()?
372        .unwrap_or_else(default_max_msg_len);
373
374    let deadline_ms = map
375        .remove("deadline_ms")
376        .map(|v| parse_numeric_param(&v, "deadline_ms"))
377        .transpose()?;
378
379    // GRPC-007: Parse auth from query params
380    let auth = if let Some(token) = map.remove("bearerToken") {
381        AuthConfig::Bearer { token }
382    } else if let Some(json_path) = map.remove("googleServiceAccount") {
383        AuthConfig::GoogleServiceAccount { json_path }
384    } else {
385        AuthConfig::None
386    };
387
388    // GRPC-009: Parse strategies
389    let consumer_strategy = map
390        .remove("consumerStrategy")
391        .map(|v| ConsumerStrategy::from_str(&v))
392        .transpose()?
393        .unwrap_or_default();
394
395    let producer_strategy = map
396        .remove("producerStrategy")
397        .map(|v| ProducerStrategy::from_str(&v))
398        .transpose()?
399        .unwrap_or_default();
400
401    // Warn about any unrecognized params
402    for (k, v) in &map {
403        tracing::warn!("unrecognized gRPC URI parameter '{k}={v}' — ignored");
404    }
405
406    Ok(GrpcConfig {
407        proto_file,
408        service,
409        method,
410        reflection,
411        tls,
412        max_receive_message_length,
413        deadline_ms,
414        metadata,
415        tls_config: None,
416        auth,
417        interceptors: InterceptorConfig::default(),
418        consumer_strategy,
419        producer_strategy,
420        retry: NetworkRetryPolicy::default(),
421    })
422}
423
424pub fn parse_grpc_uri(uri: &str) -> Result<(String, u16, String, String, GrpcConfig), CamelError> {
425    let parsed = url::Url::parse(uri).map_err(|e| CamelError::RouteError(e.to_string()))?;
426    let host = parsed
427        .host_str()
428        .ok_or_else(|| CamelError::RouteError("missing host".to_string()))?
429        .to_string();
430    let port = parsed
431        .port()
432        .ok_or_else(|| CamelError::RouteError("missing port".to_string()))?;
433    let path = parsed.path().trim_start_matches('/');
434    let (service, method) = path.rsplit_once('/').ok_or_else(|| {
435        CamelError::RouteError("URI path must be package.Service/Method".to_string())
436    })?;
437    let config = parse_grpc_query_params(
438        parsed
439            .query_pairs()
440            .map(|(k, v)| (k.to_string(), v.to_string())),
441    )?;
442    if let Some(ref proto) = config.proto_file
443        && (proto.starts_with('/') || proto.contains(".."))
444    {
445        return Err(CamelError::RouteError(format!(
446            "proto path '{}' must be relative and cannot contain '..'",
447            proto
448        )));
449    }
450    if config.reflection {
451        tracing::warn!("gRPC reflection is not supported in v1 — parameter ignored");
452    }
453    if config.tls {
454        tracing::warn!("gRPC TLS is not supported in v1 — parameter ignored");
455    }
456    Ok((host, port, service.to_string(), method.to_string(), config))
457}
458
459/// Apply config-level metadata to a tonic request (GRPC-004).
460///
461/// Parses `config.metadata` as `key1=value1,key2=value2` and injects
462/// each entry into the request's metadata map.
463pub fn apply_config_metadata<T>(config: &GrpcConfig, request: &mut tonic::Request<T>) {
464    if let Some(ref metadata_str) = config.metadata {
465        for pair in metadata_str.split(',') {
466            let pair = pair.trim();
467            if let Some((key, value)) = pair.split_once('=') {
468                let key = key.trim();
469                let value = value.trim();
470                if let Ok(name) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes())
471                    && let Ok(meta_val) = tonic::metadata::MetadataValue::try_from(value)
472                {
473                    request.metadata_mut().insert(name, meta_val);
474                    tracing::debug!(key = key, "applied config metadata to gRPC request");
475                }
476            }
477        }
478    }
479}
480
481/// Apply auth headers from `AuthConfig` to a tonic request (GRPC-007).
482///
483/// Returns `Err` if auth is configured but cannot be applied (e.g. OAuth2
484/// token acquisition fails). Fail-closed: callers that configured auth
485/// expect authenticated requests.
486pub async fn apply_auth_metadata<T>(
487    auth: &AuthConfig,
488    request: &mut tonic::Request<T>,
489) -> Result<(), camel_api::CamelError> {
490    match auth {
491        AuthConfig::Bearer { token } => {
492            if let Ok(name) = tonic::metadata::MetadataKey::from_bytes("authorization".as_bytes()) {
493                let value = format!("Bearer {token}"); // allow-secret
494                if let Ok(meta_val) = tonic::metadata::MetadataValue::try_from(value.as_str()) {
495                    request.metadata_mut().insert(name, meta_val);
496                    tracing::debug!("applied bearer auth to gRPC request");
497                } else {
498                    return Err(camel_api::CamelError::ProcessorError(
499                        "bearer token contains invalid characters".into(),
500                    ));
501                }
502            }
503        }
504        AuthConfig::OAuth2 { token_provider } => {
505            let token = token_provider.get_token().await.map_err(|e| {
506                let message = format!("failed to acquire OAuth2 token for gRPC producer: {e}"); // allow-secret
507                camel_api::CamelError::ProcessorError(message)
508            })?;
509            if let Ok(name) = tonic::metadata::MetadataKey::from_bytes("authorization".as_bytes()) {
510                let value = format!("Bearer {token}"); // allow-secret
511                if let Ok(meta_val) = tonic::metadata::MetadataValue::try_from(value.as_str()) {
512                    request.metadata_mut().insert(name, meta_val);
513                }
514            }
515        }
516        AuthConfig::None | AuthConfig::GoogleServiceAccount { .. } => {}
517    }
518    Ok(())
519}
520
521#[cfg(test)]
522mod tests {
523    use super::*;
524
525    #[test]
526    fn test_parse_grpc_uri_valid() {
527        let uri = "grpc://localhost:50051/com.example.MyService/MyMethod";
528        let (host, port, service, method, config) = parse_grpc_uri(uri).unwrap();
529        assert_eq!(host, "localhost");
530        assert_eq!(port, 50051);
531        assert_eq!(service, "com.example.MyService");
532        assert_eq!(method, "MyMethod");
533        assert_eq!(config.max_receive_message_length, 4 * 1024 * 1024);
534        assert!(!config.reflection);
535        assert!(!config.tls);
536    }
537
538    #[test]
539    fn test_parse_grpc_uri_bool_query_params_case_insensitive() {
540        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=true";
541        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
542        assert!(config.reflection);
543
544        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=TRUE";
545        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
546        assert!(config.reflection);
547
548        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=1";
549        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
550        assert!(config.reflection);
551
552        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=yes";
553        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
554        assert!(config.reflection);
555
556        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=false";
557        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
558        assert!(!config.reflection);
559
560        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=0";
561        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
562        assert!(!config.reflection);
563
564        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=no";
565        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
566        assert!(!config.reflection);
567    }
568
569    #[test]
570    fn test_parse_grpc_uri_bool_query_params_invalid() {
571        let uri = "grpc://localhost:50051/pkg.Svc/Method?reflection=maybe";
572        let result = parse_grpc_uri(uri);
573        assert!(result.is_err());
574        assert!(
575            result
576                .unwrap_err()
577                .to_string()
578                .contains("invalid bool value")
579        );
580    }
581
582    #[test]
583    fn test_parse_grpc_uri_with_proto_file() {
584        let uri = "grpc://localhost:50051/pkg.Svc/Method?protoFile=my.proto";
585        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
586        assert_eq!(config.proto_file, Some("my.proto".to_string()));
587    }
588
589    #[test]
590    fn test_parse_grpc_uri_numeric_query_params_work() {
591        let uri = "grpc://localhost:50051/pkg.Svc/Method?max_receive_message_length=8388608";
592        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
593        assert_eq!(config.max_receive_message_length, 8388608);
594
595        let uri = "grpc://localhost:50051/pkg.Svc/Method?deadline_ms=5000";
596        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
597        assert_eq!(config.deadline_ms, Some(5000));
598    }
599
600    #[test]
601    fn test_parse_grpc_uri_numeric_query_params_invalid() {
602        let uri = "grpc://localhost:50051/pkg.Svc/Method?deadline_ms=notanumber";
603        let result = parse_grpc_uri(uri);
604        assert!(result.is_err());
605        assert!(
606            result
607                .unwrap_err()
608                .to_string()
609                .contains("invalid numeric value")
610        );
611    }
612
613    #[test]
614    fn test_parse_grpc_uri_with_metadata() {
615        let uri = "grpc://localhost:50051/pkg.Svc/Method?metadata=some-value";
616        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
617        assert_eq!(config.metadata, Some("some-value".to_string()));
618    }
619
620    #[test]
621    fn test_parse_grpc_uri_invalid_uri() {
622        let result = parse_grpc_uri("not-a-valid-uri");
623        assert!(result.is_err());
624        assert!(result.unwrap_err().to_string().contains("relative URL"));
625    }
626
627    #[test]
628    fn test_parse_grpc_uri_missing_host() {
629        let result = parse_grpc_uri("grpc:/pkg.Svc/Method");
630        assert!(result.is_err());
631        let err = result.unwrap_err().to_string();
632        assert!(err.contains("missing host") || err.contains("empty host"));
633    }
634
635    #[test]
636    fn test_parse_grpc_uri_missing_port() {
637        let result = parse_grpc_uri("grpc://localhost/pkg.Svc/Method");
638        assert!(result.is_err());
639        assert!(result.unwrap_err().to_string().contains("missing port"));
640    }
641
642    #[test]
643    fn test_parse_grpc_uri_missing_method_separator() {
644        let result = parse_grpc_uri("grpc://localhost:50051/NoSlashHere");
645        assert!(result.is_err());
646        assert!(
647            result
648                .unwrap_err()
649                .to_string()
650                .contains("package.Service/Method")
651        );
652    }
653
654    #[test]
655    fn test_parse_grpc_uri_proto_absolute_path_rejected() {
656        let uri = "grpc://localhost:50051/pkg.Svc/Method?protoFile=/etc/passwd";
657        let result = parse_grpc_uri(uri);
658        assert!(result.is_err());
659        assert!(result.unwrap_err().to_string().contains("proto path"));
660    }
661
662    #[test]
663    fn test_parse_grpc_uri_proto_traversal_rejected() {
664        let uri = "grpc://localhost:50051/pkg.Svc/Method?protoFile=../secret.proto";
665        let result = parse_grpc_uri(uri);
666        assert!(result.is_err());
667        assert!(result.unwrap_err().to_string().contains(".."));
668    }
669
670    #[test]
671    fn test_parse_grpc_uri_tls_bool_param() {
672        let uri = "grpc://localhost:50051/pkg.Svc/Method?tls=true";
673        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
674        assert!(config.tls);
675    }
676
677    #[test]
678    fn test_grpc_config_defaults_via_deserialize() {
679        let config: GrpcConfig = serde_json::from_value(serde_json::json!({})).unwrap();
680        assert_eq!(config.max_receive_message_length, 4 * 1024 * 1024);
681        assert!(!config.reflection);
682        assert!(!config.tls);
683        assert!(config.proto_file.is_none());
684        assert!(config.service.is_none());
685        assert!(config.method.is_none());
686        assert!(config.deadline_ms.is_none());
687        assert!(config.metadata.is_none());
688    }
689
690    #[test]
691    fn test_grpc_config_deserialize_all_fields() {
692        let config: GrpcConfig = serde_json::from_value(serde_json::json!({
693            "protoFile": "test.proto",
694            "service": "MyService",
695            "method": "MyMethod",
696            "reflection": true,
697            "tls": true,
698            "max_receive_message_length": 1024,
699            "deadline_ms": 3000,
700            "metadata": "auth-token"
701        }))
702        .unwrap();
703        assert_eq!(config.proto_file, Some("test.proto".to_string()));
704        assert_eq!(config.service, Some("MyService".to_string()));
705        assert_eq!(config.method, Some("MyMethod".to_string()));
706        assert!(config.reflection);
707        assert!(config.tls);
708        assert_eq!(config.max_receive_message_length, 1024);
709        assert_eq!(config.deadline_ms, Some(3000));
710        assert_eq!(config.metadata, Some("auth-token".to_string()));
711    }
712
713    #[test]
714    fn test_grpc_config_clone_and_debug() {
715        let config: GrpcConfig = serde_json::from_value(serde_json::json!({
716            "protoFile": "test.proto"
717        }))
718        .unwrap();
719        let cloned = config.clone();
720        assert_eq!(config.proto_file, cloned.proto_file);
721        let debug_str = format!("{config:?}");
722        assert!(debug_str.contains("GrpcConfig"));
723    }
724
725    // ── GrpcServerConfig tests ─────────────────────────────────────────────
726
727    #[test]
728    fn test_server_config_default() {
729        let cfg = GrpcServerConfig::default();
730        assert!(cfg.max_receive_message_len.is_none());
731    }
732
733    #[test]
734    fn test_server_config_max_receive_message_len_applied() {
735        let cfg = GrpcServerConfig {
736            max_receive_message_len: Some(4096),
737        };
738        assert_eq!(cfg.max_receive_message_len, Some(4096));
739    }
740
741    #[test]
742    fn test_server_config_clone_and_debug() {
743        let cfg = GrpcServerConfig {
744            max_receive_message_len: Some(8192),
745        };
746        let cloned = cfg.clone();
747        assert_eq!(cfg.max_receive_message_len, cloned.max_receive_message_len);
748        let debug_str = format!("{cfg:?}");
749        assert!(debug_str.contains("GrpcServerConfig"));
750    }
751
752    // ── parse_bool_param tests ─────────────────────────────────────────────
753
754    #[test]
755    fn test_bool_param_case_insensitive() {
756        assert!(parse_bool_param("True").unwrap());
757        assert!(!parse_bool_param("FALSE").unwrap());
758        assert!(parse_bool_param("1").unwrap());
759        assert!(!parse_bool_param("0").unwrap());
760        assert!(parse_bool_param("yes").unwrap());
761        assert!(!parse_bool_param("no").unwrap());
762        assert!(parse_bool_param("YES").unwrap());
763        assert!(!parse_bool_param("NO").unwrap());
764    }
765
766    #[test]
767    fn test_bool_param_invalid_values() {
768        assert!(parse_bool_param("maybe").is_err());
769        assert!(parse_bool_param("").is_err());
770        assert!(parse_bool_param("2").is_err());
771        assert!(parse_bool_param("-1").is_err());
772    }
773
774    // ── GRPC-004: apply_config_metadata tests ──────────────────────────────
775
776    #[test]
777    fn test_apply_config_metadata_single_pair() {
778        let config: GrpcConfig = serde_json::from_value(serde_json::json!({
779            "metadata": "x-custom=hello"
780        }))
781        .unwrap();
782        let mut request = tonic::Request::new(());
783        apply_config_metadata(&config, &mut request);
784        assert_eq!(
785            request
786                .metadata()
787                .get("x-custom")
788                .unwrap()
789                .to_str()
790                .unwrap(),
791            "hello"
792        );
793    }
794
795    #[test]
796    fn test_apply_config_metadata_multiple_pairs() {
797        let config: GrpcConfig = serde_json::from_value(serde_json::json!({
798            "metadata": "x-a=1, x-b=2"
799        }))
800        .unwrap();
801        let mut request = tonic::Request::new(());
802        apply_config_metadata(&config, &mut request);
803        assert_eq!(
804            request.metadata().get("x-a").unwrap().to_str().unwrap(),
805            "1"
806        );
807        assert_eq!(
808            request.metadata().get("x-b").unwrap().to_str().unwrap(),
809            "2"
810        );
811    }
812
813    #[test]
814    fn test_apply_config_metadata_empty_metadata() {
815        let config: GrpcConfig =
816            serde_json::from_value(serde_json::json!({"metadata": ""})).unwrap();
817        let mut request = tonic::Request::new(());
818        apply_config_metadata(&config, &mut request);
819        assert!(request.metadata().is_empty());
820    }
821
822    // ── GRPC-006: TlsConfig tests ──────────────────────────────────────────
823
824    #[test]
825    fn test_tls_config_default() {
826        let tls = TlsConfig::default();
827        assert!(!tls.tls_enabled);
828        assert!(tls.ca_cert_path.is_none());
829        assert!(tls.client_cert_path.is_none());
830        assert!(tls.client_key_path.is_none());
831        assert!(!tls.insecure_skip_verify);
832    }
833
834    #[test]
835    fn test_tls_config_deserialize_enabled() {
836        let tls: TlsConfig = serde_json::from_value(serde_json::json!({
837            "tls_enabled": true,
838            "ca_cert_path": "/path/to/ca.pem",
839            "client_cert_path": "/path/to/client.pem",
840            "client_key_path": "/path/to/client.key",
841            "insecure_skip_verify": true
842        }))
843        .unwrap();
844        assert!(tls.tls_enabled);
845        assert_eq!(tls.ca_cert_path, Some("/path/to/ca.pem".to_string()));
846        assert_eq!(
847            tls.client_cert_path,
848            Some("/path/to/client.pem".to_string())
849        );
850        assert_eq!(tls.client_key_path, Some("/path/to/client.key".to_string()));
851        assert!(tls.insecure_skip_verify);
852    }
853
854    #[test]
855    fn test_tls_config_clone_and_debug() {
856        let tls = TlsConfig {
857            tls_enabled: true,
858            ca_cert_path: Some("/ca.pem".to_string()),
859            client_cert_path: None,
860            client_key_path: None,
861            insecure_skip_verify: false,
862        };
863        let cloned = tls.clone();
864        assert_eq!(tls.tls_enabled, cloned.tls_enabled);
865        let debug_str = format!("{tls:?}");
866        assert!(debug_str.contains("TlsConfig"));
867    }
868
869    // ── GRPC-007: AuthConfig tests ─────────────────────────────────────────
870
871    #[test]
872    fn test_auth_config_default_is_none() {
873        let auth = AuthConfig::default();
874        assert!(matches!(auth, AuthConfig::None));
875    }
876
877    #[tokio::test]
878    async fn test_auth_config_bearer_applies_metadata() {
879        let auth = AuthConfig::Bearer {
880            token: "my-secret-token".to_string(),
881        };
882        let mut request = tonic::Request::new(());
883        apply_auth_metadata(&auth, &mut request).await.unwrap();
884        let val = request
885            .metadata()
886            .get("authorization")
887            .unwrap()
888            .to_str()
889            .unwrap();
890        assert_eq!(val, "Bearer my-secret-token");
891    }
892
893    #[tokio::test]
894    async fn test_auth_config_none_no_metadata() {
895        let auth = AuthConfig::None;
896        let mut request = tonic::Request::new(());
897        apply_auth_metadata(&auth, &mut request).await.unwrap();
898        assert!(request.metadata().get("authorization").is_none());
899    }
900
901    #[tokio::test]
902    async fn test_auth_config_google_scaffold_no_metadata() {
903        let auth = AuthConfig::GoogleServiceAccount {
904            json_path: "/path/to/sa.json".to_string(),
905        };
906        let mut request = tonic::Request::new(());
907        apply_auth_metadata(&auth, &mut request).await.unwrap();
908        assert!(request.metadata().get("authorization").is_none());
909    }
910
911    #[tokio::test]
912    async fn test_auth_config_oauth2_sets_bearer() {
913        #[derive(Debug)]
914        struct MockProvider;
915        #[async_trait::async_trait]
916        impl camel_auth::TokenProvider for MockProvider {
917            async fn get_token(&self) -> Result<String, camel_auth::AuthError> {
918                Ok("mock-oauth2-token".to_string())
919            }
920        }
921        let auth = AuthConfig::OAuth2 {
922            token_provider: std::sync::Arc::new(MockProvider),
923        };
924        let mut request = tonic::Request::new(());
925        apply_auth_metadata(&auth, &mut request).await.unwrap();
926        let auth_header = request.metadata().get("authorization").unwrap();
927        assert_eq!(auth_header, "Bearer mock-oauth2-token");
928    }
929
930    #[tokio::test]
931    async fn test_auth_config_oauth2_failure_returns_error() {
932        #[derive(Debug)]
933        struct FailingProvider;
934        #[async_trait::async_trait]
935        impl camel_auth::TokenProvider for FailingProvider {
936            async fn get_token(&self) -> Result<String, camel_auth::AuthError> {
937                Err(camel_auth::AuthError::ProviderUnavailable(
938                    "mock failure".into(),
939                ))
940            }
941        }
942        let auth = AuthConfig::OAuth2 {
943            token_provider: std::sync::Arc::new(FailingProvider),
944        };
945        let mut request = tonic::Request::new(());
946        let result = apply_auth_metadata(&auth, &mut request).await;
947        assert!(result.is_err());
948        assert!(request.metadata().get("authorization").is_none());
949    }
950
951    // ── GRPC-008: InterceptorConfig tests ──────────────────────────────────
952
953    #[test]
954    fn test_interceptor_config_default_empty() {
955        let ic = InterceptorConfig::default();
956        assert!(ic.interceptors.is_empty());
957    }
958
959    #[test]
960    fn test_interceptor_config_deserialize() {
961        let ic: InterceptorConfig = serde_json::from_value(serde_json::json!({
962            "interceptors": ["logging", "auth"]
963        }))
964        .unwrap();
965        assert_eq!(ic.interceptors.len(), 2);
966        assert_eq!(ic.interceptors[0], "logging");
967        assert_eq!(ic.interceptors[1], "auth");
968    }
969
970    // ── GRPC-009: ConsumerStrategy tests ───────────────────────────────────
971
972    #[test]
973    fn test_consumer_strategy_default() {
974        assert_eq!(ConsumerStrategy::default(), ConsumerStrategy::RoundRobin);
975    }
976
977    #[test]
978    fn test_consumer_strategy_from_str() {
979        assert_eq!(
980            ConsumerStrategy::from_str("roundRobin").unwrap(),
981            ConsumerStrategy::RoundRobin
982        );
983        assert_eq!(
984            ConsumerStrategy::from_str("first").unwrap(),
985            ConsumerStrategy::First
986        );
987        assert_eq!(
988            ConsumerStrategy::from_str("last").unwrap(),
989            ConsumerStrategy::Last
990        );
991    }
992
993    #[test]
994    fn test_consumer_strategy_display() {
995        assert_eq!(ConsumerStrategy::RoundRobin.to_string(), "roundRobin");
996        assert_eq!(ConsumerStrategy::First.to_string(), "first");
997        assert_eq!(ConsumerStrategy::Last.to_string(), "last");
998    }
999
1000    #[test]
1001    fn test_consumer_strategy_invalid() {
1002        assert!(ConsumerStrategy::from_str("invalid").is_err());
1003    }
1004
1005    // ── GRPC-009: ProducerStrategy tests ───────────────────────────────────
1006
1007    #[test]
1008    fn test_producer_strategy_default() {
1009        assert_eq!(ProducerStrategy::default(), ProducerStrategy::RequestReply);
1010    }
1011
1012    #[test]
1013    fn test_producer_strategy_from_str() {
1014        assert_eq!(
1015            ProducerStrategy::from_str("fireAndForget").unwrap(),
1016            ProducerStrategy::FireAndForget
1017        );
1018        assert_eq!(
1019            ProducerStrategy::from_str("requestReply").unwrap(),
1020            ProducerStrategy::RequestReply
1021        );
1022    }
1023
1024    #[test]
1025    fn test_producer_strategy_display() {
1026        assert_eq!(ProducerStrategy::FireAndForget.to_string(), "fireAndForget");
1027        assert_eq!(ProducerStrategy::RequestReply.to_string(), "requestReply");
1028    }
1029
1030    #[test]
1031    fn test_producer_strategy_invalid() {
1032        assert!(ProducerStrategy::from_str("invalid").is_err());
1033    }
1034
1035    // ── GRPC-013: Debug redaction tests ────────────────────────────────────
1036
1037    #[test]
1038    fn test_grpc_config_debug_redacts_metadata() {
1039        let config: GrpcConfig = serde_json::from_value(serde_json::json!({
1040            "metadata": "secret-key=value"
1041        }))
1042        .unwrap();
1043        let debug_str = format!("{config:?}");
1044        assert!(debug_str.contains("[REDACTED]"));
1045        assert!(!debug_str.contains("secret-key=value"));
1046    }
1047
1048    #[test]
1049    fn test_grpc_config_debug_no_redaction_without_secrets() {
1050        let config: GrpcConfig = serde_json::from_value(serde_json::json!({
1051            "protoFile": "test.proto"
1052        }))
1053        .unwrap();
1054        let debug_str = format!("{config:?}");
1055        assert!(debug_str.contains("test.proto"));
1056    }
1057
1058    #[test]
1059    fn test_auth_config_debug_redacts_bearer_token() {
1060        let auth = AuthConfig::Bearer {
1061            token: "super-secret-token".to_string(),
1062        };
1063        let debug_str = format!("{auth:?}");
1064        assert!(debug_str.contains("[REDACTED]"));
1065        assert!(!debug_str.contains("super-secret-token"));
1066    }
1067
1068    #[test]
1069    fn test_auth_config_debug_redacts_google_json_path() {
1070        let auth = AuthConfig::GoogleServiceAccount {
1071            json_path: "/secret/sa.json".to_string(),
1072        };
1073        let debug_str = format!("{auth:?}");
1074        assert!(debug_str.contains("[REDACTED]"));
1075        assert!(!debug_str.contains("/secret/sa.json"));
1076    }
1077
1078    #[test]
1079    fn test_auth_config_debug_none_is_clean() {
1080        let auth = AuthConfig::None;
1081        let debug_str = format!("{auth:?}");
1082        assert_eq!(debug_str, "None");
1083    }
1084
1085    // ── GRPC-007: Bearer token parsed from URI ─────────────────────────────
1086
1087    #[test]
1088    fn test_parse_grpc_uri_bearer_token() {
1089        let uri = "grpc://localhost:50051/pkg.Svc/Method?bearerToken=my-token";
1090        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
1091        match config.auth {
1092            AuthConfig::Bearer { ref token } => assert_eq!(token, "my-token"),
1093            _ => panic!("expected Bearer auth"),
1094        }
1095    }
1096
1097    // ── retry: NetworkRetryPolicy tests ────────────────────────────────────
1098
1099    #[test]
1100    fn grpc_config_has_retry_policy_toml() {
1101        let toml_str = r#"
1102            protoFile = "helloworld.proto"
1103            service = "Greeter"
1104            method = "SayHello"
1105            [retry]
1106            max_attempts = 3
1107            initial_delay_ms = 500
1108        "#;
1109        let cfg: GrpcConfig = toml::from_str(toml_str).expect("parse");
1110        assert_eq!(cfg.retry.max_attempts, 3);
1111        assert_eq!(
1112            cfg.retry.initial_delay,
1113            std::time::Duration::from_millis(500)
1114        );
1115    }
1116
1117    #[test]
1118    fn grpc_config_retry_defaults_when_not_specified() {
1119        let toml_str = r#"
1120            protoFile = "helloworld.proto"
1121        "#;
1122        let cfg: GrpcConfig = toml::from_str(toml_str).expect("parse");
1123        assert_eq!(
1124            cfg.retry,
1125            camel_component_api::NetworkRetryPolicy::default()
1126        );
1127    }
1128
1129    // ── GRPC-009: Strategy parsed from URI ─────────────────────────────────
1130
1131    #[test]
1132    fn test_parse_grpc_uri_consumer_strategy() {
1133        let uri = "grpc://localhost:50051/pkg.Svc/Method?consumerStrategy=first";
1134        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
1135        assert_eq!(config.consumer_strategy, ConsumerStrategy::First);
1136    }
1137
1138    #[test]
1139    fn test_parse_grpc_uri_producer_strategy() {
1140        let uri = "grpc://localhost:50051/pkg.Svc/Method?producerStrategy=fireAndForget";
1141        let (_, _, _, _, config) = parse_grpc_uri(uri).unwrap();
1142        assert_eq!(config.producer_strategy, ProducerStrategy::FireAndForget);
1143    }
1144}