open_feature_flagd/
lib.rs

1//! [Generated by cargo-readme: `cargo readme --no-title --no-license > README.md`]::
2//!  # flagd Provider for OpenFeature
3//!
4//! A Rust implementation of the OpenFeature provider for flagd, enabling dynamic
5//! feature flag evaluation in your applications.
6//!
7//! This provider supports multiple evaluation modes, advanced targeting rules, caching strategies,
8//! and connection management. It is designed to work seamlessly with the OpenFeature SDK and the flagd service.
9//!
10//! ## Core Features
11//!
12//! - **Multiple Evaluation Modes**
13//!     - **RPC Resolver (Remote Evaluation):** Uses gRPC to perform flag evaluations remotely at a flagd instance. Supports bi-directional streaming, retry backoff, and custom name resolution (including Envoy support).
14//!     - **REST Resolver:** Uses the OpenFeature Remote Evaluation Protocol (OFREP) over HTTP to evaluate flags.
15//!     - **In-Process Resolver:** Performs evaluations locally using an embedded evaluation engine. Flag configurations can be retrieved via gRPC (sync mode).
16//!     - **File Resolver:** Operates entirely from a flag definition file, updating on file changes in a best-effort manner.
17//!
18//! - **Advanced Targeting**
19//!     - **Fractional Rollouts:** Uses consistent hashing (implemented via murmurhash3) to split traffic between flag variants in configurable proportions.
20//!     - **Semantic Versioning:** Compare values using common operators such as '=', '!=', '<', '<=', '>', '>=', '^', and '~'.
21//!     - **String Operations:** Custom operators for performing “starts_with” and “ends_with” comparisons.
22//!     - **Complex Targeting Rules:** Leverages JSONLogic and custom operators to support nested conditions and dynamic evaluation.
23//!
24//! - **Caching Strategies**
25//!     - Built-in support for LRU caching as well as an in-memory alternative. Flag evaluation results can be cached and later returned with a “CACHED” reason until the configuration updates.
26//!
27//! - **Connection Management**
28//!     - Automatic connection establishment with configurable retries, timeout settings, and custom TLS or Unix-socket options.
29//!     - Support for upstream name resolution including a custom resolver for Envoy proxy integration.
30//!
31//! ## Installation
32//! Add the dependency in your `Cargo.toml`:
33//! ```bash
34//! cargo add open-feature-flagd
35//! cargo add open-feature
36//! ```
37//! Then integrate it into your application:
38//!
39//! ```rust,no_run
40//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
41//! use open_feature::provider::FeatureProvider;
42//! use open_feature::EvaluationContext;
43//!
44//! #[tokio::main]
45//! async fn main() {
46//!     // Example using the REST resolver mode.
47//!     let provider = FlagdProvider::new(FlagdOptions {
48//!         host: "localhost".to_string(),
49//!         port: 8016,
50//!         resolver_type: ResolverType::Rest,
51//!         ..Default::default()
52//!     }).await.unwrap();
53//!
54//!     let context = EvaluationContext::default().with_targeting_key("user-123");
55//!     let result = provider.resolve_bool_value("bool-flag", &context).await.unwrap();
56//!     println!("Flag value: {}", result.value);
57//! }
58//! ```
59//!
60//! ## Evaluation Modes
61//! ### Remote Resolver (RPC)
62//! In RPC mode, the provider communicates with flagd via gRPC. It supports features like streaming updates, retry mechanisms, and name resolution (including Envoy).
63//!
64//! ```rust,no_run
65//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
66//! use open_feature::provider::FeatureProvider;
67//! use open_feature::EvaluationContext;
68//!
69//! #[tokio::main]
70//! async fn main() {
71//!     let provider = FlagdProvider::new(FlagdOptions {
72//!         host: "localhost".to_string(),
73//!         port: 8013,
74//!         resolver_type: ResolverType::Rpc,
75//!         ..Default::default()
76//!     }).await.unwrap();
77//!
78//!     let context = EvaluationContext::default().with_targeting_key("user-123");
79//!     let bool_result = provider.resolve_bool_value("feature-enabled", &context).await.unwrap();
80//!     println!("Feature enabled: {}", bool_result.value);
81//! }
82//! ```
83//!
84//! ### REST Resolver
85//! In REST mode the provider uses the OpenFeature Remote Evaluation Protocol (OFREP) over HTTP.
86//! It is useful when gRPC is not an option.
87//! ```rust,no_run
88//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
89//! use open_feature::provider::FeatureProvider;
90//! use open_feature::EvaluationContext;
91//!
92//! #[tokio::main]
93//! async fn main() {
94//!     let provider = FlagdProvider::new(FlagdOptions {
95//!         host: "localhost".to_string(),
96//!         port: 8016,
97//!         resolver_type: ResolverType::Rest,
98//!         ..Default::default()
99//!     }).await.unwrap();
100//!
101//!     let context = EvaluationContext::default().with_targeting_key("user-456");
102//!     let result = provider.resolve_string_value("feature-variant", &context).await.unwrap();
103//!     println!("Variant: {}", result.value);
104//! }
105//! ```
106//!
107//! ### In-Process Resolver
108//! In-process evaluation is performed locally. Flag configurations are sourced via gRPC sync stream.
109//! This mode supports advanced targeting operators (fractional, semver, string comparisons)
110//! using the built-in evaluation engine.
111//! ```rust,no_run
112//! use open_feature_flagd::{CacheSettings, FlagdOptions, FlagdProvider, ResolverType};
113//! use open_feature::provider::FeatureProvider;
114//! use open_feature::EvaluationContext;
115//!
116//! #[tokio::main]
117//! async fn main() {
118//!     let provider = FlagdProvider::new(FlagdOptions {
119//!         host: "localhost".to_string(),
120//!         port: 8015,
121//!         resolver_type: ResolverType::InProcess,
122//!         selector: Some("my-service".to_string()),
123//!         cache_settings: Some(CacheSettings::default()),
124//!         ..Default::default()
125//!     }).await.unwrap();
126//!
127//!     let context = EvaluationContext::default()
128//!         .with_targeting_key("user-abc")
129//!         .with_custom_field("environment", "production")
130//!         .with_custom_field("semver", "2.1.0");
131//!
132//!     let dark_mode = provider.resolve_bool_value("dark-mode", &context).await.unwrap();
133//!     println!("Dark mode enabled: {}", dark_mode.value);
134//! }
135//! ```
136//!
137//! ### File Mode
138//! File mode is an in-process variant where flag configurations are read from a file.
139//! This is useful for development or environments without network access.
140//! ```rust,no_run
141//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
142//! use open_feature::provider::FeatureProvider;
143//! use open_feature::EvaluationContext;
144//!
145//! #[tokio::main]
146//! async fn main() {
147//!     let file_path = "./path/to/flagd-config.json".to_string();
148//!     let provider = FlagdProvider::new(FlagdOptions {
149//!         host: "localhost".to_string(),
150//!         resolver_type: ResolverType::File,
151//!         source_configuration: Some(file_path),
152//!         ..Default::default()
153//!     }).await.unwrap();
154//!
155//!     let context = EvaluationContext::default();
156//!     let result = provider.resolve_int_value("rollout-percentage", &context).await.unwrap();
157//!     println!("Rollout percentage: {}", result.value);
158//! }
159//! ```
160//!
161//! ## Configuration Options
162//! Configurations can be provided as constructor options or via environment variables (with constructor options taking priority). The following options are supported:
163//!
164//! | Option                                  | Env Variable                            | Type / Supported Value            | Default                             | Compatible Resolver            |
165//! |-----------------------------------------|-----------------------------------------|-----------------------------------|-------------------------------------|--------------------------------|
166//! | Host                                    | FLAGD_HOST                              | string                            | "localhost"                         | RPC, REST, In-Process, File    |
//! | Port                                    | FLAGD_PORT                              | number                            | 8013 (RPC), 8015 (In-Process), 8016 (REST) | RPC, REST, In-Process, File    |
168//! | Target URI                              | FLAGD_TARGET_URI                        | string                            | ""                                  | RPC, In-Process                |
169//! | TLS                                     | FLAGD_TLS                               | boolean                           | false                               | RPC, In-Process                |
170//! | Socket Path                             | FLAGD_SOCKET_PATH                       | string                            | ""                                  | RPC                            |
171//! | Certificate Path                        | FLAGD_SERVER_CERT_PATH                  | string                            | ""                                  | RPC, In-Process                |
172//! | Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE                             | string ("lru", "mem", "disabled") | lru                                 | RPC, In-Process, File          |
173//! | Cache TTL (Seconds)                     | FLAGD_CACHE_TTL                         | number                            | 60                                  | RPC, In-Process, File          |
174//! | Max Cache Size                          | FLAGD_MAX_CACHE_SIZE                    | number                            | 1000                                | RPC, In-Process, File          |
175//! | Offline File Path                       | FLAGD_OFFLINE_FLAG_SOURCE_PATH          | string                            | ""                                  | File                           |
176//! | Retry Backoff (ms)                      | FLAGD_RETRY_BACKOFF_MS                  | number                            | 1000                                | RPC, In-Process                |
177//! | Retry Backoff Maximum (ms)              | FLAGD_RETRY_BACKOFF_MAX_MS              | number                            | 120000                              | RPC, In-Process                |
178//! | Retry Grace Period                      | FLAGD_RETRY_GRACE_PERIOD                | number                            | 5                                   | RPC, In-Process                |
179//! | Event Stream Deadline (ms)              | FLAGD_STREAM_DEADLINE_MS                | number                            | 600000                              | RPC                            |
180//! | Offline Poll Interval (ms)              | FLAGD_OFFLINE_POLL_MS                   | number                            | 5000                                | File                           |
181//! | Source Selector                         | FLAGD_SOURCE_SELECTOR                   | string                            | ""                                  | In-Process                     |
182//!
183//! ## License
184//! Apache 2.0 - See [LICENSE](./../../LICENSE) for more information.
185//!
186
187pub mod cache;
188pub mod error;
189pub mod resolver;
190use crate::error::FlagdError;
191use crate::resolver::in_process::resolver::{FileResolver, InProcessResolver};
192use async_trait::async_trait;
193use open_feature::provider::{FeatureProvider, ProviderMetadata, ResolutionDetails};
194use open_feature::{
195    EvaluationContext, EvaluationContextFieldValue, EvaluationError, StructValue, Value,
196};
197use resolver::rest::RestResolver;
198use tracing::debug;
199use tracing::instrument;
200
201use std::collections::BTreeMap;
202use std::sync::Arc;
203
204pub use cache::{CacheService, CacheSettings, CacheType};
205pub use resolver::rpc::RpcResolver;
206
// Include the generated protobuf code
/// Generated protobuf modules for the flagd APIs.
///
/// Each inner module textually includes a `.rs` file located in the build's
/// `OUT_DIR`; the files are presumably emitted by the crate's build script
/// (not visible from this file).
pub mod flagd {
    /// Namespace for the flagd evaluation API.
    pub mod evaluation {
        /// Contents of the generated `flagd.evaluation.v1.rs` file.
        pub mod v1 {
            include!(concat!(env!("OUT_DIR"), "/flagd.evaluation.v1.rs"));
        }
    }
    /// Namespace for the flagd sync API.
    pub mod sync {
        /// Contents of the generated `flagd.sync.v1.rs` file.
        pub mod v1 {
            include!(concat!(env!("OUT_DIR"), "/flagd.sync.v1.rs"));
        }
    }
}
220
/// Configuration options for the flagd provider.
///
/// The `Default` implementation seeds most fields from `FLAGD_*` environment
/// variables and falls back to built-in values when a variable is unset or
/// cannot be parsed. Fields set explicitly by the caller are used as given.
#[derive(Debug, Clone)]
pub struct FlagdOptions {
    /// Host address for the service.
    /// Default: `FLAGD_HOST`, otherwise `"localhost"`.
    pub host: String,
    /// Port number for the service.
    /// Default: `FLAGD_PORT`, otherwise a value chosen from the resolver type.
    pub port: u16,
    /// Target URI for custom name resolution (e.g. "envoy://service/flagd").
    /// Default: `FLAGD_TARGET_URI` when set.
    pub target_uri: Option<String>,
    /// Type of resolver to use.
    pub resolver_type: ResolverType,
    /// Whether to use TLS.
    /// Default: `true` only when `FLAGD_TLS` equals "true" (case-insensitive).
    pub tls: bool,
    /// Path to TLS certificate.
    /// Default: `FLAGD_SERVER_CERT_PATH` when set.
    pub cert_path: Option<String>,
    /// Request timeout in milliseconds.
    /// Default: `FLAGD_DEADLINE_MS`, otherwise 500.
    pub deadline_ms: u32,
    /// Cache configuration settings; `None` disables the provider-level cache.
    /// Default: `Some(CacheSettings::default())`.
    pub cache_settings: Option<CacheSettings>,
    /// Initial backoff duration in milliseconds for retry attempts (default: 1000ms)
    /// Not supported in OFREP (REST) evaluation
    pub retry_backoff_ms: u32,
    /// Maximum backoff duration in milliseconds for retry attempts, prevents exponential backoff from growing indefinitely (default: 120000ms)
    /// Not supported in OFREP (REST) evaluation
    pub retry_backoff_max_ms: u32,
    /// Maximum number of retry attempts before giving up (default: 5)
    /// Not supported in OFREP (REST) evaluation
    pub retry_grace_period: u32,
    /// Source selector for filtering flag configurations.
    /// Used to scope flag sync requests in in-process evaluation.
    /// Default: `FLAGD_SOURCE_SELECTOR` when set.
    pub selector: Option<String>,
    /// Unix domain socket path for connecting to flagd.
    /// When provided, this takes precedence over host:port configuration.
    /// Example: "/var/run/flagd.sock"
    /// Only works with the gRPC resolver.
    /// Default: `FLAGD_SOCKET_PATH` when set.
    pub socket_path: Option<String>,
    /// Source configuration for the file-based resolver.
    /// Default: `FLAGD_OFFLINE_FLAG_SOURCE_PATH` when set.
    pub source_configuration: Option<String>,
    /// The deadline in milliseconds for event streaming operations. Set to 0 to disable.
    /// Recommended to prevent infrastructure from killing idle connections.
    /// Default: `FLAGD_STREAM_DEADLINE_MS`, otherwise 600000.
    pub stream_deadline_ms: u32,
    /// Offline polling interval in milliseconds.
    /// Default: `FLAGD_OFFLINE_POLL_MS`, otherwise 5000.
    pub offline_poll_interval_ms: Option<u32>,
}
/// Type of resolver to use for flag evaluation.
///
/// `FlagdOptions::default()` selects a variant from the `FLAGD_RESOLVER`
/// environment variable, compared case-insensitively; the accepted spellings
/// are listed on each variant. Unset or unrecognized values select [`ResolverType::Rpc`].
#[derive(Debug, Clone, PartialEq)]
pub enum ResolverType {
    /// Remote evaluation using gRPC connection to flagd service (`"rpc"`)
    Rpc,
    /// Remote evaluation using REST connection to flagd service (`"rest"`)
    Rest,
    /// Local evaluation with embedded flag engine using gRPC connection
    /// (`"in-process"` or `"inprocess"`)
    InProcess,
    /// Local evaluation with no external dependencies (`"file"` or `"offline"`)
    File,
}
277impl Default for FlagdOptions {
278    fn default() -> Self {
279        let resolver_type = if let Ok(r) = std::env::var("FLAGD_RESOLVER") {
280            match r.to_uppercase().as_str() {
281                "RPC" => ResolverType::Rpc,
282                "REST" => ResolverType::Rest,
283                "IN-PROCESS" | "INPROCESS" => ResolverType::InProcess,
284                "FILE" | "OFFLINE" => ResolverType::File,
285                _ => ResolverType::Rpc,
286            }
287        } else {
288            ResolverType::Rpc
289        };
290
291        let port = match resolver_type {
292            ResolverType::Rpc => 8013,
293            ResolverType::InProcess => 8015,
294            _ => 8013,
295        };
296
297        let mut options = Self {
298            host: std::env::var("FLAGD_HOST").unwrap_or_else(|_| "localhost".to_string()),
299            port: std::env::var("FLAGD_PORT")
300                .ok()
301                .and_then(|p| p.parse().ok())
302                .unwrap_or(port),
303            target_uri: std::env::var("FLAGD_TARGET_URI").ok(),
304            resolver_type,
305            tls: std::env::var("FLAGD_TLS")
306                .map(|v| v.to_lowercase() == "true")
307                .unwrap_or(false),
308            cert_path: std::env::var("FLAGD_SERVER_CERT_PATH").ok(),
309            deadline_ms: std::env::var("FLAGD_DEADLINE_MS")
310                .ok()
311                .and_then(|v| v.parse().ok())
312                .unwrap_or(500),
313            retry_backoff_ms: std::env::var("FLAGD_RETRY_BACKOFF_MS")
314                .ok()
315                .and_then(|v| v.parse().ok())
316                .unwrap_or(1000),
317            retry_backoff_max_ms: std::env::var("FLAGD_RETRY_BACKOFF_MAX_MS")
318                .ok()
319                .and_then(|v| v.parse().ok())
320                .unwrap_or(120000),
321            retry_grace_period: std::env::var("FLAGD_RETRY_GRACE_PERIOD")
322                .ok()
323                .and_then(|v| v.parse().ok())
324                .unwrap_or(5),
325            stream_deadline_ms: std::env::var("FLAGD_STREAM_DEADLINE_MS")
326                .ok()
327                .and_then(|v| v.parse().ok())
328                .unwrap_or(600000),
329            socket_path: std::env::var("FLAGD_SOCKET_PATH").ok(),
330            selector: std::env::var("FLAGD_SOURCE_SELECTOR").ok(),
331            cache_settings: Some(CacheSettings::default()),
332            source_configuration: std::env::var("FLAGD_OFFLINE_FLAG_SOURCE_PATH").ok(),
333            offline_poll_interval_ms: Some(
334                std::env::var("FLAGD_OFFLINE_POLL_MS")
335                    .ok()
336                    .and_then(|s| s.parse().ok())
337                    .unwrap_or(5000),
338            ),
339        };
340
341        if options.source_configuration.is_some() && options.resolver_type != ResolverType::Rpc {
342            options.resolver_type = ResolverType::File;
343        }
344
345        options
346    }
347}
348
/// Main provider implementation for flagd.
///
/// Wraps one concrete resolver behind a trait object and optionally places a
/// value cache in front of it. Cloning shares the same resolver and cache
/// because both are held through `Arc`.
#[derive(Clone)]
pub struct FlagdProvider {
    /// The underlying feature flag resolver, selected by `FlagdOptions::resolver_type`
    provider: Arc<dyn FeatureProvider + Send + Sync>,
    /// Optional caching layer; `None` when no cache settings were supplied
    cache: Option<Arc<CacheService<Value>>>,
}
357
358impl FlagdProvider {
359    #[instrument(skip(options))]
360    pub async fn new(options: FlagdOptions) -> Result<Self, FlagdError> {
361        debug!("Initializing FlagdProvider with options: {:?}", options);
362
363        let provider: Arc<dyn FeatureProvider + Send + Sync> = match options.resolver_type {
364            ResolverType::Rpc => {
365                debug!("Using RPC resolver");
366                Arc::new(RpcResolver::new(&options).await?)
367            }
368            ResolverType::Rest => {
369                debug!("Using REST resolver");
370                Arc::new(RestResolver::new(&options))
371            }
372            ResolverType::InProcess => {
373                debug!("Using in-process resolver");
374                Arc::new(InProcessResolver::new(&options).await?)
375            }
376            ResolverType::File => {
377                debug!("Using file resolver");
378                Arc::new(
379                    FileResolver::new(
380                        options.source_configuration.unwrap(),
381                        options.cache_settings.clone(),
382                    )
383                    .await?,
384                )
385            }
386        };
387
388        Ok(Self {
389            provider,
390            cache: options
391                .cache_settings
392                .map(|settings| Arc::new(CacheService::new(settings))),
393        })
394    }
395}
396
397impl std::fmt::Debug for FlagdProvider {
398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399        f.debug_struct("FlagdProvider")
400            .field("cache", &self.cache)
401            .finish()
402    }
403}
404
405fn convert_context(context: &EvaluationContext) -> Option<prost_types::Struct> {
406    let mut fields = BTreeMap::new();
407
408    if let Some(targeting_key) = &context.targeting_key {
409        fields.insert(
410            "targetingKey".to_string(),
411            prost_types::Value {
412                kind: Some(prost_types::value::Kind::StringValue(targeting_key.clone())),
413            },
414        );
415    }
416
417    for (key, value) in &context.custom_fields {
418        let prost_value = match value {
419            EvaluationContextFieldValue::String(s) => prost_types::Value {
420                kind: Some(prost_types::value::Kind::StringValue(s.clone())),
421            },
422            EvaluationContextFieldValue::Bool(b) => prost_types::Value {
423                kind: Some(prost_types::value::Kind::BoolValue(*b)),
424            },
425            EvaluationContextFieldValue::Int(i) => prost_types::Value {
426                kind: Some(prost_types::value::Kind::NumberValue(*i as f64)),
427            },
428            EvaluationContextFieldValue::Float(f) => prost_types::Value {
429                kind: Some(prost_types::value::Kind::NumberValue(*f)),
430            },
431            EvaluationContextFieldValue::DateTime(dt) => prost_types::Value {
432                kind: Some(prost_types::value::Kind::StringValue(dt.to_string())),
433            },
434            EvaluationContextFieldValue::Struct(s) => prost_types::Value {
435                kind: Some(prost_types::value::Kind::StringValue(format!("{:?}", s))),
436            },
437        };
438        fields.insert(key.clone(), prost_value);
439    }
440
441    Some(prost_types::Struct { fields })
442}
443
444fn convert_proto_struct_to_struct_value(proto_struct: prost_types::Struct) -> StructValue {
445    let fields = proto_struct
446        .fields
447        .into_iter()
448        .map(|(key, value)| {
449            (
450                key,
451                match value.kind.unwrap() {
452                    prost_types::value::Kind::NullValue(_) => Value::String(String::new()),
453                    prost_types::value::Kind::NumberValue(n) => Value::Float(n),
454                    prost_types::value::Kind::StringValue(s) => Value::String(s),
455                    prost_types::value::Kind::BoolValue(b) => Value::Bool(b),
456                    prost_types::value::Kind::StructValue(s) => Value::String(format!("{:?}", s)),
457                    prost_types::value::Kind::ListValue(l) => Value::String(format!("{:?}", l)),
458                },
459            )
460        })
461        .collect();
462
463    StructValue { fields }
464}
465
466impl FlagdProvider {
467    async fn get_cached_value<T>(
468        &self,
469        flag_key: &str,
470        context: &EvaluationContext,
471        value_converter: impl Fn(Value) -> Option<T>,
472    ) -> Option<T> {
473        if let Some(cache) = &self.cache {
474            if let Some(cached_value) = cache.get(flag_key, context).await {
475                return value_converter(cached_value);
476            }
477        }
478        None
479    }
480}
481
482#[async_trait]
483impl FeatureProvider for FlagdProvider {
484    fn metadata(&self) -> &ProviderMetadata {
485        self.provider.metadata()
486    }
487
488    async fn resolve_bool_value(
489        &self,
490        flag_key: &str,
491        context: &EvaluationContext,
492    ) -> Result<ResolutionDetails<bool>, EvaluationError> {
493        if let Some(value) = self
494            .get_cached_value(flag_key, context, |v| match v {
495                Value::Bool(b) => Some(b),
496                _ => None,
497            })
498            .await
499        {
500            return Ok(ResolutionDetails::new(value));
501        }
502
503        let result = self.provider.resolve_bool_value(flag_key, context).await?;
504
505        if let Some(cache) = &self.cache {
506            cache
507                .add(flag_key, context, Value::Bool(result.value))
508                .await;
509        }
510
511        Ok(result)
512    }
513
514    async fn resolve_int_value(
515        &self,
516        flag_key: &str,
517        context: &EvaluationContext,
518    ) -> Result<ResolutionDetails<i64>, EvaluationError> {
519        if let Some(value) = self
520            .get_cached_value(flag_key, context, |v| match v {
521                Value::Int(i) => Some(i),
522                _ => None,
523            })
524            .await
525        {
526            return Ok(ResolutionDetails::new(value));
527        }
528
529        let result = self.provider.resolve_int_value(flag_key, context).await?;
530
531        if let Some(cache) = &self.cache {
532            cache.add(flag_key, context, Value::Int(result.value)).await;
533        }
534
535        Ok(result)
536    }
537
538    async fn resolve_float_value(
539        &self,
540        flag_key: &str,
541        context: &EvaluationContext,
542    ) -> Result<ResolutionDetails<f64>, EvaluationError> {
543        if let Some(value) = self
544            .get_cached_value(flag_key, context, |v| match v {
545                Value::Float(f) => Some(f),
546                _ => None,
547            })
548            .await
549        {
550            return Ok(ResolutionDetails::new(value));
551        }
552
553        let result = self.provider.resolve_float_value(flag_key, context).await?;
554
555        if let Some(cache) = &self.cache {
556            cache
557                .add(flag_key, context, Value::Float(result.value))
558                .await;
559        }
560
561        Ok(result)
562    }
563
564    async fn resolve_string_value(
565        &self,
566        flag_key: &str,
567        context: &EvaluationContext,
568    ) -> Result<ResolutionDetails<String>, EvaluationError> {
569        if let Some(value) = self
570            .get_cached_value(flag_key, context, |v| match v {
571                Value::String(s) => Some(s),
572                _ => None,
573            })
574            .await
575        {
576            return Ok(ResolutionDetails::new(value));
577        }
578
579        let result = self
580            .provider
581            .resolve_string_value(flag_key, context)
582            .await?;
583
584        if let Some(cache) = &self.cache {
585            cache
586                .add(flag_key, context, Value::String(result.value.clone()))
587                .await;
588        }
589
590        Ok(result)
591    }
592
593    async fn resolve_struct_value(
594        &self,
595        flag_key: &str,
596        context: &EvaluationContext,
597    ) -> Result<ResolutionDetails<StructValue>, EvaluationError> {
598        if let Some(value) = self
599            .get_cached_value(flag_key, context, |v| match v {
600                Value::Struct(s) => Some(s),
601                _ => None,
602            })
603            .await
604        {
605            return Ok(ResolutionDetails::new(value));
606        }
607
608        let result = self
609            .provider
610            .resolve_struct_value(flag_key, context)
611            .await?;
612
613        if let Some(cache) = &self.cache {
614            cache
615                .add(flag_key, context, Value::Struct(result.value.clone()))
616                .await;
617        }
618
619        Ok(result)
620    }
621}