Skip to main content

open_feature_flagd/resolver/
rpc.rs

1//! # RPC Flag Resolver
2//!
3//! Evaluates feature flags using gRPC protocol with the flagd service.
4//!
5//! ## Features
6//!
7//! * High-performance gRPC-based flag evaluation
8//! * Bi-directional streaming support
9//! * Event-driven configuration updates
10//! * Type-safe evaluation
11//! * Structured error handling
12//! * Comprehensive logging
13//!
14//! ## Supported Types
15//!
16//! * Boolean flags
17//! * String flags
18//! * Integer flags
19//! * Float flags
20//! * Structured flags
21//!
22//! ## Example
23//!
24//! ```rust,no_run
25//! use open_feature_flagd::resolver::rpc::RpcResolver;
26//! use open_feature_flagd::FlagdOptions;
27//! use open_feature::provider::FeatureProvider;
28//! use open_feature::EvaluationContext;
29//!
30//! #[tokio::main]
31//! async fn main() {
32//!     let options = FlagdOptions {
33//!         host: "localhost".to_string(),
34//!         port: 8013,
35//!         deadline_ms: 500,
36//!         ..Default::default()
37//!     };
38//!     let resolver = RpcResolver::new(&options).await.unwrap();
39//!     let context = EvaluationContext::default();
40//!
41//!     let result = resolver.resolve_bool_value("my-flag", &context).await.unwrap();
42//!     println!("Flag value: {}", result.value);
43//! }
44//! ```
45
46#[allow(unused_imports)]
47use crate::flagd::evaluation::v1::EventStreamRequest;
48use crate::flagd::evaluation::v1::{
49    ResolveBooleanRequest, ResolveBooleanResponse, ResolveFloatRequest, ResolveFloatResponse,
50    ResolveIntRequest, ResolveIntResponse, ResolveObjectRequest, ResolveObjectResponse,
51    ResolveStringRequest, ResolveStringResponse, service_client::ServiceClient,
52};
53use crate::{FlagdOptions, convert_context, convert_proto_struct_to_struct_value};
54use async_trait::async_trait;
55use hyper_util::rt::TokioIo;
56use open_feature::provider::{FeatureProvider, ProviderMetadata, ResolutionDetails};
57use open_feature::{
58    EvaluationContext, EvaluationError, EvaluationErrorCode, EvaluationReason, FlagMetadata,
59    FlagMetadataValue, StructValue,
60};
61use std::collections::HashMap;
62use std::sync::OnceLock;
63use std::time::Duration;
64use tokio::net::UnixStream;
65use tokio::time::sleep;
66use tonic::transport::{Channel, Endpoint, Uri};
67use tower::service_fn;
68use tracing::{debug, error, instrument, warn};
69
70use super::common::upstream::UpstreamConfig;
71
72type ClientType = ServiceClient<Channel>;
73
74fn convert_proto_metadata(metadata: prost_types::Struct) -> FlagMetadata {
75    let mut values = HashMap::new();
76    for (k, v) in metadata.fields {
77        let metadata_value = match v.kind.unwrap() {
78            prost_types::value::Kind::BoolValue(b) => FlagMetadataValue::Bool(b),
79            prost_types::value::Kind::NumberValue(n) => FlagMetadataValue::Float(n),
80            prost_types::value::Kind::StringValue(s) => FlagMetadataValue::String(s),
81            _ => FlagMetadataValue::String("unsupported".to_string()),
82        };
83        values.insert(k, metadata_value);
84    }
85    FlagMetadata { values }
86}
87
88/// Maps gRPC status codes to OpenFeature error codes
89///
90/// This ensures consistent error handling across different resolver types
91/// and proper conformance with the OpenFeature specification.
92fn map_grpc_status_to_error_code(status: &tonic::Status) -> EvaluationErrorCode {
93    use tonic::Code;
94    match status.code() {
95        Code::NotFound => EvaluationErrorCode::FlagNotFound,
96        Code::InvalidArgument => EvaluationErrorCode::InvalidContext,
97        Code::Unauthenticated | Code::PermissionDenied => {
98            EvaluationErrorCode::General("authentication/authorization error".to_string())
99        }
100        Code::FailedPrecondition => EvaluationErrorCode::TypeMismatch,
101        Code::DeadlineExceeded | Code::Cancelled => {
102            EvaluationErrorCode::General("request timeout or cancelled".to_string())
103        }
104        Code::Unavailable => EvaluationErrorCode::General("service unavailable".to_string()),
105        _ => EvaluationErrorCode::General(format!("{:?}", status.code())),
106    }
107}
108
109pub struct RpcResolver {
110    client: ClientType,
111    metadata: OnceLock<ProviderMetadata>,
112}
113
114impl RpcResolver {
115    #[instrument(skip(options))]
116    pub async fn new(
117        options: &FlagdOptions,
118    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
119        debug!("initializing RPC resolver connection to {}", options.host);
120
121        let mut retry_delay = Duration::from_millis(options.retry_backoff_ms as u64);
122        let mut attempts = 0;
123
124        loop {
125            match RpcResolver::establish_connection(options).await {
126                Ok(client) => {
127                    debug!("Successfully established RPC connection");
128                    return Ok(Self {
129                        client,
130                        metadata: OnceLock::new(),
131                    });
132                }
133                Err(e) => {
134                    attempts += 1;
135                    if attempts >= options.retry_grace_period {
136                        error!("Failed to establish connection after {} attempts", attempts);
137                        return Err(e);
138                    }
139
140                    warn!(
141                        "Connection attempt {} failed, retrying in {}ms: {}",
142                        attempts,
143                        retry_delay.as_millis(),
144                        e
145                    );
146
147                    sleep(retry_delay).await;
148                    retry_delay = Duration::from_millis((retry_delay.as_millis() * 2) as u64)
149                        .min(Duration::from_millis(options.retry_backoff_max_ms as u64));
150                }
151            }
152        }
153    }
154
155    async fn establish_connection(
156        options: &FlagdOptions,
157    ) -> Result<ClientType, Box<dyn std::error::Error + Send + Sync>> {
158        if let Some(socket_path) = &options.socket_path {
159            debug!("Attempting Unix socket connection to: {}", socket_path);
160            let socket_path = socket_path.clone();
161            let channel = Endpoint::try_from("http://[::]:50051")?
162                .connect_with_connector(service_fn(move |_: Uri| {
163                    let path = socket_path.clone();
164                    async move {
165                        let stream = UnixStream::connect(path).await?;
166                        Ok::<_, std::io::Error>(TokioIo::new(stream))
167                    }
168                }))
169                .await?;
170
171            return Ok(ServiceClient::new(channel));
172        }
173
174        let target = options
175            .target_uri
176            .clone()
177            .unwrap_or_else(|| format!("{}:{}", options.host, options.port));
178        let upstream_config =
179            UpstreamConfig::new(target, false, options.tls, options.cert_path.as_deref())?;
180        let mut endpoint = upstream_config.endpoint().clone();
181
182        // Extend support for envoy names resolution
183        if let Some(uri) = &options.target_uri
184            && uri.starts_with("envoy://")
185        {
186            // Expected format: envoy://<host:port>/<desired_authority>
187            let without_prefix = uri.trim_start_matches("envoy://");
188            let segments: Vec<&str> = without_prefix.split('/').collect();
189            if segments.len() >= 2 {
190                let authority_str = segments[1];
191                // Create a full URI from the authority for endpoint.origin()
192                let authority_uri =
193                    std::str::FromStr::from_str(&format!("http://{}", authority_str))?;
194                endpoint = endpoint.origin(authority_uri);
195            }
196        }
197
198        let channel = endpoint
199            .timeout(Duration::from_millis(options.deadline_ms as u64))
200            .connect()
201            .await?;
202
203        Ok(ServiceClient::new(channel))
204    }
205}
206
207#[async_trait]
208impl FeatureProvider for RpcResolver {
209    fn metadata(&self) -> &ProviderMetadata {
210        self.metadata.get_or_init(|| ProviderMetadata::new("flagd"))
211    }
212
213    #[instrument(skip(self, context))]
214    async fn resolve_bool_value(
215        &self,
216        flag_key: &str,
217        context: &EvaluationContext,
218    ) -> Result<ResolutionDetails<bool>, EvaluationError> {
219        debug!(flag_key, "resolving boolean flag");
220        let request = ResolveBooleanRequest {
221            flag_key: flag_key.to_string(),
222            context: convert_context(context),
223        };
224
225        match self.client.clone().resolve_boolean(request).await {
226            Ok(response) => {
227                let inner: ResolveBooleanResponse = response.into_inner();
228                debug!(flag_key, value = inner.value, reason = %inner.reason, "boolean flag resolved");
229                Ok(ResolutionDetails {
230                    value: inner.value,
231                    variant: Some(inner.variant),
232                    reason: Some(EvaluationReason::Other(inner.reason)),
233                    flag_metadata: inner.metadata.map(convert_proto_metadata),
234                })
235            }
236            Err(status) => {
237                error!(flag_key, error = %status, "failed to resolve boolean flag");
238                Err(EvaluationError {
239                    code: map_grpc_status_to_error_code(&status),
240                    message: Some(status.message().to_string()),
241                })
242            }
243        }
244    }
245
246    #[instrument(skip(self, context))]
247    async fn resolve_string_value(
248        &self,
249        flag_key: &str,
250        context: &EvaluationContext,
251    ) -> Result<ResolutionDetails<String>, EvaluationError> {
252        debug!(flag_key, "resolving string flag");
253        let request = ResolveStringRequest {
254            flag_key: flag_key.to_string(),
255            context: convert_context(context),
256        };
257
258        match self.client.clone().resolve_string(request).await {
259            Ok(response) => {
260                let inner: ResolveStringResponse = response.into_inner();
261                debug!(flag_key, value = %inner.value, reason = %inner.reason, "string flag resolved");
262                Ok(ResolutionDetails {
263                    value: inner.value,
264                    variant: Some(inner.variant),
265                    reason: Some(EvaluationReason::Other(inner.reason)),
266                    flag_metadata: inner.metadata.map(convert_proto_metadata),
267                })
268            }
269            Err(status) => {
270                error!(flag_key, error = %status, "failed to resolve string flag");
271                Err(EvaluationError {
272                    code: map_grpc_status_to_error_code(&status),
273                    message: Some(status.message().to_string()),
274                })
275            }
276        }
277    }
278
279    #[instrument(skip(self, context))]
280    async fn resolve_float_value(
281        &self,
282        flag_key: &str,
283        context: &EvaluationContext,
284    ) -> Result<ResolutionDetails<f64>, EvaluationError> {
285        debug!(flag_key, "resolving float flag");
286        let request = ResolveFloatRequest {
287            flag_key: flag_key.to_string(),
288            context: convert_context(context),
289        };
290
291        match self.client.clone().resolve_float(request).await {
292            Ok(response) => {
293                let inner: ResolveFloatResponse = response.into_inner();
294                debug!(flag_key, value = inner.value, reason = %inner.reason, "float flag resolved");
295                Ok(ResolutionDetails {
296                    value: inner.value,
297                    variant: Some(inner.variant),
298                    reason: Some(EvaluationReason::Other(inner.reason)),
299                    flag_metadata: inner.metadata.map(convert_proto_metadata),
300                })
301            }
302            Err(status) => {
303                error!(flag_key, error = %status, "failed to resolve float flag");
304                Err(EvaluationError {
305                    code: map_grpc_status_to_error_code(&status),
306                    message: Some(status.message().to_string()),
307                })
308            }
309        }
310    }
311
312    #[instrument(skip(self, context))]
313    async fn resolve_int_value(
314        &self,
315        flag_key: &str,
316        context: &EvaluationContext,
317    ) -> Result<ResolutionDetails<i64>, EvaluationError> {
318        debug!(flag_key, "resolving integer flag");
319        let request = ResolveIntRequest {
320            flag_key: flag_key.to_string(),
321            context: convert_context(context),
322        };
323
324        match self.client.clone().resolve_int(request).await {
325            Ok(response) => {
326                let inner: ResolveIntResponse = response.into_inner();
327                debug!(flag_key, value = inner.value, reason = %inner.reason, "integer flag resolved");
328                Ok(ResolutionDetails {
329                    value: inner.value,
330                    variant: Some(inner.variant),
331                    reason: Some(EvaluationReason::Other(inner.reason)),
332                    flag_metadata: inner.metadata.map(convert_proto_metadata),
333                })
334            }
335            Err(status) => {
336                error!(flag_key, error = %status, "failed to resolve integer flag");
337                Err(EvaluationError {
338                    code: map_grpc_status_to_error_code(&status),
339                    message: Some(status.message().to_string()),
340                })
341            }
342        }
343    }
344
345    #[instrument(skip(self, context))]
346    async fn resolve_struct_value(
347        &self,
348        flag_key: &str,
349        context: &EvaluationContext,
350    ) -> Result<ResolutionDetails<StructValue>, EvaluationError> {
351        debug!(flag_key, "resolving struct flag");
352        let request = ResolveObjectRequest {
353            flag_key: flag_key.to_string(),
354            context: convert_context(context),
355        };
356
357        match self.client.clone().resolve_object(request).await {
358            Ok(response) => {
359                let inner: ResolveObjectResponse = response.into_inner();
360                debug!(flag_key, reason = %inner.reason, "struct flag resolved");
361                Ok(ResolutionDetails {
362                    value: convert_proto_struct_to_struct_value(inner.value.unwrap_or_default()),
363                    variant: Some(inner.variant),
364                    reason: Some(EvaluationReason::Other(inner.reason)),
365                    flag_metadata: inner.metadata.map(convert_proto_metadata),
366                })
367            }
368            Err(status) => {
369                error!(flag_key, error = %status, "failed to resolve struct flag");
370                Err(EvaluationError {
371                    code: map_grpc_status_to_error_code(&status),
372                    message: Some(status.message().to_string()),
373                })
374            }
375        }
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382    use crate::flagd::evaluation::v1::{
383        EventStreamResponse, ResolveAllRequest, ResolveAllResponse,
384        service_server::{Service, ServiceServer},
385    };
386    use futures_core::Stream;
387    use serial_test::serial;
388    use std::{collections::BTreeMap, pin::Pin};
389    use tempfile::TempDir;
390    use test_log::test;
391    use tokio::net::UnixListener;
392    use tokio::sync::oneshot;
393    use tokio::{net::TcpListener, time::Instant};
394    use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
395    use tonic::{Request, Response, Status, transport::Server};
396
397    pub struct MockFlagService;
398
399    #[tonic::async_trait]
400    impl Service for MockFlagService {
401        async fn resolve_boolean(
402            &self,
403            _request: Request<ResolveBooleanRequest>,
404        ) -> Result<Response<ResolveBooleanResponse>, Status> {
405            Ok(Response::new(ResolveBooleanResponse {
406                value: true,
407                reason: "test".to_string(),
408                variant: "test".to_string(),
409                metadata: Some(create_test_metadata()),
410            }))
411        }
412
413        async fn resolve_string(
414            &self,
415            _request: Request<ResolveStringRequest>,
416        ) -> Result<Response<ResolveStringResponse>, Status> {
417            Ok(Response::new(ResolveStringResponse {
418                value: "test".to_string(),
419                reason: "test".to_string(),
420                variant: "test".to_string(),
421                metadata: Some(create_test_metadata()),
422            }))
423        }
424
425        async fn resolve_float(
426            &self,
427            _request: Request<ResolveFloatRequest>,
428        ) -> Result<Response<ResolveFloatResponse>, Status> {
429            Ok(Response::new(ResolveFloatResponse {
430                value: 1.0,
431                reason: "test".to_string(),
432                variant: "test".to_string(),
433                metadata: Some(create_test_metadata()),
434            }))
435        }
436
437        async fn resolve_int(
438            &self,
439            _request: Request<ResolveIntRequest>,
440        ) -> Result<Response<ResolveIntResponse>, Status> {
441            Ok(Response::new(ResolveIntResponse {
442                value: 42,
443                reason: "test".to_string(),
444                variant: "test".to_string(),
445                metadata: Some(create_test_metadata()),
446            }))
447        }
448
449        async fn resolve_object(
450            &self,
451            _request: Request<ResolveObjectRequest>,
452        ) -> Result<Response<ResolveObjectResponse>, Status> {
453            let mut fields = BTreeMap::new();
454            fields.insert(
455                "key".to_string(),
456                prost_types::Value {
457                    kind: Some(prost_types::value::Kind::StringValue("value".to_string())),
458                },
459            );
460
461            Ok(Response::new(ResolveObjectResponse {
462                value: Some(prost_types::Struct { fields }),
463                reason: "test".to_string(),
464                variant: "test".to_string(),
465                metadata: Some(create_test_metadata()),
466            }))
467        }
468
469        async fn resolve_all(
470            &self,
471            _request: Request<ResolveAllRequest>,
472        ) -> Result<Response<ResolveAllResponse>, Status> {
473            Ok(Response::new(ResolveAllResponse {
474                flags: Default::default(),
475                metadata: Some(create_test_metadata()),
476            }))
477        }
478
479        type EventStreamStream =
480            Pin<Box<dyn Stream<Item = Result<EventStreamResponse, Status>> + Send + 'static>>;
481
482        async fn event_stream(
483            &self,
484            _request: Request<EventStreamRequest>,
485        ) -> Result<Response<Self::EventStreamStream>, Status> {
486            let output = tokio_stream::empty();
487            Ok(Response::new(Box::pin(output)))
488        }
489    }
490
491    fn create_test_metadata() -> prost_types::Struct {
492        let mut fields = BTreeMap::new();
493        fields.insert(
494            "bool_key".to_string(),
495            prost_types::Value {
496                kind: Some(prost_types::value::Kind::BoolValue(true)),
497            },
498        );
499        fields.insert(
500            "number_key".to_string(),
501            prost_types::Value {
502                kind: Some(prost_types::value::Kind::NumberValue(42.0)),
503            },
504        );
505        fields.insert(
506            "string_key".to_string(),
507            prost_types::Value {
508                kind: Some(prost_types::value::Kind::StringValue("test".to_string())),
509            },
510        );
511        prost_types::Struct { fields }
512    }
513
514    struct TestServer {
515        target: String,
516        _shutdown: oneshot::Sender<()>,
517    }
518
519    impl TestServer {
520        async fn new() -> Self {
521            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
522            let addr = listener.local_addr().unwrap();
523            let (tx, rx) = oneshot::channel();
524
525            let server = tonic::transport::Server::builder()
526                .add_service(ServiceServer::new(MockFlagService))
527                .serve_with_incoming(TcpListenerStream::new(listener));
528
529            tokio::spawn(async move {
530                tokio::select! {
531                    _ = server => {},
532                    _ = rx => {},
533                }
534            });
535
536            Self {
537                target: format!("{}:{}", addr.ip(), addr.port()),
538                _shutdown: tx,
539            }
540        }
541    }
542
543    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
544    async fn test_dns_resolution() {
545        let server = TestServer::new().await;
546        // Add delay to ensure server is ready
547        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
548        let options = FlagdOptions {
549            host: server.target.clone(),
550            port: 8013,
551            target_uri: None,
552            deadline_ms: 500,
553            ..Default::default()
554        };
555        let resolver = RpcResolver::new(&options).await.unwrap();
556        let context = EvaluationContext::default().with_targeting_key("test-user");
557
558        let result = resolver
559            .resolve_bool_value("test-flag", &context)
560            .await
561            .unwrap();
562        assert_eq!(result.value, true);
563    }
564
565    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
566    async fn test_envoy_resolution() {
567        let server = TestServer::new().await;
568        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
569
570        let options = FlagdOptions {
571            host: server.target.clone(),
572            port: 8013,
573            target_uri: Some(format!("envoy://{}/flagd-service", server.target)),
574            deadline_ms: 500,
575            ..Default::default()
576        };
577
578        let resolver = RpcResolver::new(&options).await.unwrap();
579        let context = EvaluationContext::default().with_targeting_key("test-user");
580
581        let result = resolver
582            .resolve_bool_value("test-flag", &context)
583            .await
584            .unwrap();
585        assert_eq!(result.value, true);
586    }
587
588    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
589    async fn test_value_resolution() {
590        let server = TestServer::new().await;
591        let options = FlagdOptions {
592            host: server.target.clone(),
593            port: 8013,
594            target_uri: None,
595            deadline_ms: 500,
596            ..Default::default()
597        };
598        let resolver = RpcResolver::new(&options).await.unwrap();
599        let context = EvaluationContext::default().with_targeting_key("test-user");
600
601        // Test all value types
602        assert_eq!(
603            resolver
604                .resolve_bool_value("test-flag", &context)
605                .await
606                .unwrap()
607                .value,
608            true
609        );
610        assert_eq!(
611            resolver
612                .resolve_string_value("test-flag", &context)
613                .await
614                .unwrap()
615                .value,
616            "test"
617        );
618        assert_eq!(
619            resolver
620                .resolve_float_value("test-flag", &context)
621                .await
622                .unwrap()
623                .value,
624            1.0
625        );
626        assert_eq!(
627            resolver
628                .resolve_int_value("test-flag", &context)
629                .await
630                .unwrap()
631                .value,
632            42
633        );
634
635        let struct_result = resolver
636            .resolve_struct_value("test-flag", &context)
637            .await
638            .unwrap();
639        assert!(!struct_result.value.fields.is_empty());
640    }
641
642    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
643    async fn test_metadata() {
644        let metadata = create_test_metadata();
645        let flag_metadata = convert_proto_metadata(metadata);
646
647        assert!(matches!(
648            flag_metadata.values.get("bool_key"),
649            Some(FlagMetadataValue::Bool(true))
650        ));
651        assert!(matches!(
652            flag_metadata.values.get("number_key"),
653            Some(FlagMetadataValue::Float(42.0))
654        ));
655        assert!(matches!(
656            flag_metadata.values.get("string_key"),
657            Some(FlagMetadataValue::String(s)) if s == "test"
658        ));
659    }
660
661    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
662    async fn test_standard_connection() {
663        let server = TestServer::new().await;
664        let parts: Vec<&str> = server.target.split(':').collect();
665        let options = FlagdOptions {
666            host: parts[0].to_string(),
667            port: parts[1].parse().unwrap(),
668            target_uri: None,
669            deadline_ms: 500,
670            ..Default::default()
671        };
672
673        let resolver = RpcResolver::new(&options).await.unwrap();
674        let context = EvaluationContext::default().with_targeting_key("test-user");
675
676        let result = resolver
677            .resolve_bool_value("test-flag", &context)
678            .await
679            .unwrap();
680        assert_eq!(result.value, true);
681    }
682
683    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
684    async fn test_envoy_connection() {
685        let server = TestServer::new().await;
686        let parts: Vec<&str> = server.target.split(':').collect();
687        let options = FlagdOptions {
688            host: parts[0].to_string(),
689            port: parts[1].parse().unwrap(),
690            target_uri: Some(format!("envoy://{}/flagd-service", server.target)),
691            deadline_ms: 500,
692            ..Default::default()
693        };
694
695        let resolver = RpcResolver::new(&options).await.unwrap();
696        let context = EvaluationContext::default().with_targeting_key("test-user");
697
698        let result = resolver
699            .resolve_bool_value("test-flag", &context)
700            .await
701            .unwrap();
702        assert_eq!(result.value, true);
703    }
704
705    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
706    #[serial]
707    async fn test_retry_mechanism() {
708        // Bind to a port but don't accept connections - this causes immediate connection failures
709        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
710        let addr = listener.local_addr().unwrap();
711        // Drop the listener immediately to ensure the port rejects connections
712        drop(listener);
713
714        let options = FlagdOptions {
715            host: addr.ip().to_string(),
716            port: addr.port(),
717            retry_backoff_ms: 100,
718            retry_backoff_max_ms: 400,
719            retry_grace_period: 3,
720            deadline_ms: 100, // Short timeout for fast failures
721            ..Default::default()
722        };
723
724        let start = Instant::now();
725        let result = RpcResolver::new(&options).await;
726        let duration = start.elapsed();
727
728        assert!(result.is_err());
729        // Should take at least 300ms (100ms + 200ms delays)
730        assert!(duration.as_millis() >= 300);
731        // Allow some buffer for system overhead and processing time
732        assert!(duration.as_millis() < 600);
733    }
734
735    #[test(tokio::test)]
736    async fn test_successful_retry() {
737        let server = TestServer::new().await;
738        let options = FlagdOptions {
739            host: server.target.clone(),
740            port: 8013,
741            retry_backoff_ms: 100,
742            retry_backoff_max_ms: 400,
743            retry_grace_period: 3,
744            ..Default::default()
745        };
746
747        let resolver = RpcResolver::new(&options).await.unwrap();
748        let context = EvaluationContext::default();
749
750        let result = resolver
751            .resolve_bool_value("test-flag", &context)
752            .await
753            .unwrap();
754        assert_eq!(result.value, true);
755    }
756
757    #[test(tokio::test)]
758    async fn test_rpc_unix_socket_connection() {
759        let tmp_dir = TempDir::new().unwrap();
760        let socket_path = tmp_dir.path().join("test.sock");
761        let socket_path_str = socket_path.to_str().unwrap().to_string();
762
763        // Start mock gRPC server with proper shutdown handling
764        let server_handle = tokio::spawn(async move {
765            let uds = UnixListener::bind(&socket_path).unwrap();
766            Server::builder()
767                .add_service(ServiceServer::new(MockFlagService))
768                .serve_with_incoming(UnixListenerStream::new(uds))
769                .await
770                .unwrap();
771        });
772
773        // Give server time to start
774        tokio::time::sleep(Duration::from_millis(100)).await;
775
776        let options = FlagdOptions {
777            socket_path: Some(socket_path_str),
778            retry_backoff_ms: 100,
779            retry_backoff_max_ms: 400,
780            retry_grace_period: 3,
781            ..Default::default()
782        };
783
784        let resolver = RpcResolver::new(&options).await;
785        assert!(resolver.is_ok());
786
787        // Clean shutdown
788        server_handle.abort();
789    }
790
791    #[test]
792    fn test_grpc_error_code_mapping() {
793        use tonic::Code;
794
795        // Test NOT_FOUND -> FlagNotFound
796        let status = tonic::Status::new(Code::NotFound, "Flag not found");
797        let error_code = map_grpc_status_to_error_code(&status);
798        assert!(matches!(error_code, EvaluationErrorCode::FlagNotFound));
799
800        // Test INVALID_ARGUMENT -> InvalidContext
801        let status = tonic::Status::new(Code::InvalidArgument, "Invalid context");
802        let error_code = map_grpc_status_to_error_code(&status);
803        assert!(matches!(error_code, EvaluationErrorCode::InvalidContext));
804
805        // Test UNAUTHENTICATED -> General
806        let status = tonic::Status::new(Code::Unauthenticated, "Not authenticated");
807        let error_code = map_grpc_status_to_error_code(&status);
808        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
809
810        // Test PERMISSION_DENIED -> General
811        let status = tonic::Status::new(Code::PermissionDenied, "Access denied");
812        let error_code = map_grpc_status_to_error_code(&status);
813        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
814
815        // Test FAILED_PRECONDITION -> TypeMismatch
816        let status = tonic::Status::new(Code::FailedPrecondition, "Type mismatch");
817        let error_code = map_grpc_status_to_error_code(&status);
818        assert!(matches!(error_code, EvaluationErrorCode::TypeMismatch));
819
820        // Test DEADLINE_EXCEEDED -> General
821        let status = tonic::Status::new(Code::DeadlineExceeded, "Timeout");
822        let error_code = map_grpc_status_to_error_code(&status);
823        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
824
825        // Test UNAVAILABLE -> General
826        let status = tonic::Status::new(Code::Unavailable, "Service unavailable");
827        let error_code = map_grpc_status_to_error_code(&status);
828        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
829    }
830}