open_feature_flagd/resolver/
rpc.rs

1//! # RPC Flag Resolver
2//!
3//! Evaluates feature flags using gRPC protocol with the flagd service.
4//!
5//! ## Features
6//!
7//! * High-performance gRPC-based flag evaluation
8//! * Bi-directional streaming support
9//! * Event-driven configuration updates
10//! * Type-safe evaluation
11//! * Structured error handling
12//! * Comprehensive logging
13//!
14//! ## Supported Types
15//!
16//! * Boolean flags
17//! * String flags
18//! * Integer flags
19//! * Float flags
20//! * Structured flags
21//!
22//! ## Example
23//!
24//! ```rust,no_run
25//! use open_feature_flagd::resolver::rpc::RpcResolver;
26//! use open_feature_flagd::FlagdOptions;
27//! use open_feature::provider::FeatureProvider;
28//! use open_feature::EvaluationContext;
29//!
30//! #[tokio::main]
31//! async fn main() {
32//!     let options = FlagdOptions {
33//!         host: "localhost".to_string(),
34//!         port: 8013,
35//!         deadline_ms: 500,
36//!         ..Default::default()
37//!     };
38//!     let resolver = RpcResolver::new(&options).await.unwrap();
39//!     let context = EvaluationContext::default();
40//!
41//!     let result = resolver.resolve_bool_value("my-flag", &context).await.unwrap();
42//!     println!("Flag value: {}", result.value);
43//! }
44//! ```
45
46#[allow(unused_imports)]
47use crate::flagd::evaluation::v1::EventStreamRequest;
48use crate::flagd::evaluation::v1::{
49    ResolveBooleanRequest, ResolveBooleanResponse, ResolveFloatRequest, ResolveFloatResponse,
50    ResolveIntRequest, ResolveIntResponse, ResolveObjectRequest, ResolveObjectResponse,
51    ResolveStringRequest, ResolveStringResponse, service_client::ServiceClient,
52};
53use crate::{FlagdOptions, convert_context, convert_proto_struct_to_struct_value};
54use async_trait::async_trait;
55use hyper_util::rt::TokioIo;
56use open_feature::provider::{FeatureProvider, ProviderMetadata, ResolutionDetails};
57use open_feature::{
58    EvaluationContext, EvaluationError, EvaluationErrorCode, EvaluationReason, FlagMetadata,
59    FlagMetadataValue, StructValue,
60};
61use std::collections::HashMap;
62use std::sync::OnceLock;
63use std::time::Duration;
64use tokio::net::UnixStream;
65use tokio::time::sleep;
66use tonic::transport::{Channel, Endpoint, Uri};
67use tower::service_fn;
68use tracing::{debug, error, instrument, warn};
69
70use super::common::upstream::UpstreamConfig;
71
72type ClientType = ServiceClient<Channel>;
73
74fn convert_proto_metadata(metadata: prost_types::Struct) -> FlagMetadata {
75    let mut values = HashMap::new();
76    for (k, v) in metadata.fields {
77        let metadata_value = match v.kind.unwrap() {
78            prost_types::value::Kind::BoolValue(b) => FlagMetadataValue::Bool(b),
79            prost_types::value::Kind::NumberValue(n) => FlagMetadataValue::Float(n),
80            prost_types::value::Kind::StringValue(s) => FlagMetadataValue::String(s),
81            _ => FlagMetadataValue::String("unsupported".to_string()),
82        };
83        values.insert(k, metadata_value);
84    }
85    FlagMetadata { values }
86}
87
88/// Maps gRPC status codes to OpenFeature error codes
89///
90/// This ensures consistent error handling across different resolver types
91/// and proper conformance with the OpenFeature specification.
92fn map_grpc_status_to_error_code(status: &tonic::Status) -> EvaluationErrorCode {
93    use tonic::Code;
94    match status.code() {
95        Code::NotFound => EvaluationErrorCode::FlagNotFound,
96        Code::InvalidArgument => EvaluationErrorCode::InvalidContext,
97        Code::Unauthenticated | Code::PermissionDenied => {
98            EvaluationErrorCode::General("authentication/authorization error".to_string())
99        }
100        Code::FailedPrecondition => EvaluationErrorCode::TypeMismatch,
101        Code::DeadlineExceeded | Code::Cancelled => {
102            EvaluationErrorCode::General("request timeout or cancelled".to_string())
103        }
104        Code::Unavailable => EvaluationErrorCode::General("service unavailable".to_string()),
105        _ => EvaluationErrorCode::General(format!("{:?}", status.code())),
106    }
107}
108
109pub struct RpcResolver {
110    client: ClientType,
111    metadata: OnceLock<ProviderMetadata>,
112}
113
114impl RpcResolver {
115    #[instrument(skip(options))]
116    pub async fn new(
117        options: &FlagdOptions,
118    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
119        debug!("initializing RPC resolver connection to {}", options.host);
120
121        let mut retry_delay = Duration::from_millis(options.retry_backoff_ms as u64);
122        let mut attempts = 0;
123
124        loop {
125            match RpcResolver::establish_connection(options).await {
126                Ok(client) => {
127                    debug!("Successfully established RPC connection");
128                    return Ok(Self {
129                        client,
130                        metadata: OnceLock::new(),
131                    });
132                }
133                Err(e) => {
134                    attempts += 1;
135                    if attempts >= options.retry_grace_period {
136                        error!("Failed to establish connection after {} attempts", attempts);
137                        return Err(e);
138                    }
139
140                    warn!(
141                        "Connection attempt {} failed, retrying in {}ms: {}",
142                        attempts,
143                        retry_delay.as_millis(),
144                        e
145                    );
146
147                    sleep(retry_delay).await;
148                    retry_delay = Duration::from_millis((retry_delay.as_millis() * 2) as u64)
149                        .min(Duration::from_millis(options.retry_backoff_max_ms as u64));
150                }
151            }
152        }
153    }
154
155    async fn establish_connection(
156        options: &FlagdOptions,
157    ) -> Result<ClientType, Box<dyn std::error::Error + Send + Sync>> {
158        if let Some(socket_path) = &options.socket_path {
159            debug!("Attempting Unix socket connection to: {}", socket_path);
160            let socket_path = socket_path.clone();
161            let channel = Endpoint::try_from("http://[::]:50051")?
162                .connect_with_connector(service_fn(move |_: Uri| {
163                    let path = socket_path.clone();
164                    async move {
165                        let stream = UnixStream::connect(path).await?;
166                        Ok::<_, std::io::Error>(TokioIo::new(stream))
167                    }
168                }))
169                .await?;
170
171            return Ok(ServiceClient::new(channel));
172        }
173
174        let target = options
175            .target_uri
176            .clone()
177            .unwrap_or_else(|| format!("{}:{}", options.host, options.port));
178        let upstream_config = UpstreamConfig::new(target.replace("http://", ""), false)?;
179        let mut endpoint = upstream_config.endpoint().clone();
180
181        // Extend support for envoy names resolution
182        if let Some(uri) = &options.target_uri
183            && uri.starts_with("envoy://")
184        {
185            // Expected format: envoy://<host:port>/<desired_authority>
186            let without_prefix = uri.trim_start_matches("envoy://");
187            let segments: Vec<&str> = without_prefix.split('/').collect();
188            if segments.len() >= 2 {
189                let authority_str = segments[1];
190                // Create a full URI from the authority for endpoint.origin()
191                let authority_uri =
192                    std::str::FromStr::from_str(&format!("http://{}", authority_str))?;
193                endpoint = endpoint.origin(authority_uri);
194            }
195        }
196
197        let channel = endpoint
198            .timeout(Duration::from_millis(options.deadline_ms as u64))
199            .connect()
200            .await?;
201
202        Ok(ServiceClient::new(channel))
203    }
204}
205
206#[async_trait]
207impl FeatureProvider for RpcResolver {
208    fn metadata(&self) -> &ProviderMetadata {
209        self.metadata.get_or_init(|| ProviderMetadata::new("flagd"))
210    }
211
212    #[instrument(skip(self, context))]
213    async fn resolve_bool_value(
214        &self,
215        flag_key: &str,
216        context: &EvaluationContext,
217    ) -> Result<ResolutionDetails<bool>, EvaluationError> {
218        debug!(flag_key, "resolving boolean flag");
219        let request = ResolveBooleanRequest {
220            flag_key: flag_key.to_string(),
221            context: convert_context(context),
222        };
223
224        match self.client.clone().resolve_boolean(request).await {
225            Ok(response) => {
226                let inner: ResolveBooleanResponse = response.into_inner();
227                debug!(flag_key, value = inner.value, reason = %inner.reason, "boolean flag resolved");
228                Ok(ResolutionDetails {
229                    value: inner.value,
230                    variant: Some(inner.variant),
231                    reason: Some(EvaluationReason::Other(inner.reason)),
232                    flag_metadata: inner.metadata.map(convert_proto_metadata),
233                })
234            }
235            Err(status) => {
236                error!(flag_key, error = %status, "failed to resolve boolean flag");
237                Err(EvaluationError {
238                    code: map_grpc_status_to_error_code(&status),
239                    message: Some(status.message().to_string()),
240                })
241            }
242        }
243    }
244
245    #[instrument(skip(self, context))]
246    async fn resolve_string_value(
247        &self,
248        flag_key: &str,
249        context: &EvaluationContext,
250    ) -> Result<ResolutionDetails<String>, EvaluationError> {
251        debug!(flag_key, "resolving string flag");
252        let request = ResolveStringRequest {
253            flag_key: flag_key.to_string(),
254            context: convert_context(context),
255        };
256
257        match self.client.clone().resolve_string(request).await {
258            Ok(response) => {
259                let inner: ResolveStringResponse = response.into_inner();
260                debug!(flag_key, value = %inner.value, reason = %inner.reason, "string flag resolved");
261                Ok(ResolutionDetails {
262                    value: inner.value,
263                    variant: Some(inner.variant),
264                    reason: Some(EvaluationReason::Other(inner.reason)),
265                    flag_metadata: inner.metadata.map(convert_proto_metadata),
266                })
267            }
268            Err(status) => {
269                error!(flag_key, error = %status, "failed to resolve string flag");
270                Err(EvaluationError {
271                    code: map_grpc_status_to_error_code(&status),
272                    message: Some(status.message().to_string()),
273                })
274            }
275        }
276    }
277
278    #[instrument(skip(self, context))]
279    async fn resolve_float_value(
280        &self,
281        flag_key: &str,
282        context: &EvaluationContext,
283    ) -> Result<ResolutionDetails<f64>, EvaluationError> {
284        debug!(flag_key, "resolving float flag");
285        let request = ResolveFloatRequest {
286            flag_key: flag_key.to_string(),
287            context: convert_context(context),
288        };
289
290        match self.client.clone().resolve_float(request).await {
291            Ok(response) => {
292                let inner: ResolveFloatResponse = response.into_inner();
293                debug!(flag_key, value = inner.value, reason = %inner.reason, "float flag resolved");
294                Ok(ResolutionDetails {
295                    value: inner.value,
296                    variant: Some(inner.variant),
297                    reason: Some(EvaluationReason::Other(inner.reason)),
298                    flag_metadata: inner.metadata.map(convert_proto_metadata),
299                })
300            }
301            Err(status) => {
302                error!(flag_key, error = %status, "failed to resolve float flag");
303                Err(EvaluationError {
304                    code: map_grpc_status_to_error_code(&status),
305                    message: Some(status.message().to_string()),
306                })
307            }
308        }
309    }
310
311    #[instrument(skip(self, context))]
312    async fn resolve_int_value(
313        &self,
314        flag_key: &str,
315        context: &EvaluationContext,
316    ) -> Result<ResolutionDetails<i64>, EvaluationError> {
317        debug!(flag_key, "resolving integer flag");
318        let request = ResolveIntRequest {
319            flag_key: flag_key.to_string(),
320            context: convert_context(context),
321        };
322
323        match self.client.clone().resolve_int(request).await {
324            Ok(response) => {
325                let inner: ResolveIntResponse = response.into_inner();
326                debug!(flag_key, value = inner.value, reason = %inner.reason, "integer flag resolved");
327                Ok(ResolutionDetails {
328                    value: inner.value,
329                    variant: Some(inner.variant),
330                    reason: Some(EvaluationReason::Other(inner.reason)),
331                    flag_metadata: inner.metadata.map(convert_proto_metadata),
332                })
333            }
334            Err(status) => {
335                error!(flag_key, error = %status, "failed to resolve integer flag");
336                Err(EvaluationError {
337                    code: map_grpc_status_to_error_code(&status),
338                    message: Some(status.message().to_string()),
339                })
340            }
341        }
342    }
343
344    #[instrument(skip(self, context))]
345    async fn resolve_struct_value(
346        &self,
347        flag_key: &str,
348        context: &EvaluationContext,
349    ) -> Result<ResolutionDetails<StructValue>, EvaluationError> {
350        debug!(flag_key, "resolving struct flag");
351        let request = ResolveObjectRequest {
352            flag_key: flag_key.to_string(),
353            context: convert_context(context),
354        };
355
356        match self.client.clone().resolve_object(request).await {
357            Ok(response) => {
358                let inner: ResolveObjectResponse = response.into_inner();
359                debug!(flag_key, reason = %inner.reason, "struct flag resolved");
360                Ok(ResolutionDetails {
361                    value: convert_proto_struct_to_struct_value(inner.value.unwrap_or_default()),
362                    variant: Some(inner.variant),
363                    reason: Some(EvaluationReason::Other(inner.reason)),
364                    flag_metadata: inner.metadata.map(convert_proto_metadata),
365                })
366            }
367            Err(status) => {
368                error!(flag_key, error = %status, "failed to resolve struct flag");
369                Err(EvaluationError {
370                    code: map_grpc_status_to_error_code(&status),
371                    message: Some(status.message().to_string()),
372                })
373            }
374        }
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381    use crate::flagd::evaluation::v1::{
382        EventStreamResponse, ResolveAllRequest, ResolveAllResponse,
383        service_server::{Service, ServiceServer},
384    };
385    use futures_core::Stream;
386    use serial_test::serial;
387    use std::{collections::BTreeMap, pin::Pin};
388    use tempfile::TempDir;
389    use test_log::test;
390    use tokio::net::UnixListener;
391    use tokio::sync::oneshot;
392    use tokio::{net::TcpListener, time::Instant};
393    use tokio_stream::wrappers::UnixListenerStream;
394    use tonic::{Request, Response, Status, transport::Server};
395
396    pub struct MockFlagService;
397
398    #[tonic::async_trait]
399    impl Service for MockFlagService {
400        async fn resolve_boolean(
401            &self,
402            _request: Request<ResolveBooleanRequest>,
403        ) -> Result<Response<ResolveBooleanResponse>, Status> {
404            Ok(Response::new(ResolveBooleanResponse {
405                value: true,
406                reason: "test".to_string(),
407                variant: "test".to_string(),
408                metadata: Some(create_test_metadata()),
409            }))
410        }
411
412        async fn resolve_string(
413            &self,
414            _request: Request<ResolveStringRequest>,
415        ) -> Result<Response<ResolveStringResponse>, Status> {
416            Ok(Response::new(ResolveStringResponse {
417                value: "test".to_string(),
418                reason: "test".to_string(),
419                variant: "test".to_string(),
420                metadata: Some(create_test_metadata()),
421            }))
422        }
423
424        async fn resolve_float(
425            &self,
426            _request: Request<ResolveFloatRequest>,
427        ) -> Result<Response<ResolveFloatResponse>, Status> {
428            Ok(Response::new(ResolveFloatResponse {
429                value: 1.0,
430                reason: "test".to_string(),
431                variant: "test".to_string(),
432                metadata: Some(create_test_metadata()),
433            }))
434        }
435
436        async fn resolve_int(
437            &self,
438            _request: Request<ResolveIntRequest>,
439        ) -> Result<Response<ResolveIntResponse>, Status> {
440            Ok(Response::new(ResolveIntResponse {
441                value: 42,
442                reason: "test".to_string(),
443                variant: "test".to_string(),
444                metadata: Some(create_test_metadata()),
445            }))
446        }
447
448        async fn resolve_object(
449            &self,
450            _request: Request<ResolveObjectRequest>,
451        ) -> Result<Response<ResolveObjectResponse>, Status> {
452            let mut fields = BTreeMap::new();
453            fields.insert(
454                "key".to_string(),
455                prost_types::Value {
456                    kind: Some(prost_types::value::Kind::StringValue("value".to_string())),
457                },
458            );
459
460            Ok(Response::new(ResolveObjectResponse {
461                value: Some(prost_types::Struct { fields }),
462                reason: "test".to_string(),
463                variant: "test".to_string(),
464                metadata: Some(create_test_metadata()),
465            }))
466        }
467
468        async fn resolve_all(
469            &self,
470            _request: Request<ResolveAllRequest>,
471        ) -> Result<Response<ResolveAllResponse>, Status> {
472            Ok(Response::new(ResolveAllResponse {
473                flags: Default::default(),
474                metadata: Some(create_test_metadata()),
475            }))
476        }
477
478        type EventStreamStream =
479            Pin<Box<dyn Stream<Item = Result<EventStreamResponse, Status>> + Send + 'static>>;
480
481        async fn event_stream(
482            &self,
483            _request: Request<EventStreamRequest>,
484        ) -> Result<Response<Self::EventStreamStream>, Status> {
485            let output = tokio_stream::empty();
486            Ok(Response::new(Box::pin(output)))
487        }
488    }
489
490    fn create_test_metadata() -> prost_types::Struct {
491        let mut fields = BTreeMap::new();
492        fields.insert(
493            "bool_key".to_string(),
494            prost_types::Value {
495                kind: Some(prost_types::value::Kind::BoolValue(true)),
496            },
497        );
498        fields.insert(
499            "number_key".to_string(),
500            prost_types::Value {
501                kind: Some(prost_types::value::Kind::NumberValue(42.0)),
502            },
503        );
504        fields.insert(
505            "string_key".to_string(),
506            prost_types::Value {
507                kind: Some(prost_types::value::Kind::StringValue("test".to_string())),
508            },
509        );
510        prost_types::Struct { fields }
511    }
512
513    struct TestServer {
514        target: String,
515        _shutdown: oneshot::Sender<()>,
516    }
517
518    impl TestServer {
519        async fn new() -> Self {
520            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
521            let addr = listener.local_addr().unwrap();
522            let (tx, rx) = oneshot::channel();
523
524            let server = tonic::transport::Server::builder()
525                .add_service(ServiceServer::new(MockFlagService))
526                .serve(addr);
527
528            tokio::spawn(async move {
529                tokio::select! {
530                    _ = server => {},
531                    _ = rx => {},
532                }
533            });
534
535            Self {
536                target: format!("{}:{}", addr.ip(), addr.port()),
537                _shutdown: tx,
538            }
539        }
540    }
541
542    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
543    async fn test_dns_resolution() {
544        let server = TestServer::new().await;
545        // Add delay to ensure server is ready
546        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
547        let options = FlagdOptions {
548            host: server.target.clone(),
549            port: 8013,
550            target_uri: None,
551            deadline_ms: 500,
552            ..Default::default()
553        };
554        let resolver = RpcResolver::new(&options).await.unwrap();
555        let context = EvaluationContext::default().with_targeting_key("test-user");
556
557        let result = resolver
558            .resolve_bool_value("test-flag", &context)
559            .await
560            .unwrap();
561        assert_eq!(result.value, true);
562    }
563
564    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
565    async fn test_envoy_resolution() {
566        let server = TestServer::new().await;
567        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
568
569        let options = FlagdOptions {
570            host: server.target.clone(),
571            port: 8013,
572            target_uri: Some(format!("envoy://{}/flagd-service", server.target)),
573            deadline_ms: 500,
574            ..Default::default()
575        };
576
577        let resolver = RpcResolver::new(&options).await.unwrap();
578        let context = EvaluationContext::default().with_targeting_key("test-user");
579
580        let result = resolver
581            .resolve_bool_value("test-flag", &context)
582            .await
583            .unwrap();
584        assert_eq!(result.value, true);
585    }
586
587    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
588    async fn test_value_resolution() {
589        let server = TestServer::new().await;
590        let options = FlagdOptions {
591            host: server.target.clone(),
592            port: 8013,
593            target_uri: None,
594            deadline_ms: 500,
595            ..Default::default()
596        };
597        let resolver = RpcResolver::new(&options).await.unwrap();
598        let context = EvaluationContext::default().with_targeting_key("test-user");
599
600        // Test all value types
601        assert_eq!(
602            resolver
603                .resolve_bool_value("test-flag", &context)
604                .await
605                .unwrap()
606                .value,
607            true
608        );
609        assert_eq!(
610            resolver
611                .resolve_string_value("test-flag", &context)
612                .await
613                .unwrap()
614                .value,
615            "test"
616        );
617        assert_eq!(
618            resolver
619                .resolve_float_value("test-flag", &context)
620                .await
621                .unwrap()
622                .value,
623            1.0
624        );
625        assert_eq!(
626            resolver
627                .resolve_int_value("test-flag", &context)
628                .await
629                .unwrap()
630                .value,
631            42
632        );
633
634        let struct_result = resolver
635            .resolve_struct_value("test-flag", &context)
636            .await
637            .unwrap();
638        assert!(!struct_result.value.fields.is_empty());
639    }
640
641    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
642    async fn test_metadata() {
643        let metadata = create_test_metadata();
644        let flag_metadata = convert_proto_metadata(metadata);
645
646        assert!(matches!(
647            flag_metadata.values.get("bool_key"),
648            Some(FlagMetadataValue::Bool(true))
649        ));
650        assert!(matches!(
651            flag_metadata.values.get("number_key"),
652            Some(FlagMetadataValue::Float(42.0))
653        ));
654        assert!(matches!(
655            flag_metadata.values.get("string_key"),
656            Some(FlagMetadataValue::String(s)) if s == "test"
657        ));
658    }
659
660    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
661    async fn test_standard_connection() {
662        let server = TestServer::new().await;
663        let parts: Vec<&str> = server.target.split(':').collect();
664        let options = FlagdOptions {
665            host: parts[0].to_string(),
666            port: parts[1].parse().unwrap(),
667            target_uri: None,
668            deadline_ms: 500,
669            ..Default::default()
670        };
671
672        let resolver = RpcResolver::new(&options).await.unwrap();
673        let context = EvaluationContext::default().with_targeting_key("test-user");
674
675        let result = resolver
676            .resolve_bool_value("test-flag", &context)
677            .await
678            .unwrap();
679        assert_eq!(result.value, true);
680    }
681
682    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
683    async fn test_envoy_connection() {
684        let server = TestServer::new().await;
685        let parts: Vec<&str> = server.target.split(':').collect();
686        let options = FlagdOptions {
687            host: parts[0].to_string(),
688            port: parts[1].parse().unwrap(),
689            target_uri: Some(format!("envoy://{}/flagd-service", server.target)),
690            deadline_ms: 500,
691            ..Default::default()
692        };
693
694        let resolver = RpcResolver::new(&options).await.unwrap();
695        let context = EvaluationContext::default().with_targeting_key("test-user");
696
697        let result = resolver
698            .resolve_bool_value("test-flag", &context)
699            .await
700            .unwrap();
701        assert_eq!(result.value, true);
702    }
703
704    #[test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
705    #[serial]
706    async fn test_retry_mechanism() {
707        // Bind to a port but don't accept connections - this causes immediate connection failures
708        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
709        let addr = listener.local_addr().unwrap();
710        // Drop the listener immediately to ensure the port rejects connections
711        drop(listener);
712
713        let options = FlagdOptions {
714            host: addr.ip().to_string(),
715            port: addr.port(),
716            retry_backoff_ms: 100,
717            retry_backoff_max_ms: 400,
718            retry_grace_period: 3,
719            deadline_ms: 100, // Short timeout for fast failures
720            ..Default::default()
721        };
722
723        let start = Instant::now();
724        let result = RpcResolver::new(&options).await;
725        let duration = start.elapsed();
726
727        assert!(result.is_err());
728        // Should take at least 300ms (100ms + 200ms delays)
729        assert!(duration.as_millis() >= 300);
730        // Allow some buffer for system overhead and processing time
731        assert!(duration.as_millis() < 600);
732    }
733
734    #[test(tokio::test)]
735    async fn test_successful_retry() {
736        let server = TestServer::new().await;
737        let options = FlagdOptions {
738            host: server.target.clone(),
739            port: 8013,
740            retry_backoff_ms: 100,
741            retry_backoff_max_ms: 400,
742            retry_grace_period: 3,
743            ..Default::default()
744        };
745
746        let resolver = RpcResolver::new(&options).await.unwrap();
747        let context = EvaluationContext::default();
748
749        let result = resolver
750            .resolve_bool_value("test-flag", &context)
751            .await
752            .unwrap();
753        assert_eq!(result.value, true);
754    }
755
756    #[test(tokio::test)]
757    async fn test_rpc_unix_socket_connection() {
758        let tmp_dir = TempDir::new().unwrap();
759        let socket_path = tmp_dir.path().join("test.sock");
760        let socket_path_str = socket_path.to_str().unwrap().to_string();
761
762        // Start mock gRPC server with proper shutdown handling
763        let server_handle = tokio::spawn(async move {
764            let uds = UnixListener::bind(&socket_path).unwrap();
765            Server::builder()
766                .add_service(ServiceServer::new(MockFlagService))
767                .serve_with_incoming(UnixListenerStream::new(uds))
768                .await
769                .unwrap();
770        });
771
772        // Give server time to start
773        tokio::time::sleep(Duration::from_millis(100)).await;
774
775        let options = FlagdOptions {
776            socket_path: Some(socket_path_str),
777            retry_backoff_ms: 100,
778            retry_backoff_max_ms: 400,
779            retry_grace_period: 3,
780            ..Default::default()
781        };
782
783        let resolver = RpcResolver::new(&options).await;
784        assert!(resolver.is_ok());
785
786        // Clean shutdown
787        server_handle.abort();
788    }
789
790    #[test]
791    fn test_grpc_error_code_mapping() {
792        use tonic::Code;
793
794        // Test NOT_FOUND -> FlagNotFound
795        let status = tonic::Status::new(Code::NotFound, "Flag not found");
796        let error_code = map_grpc_status_to_error_code(&status);
797        assert!(matches!(error_code, EvaluationErrorCode::FlagNotFound));
798
799        // Test INVALID_ARGUMENT -> InvalidContext
800        let status = tonic::Status::new(Code::InvalidArgument, "Invalid context");
801        let error_code = map_grpc_status_to_error_code(&status);
802        assert!(matches!(error_code, EvaluationErrorCode::InvalidContext));
803
804        // Test UNAUTHENTICATED -> General
805        let status = tonic::Status::new(Code::Unauthenticated, "Not authenticated");
806        let error_code = map_grpc_status_to_error_code(&status);
807        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
808
809        // Test PERMISSION_DENIED -> General
810        let status = tonic::Status::new(Code::PermissionDenied, "Access denied");
811        let error_code = map_grpc_status_to_error_code(&status);
812        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
813
814        // Test FAILED_PRECONDITION -> TypeMismatch
815        let status = tonic::Status::new(Code::FailedPrecondition, "Type mismatch");
816        let error_code = map_grpc_status_to_error_code(&status);
817        assert!(matches!(error_code, EvaluationErrorCode::TypeMismatch));
818
819        // Test DEADLINE_EXCEEDED -> General
820        let status = tonic::Status::new(Code::DeadlineExceeded, "Timeout");
821        let error_code = map_grpc_status_to_error_code(&status);
822        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
823
824        // Test UNAVAILABLE -> General
825        let status = tonic::Status::new(Code::Unavailable, "Service unavailable");
826        let error_code = map_grpc_status_to_error_code(&status);
827        assert!(matches!(error_code, EvaluationErrorCode::General(_)));
828    }
829}