open_feature_flagd/lib.rs
1//! [Generated by cargo-readme: `cargo readme --no-title --no-license > README.md`]::
2//! # flagd Provider for OpenFeature
3//!
4//! A Rust implementation of the OpenFeature provider for flagd, enabling dynamic
5//! feature flag evaluation in your applications.
6//!
7//! This provider supports multiple evaluation modes, advanced targeting rules, caching strategies,
8//! and connection management. It is designed to work seamlessly with the OpenFeature SDK and the flagd service.
9//!
10//! ## Core Features
11//!
12//! - **Multiple Evaluation Modes**
13//! - **RPC Resolver (Remote Evaluation):** Uses gRPC to perform flag evaluations remotely at a flagd instance. Supports bi-directional streaming, retry backoff, and custom name resolution (including Envoy support).
14//! - **REST Resolver:** Uses the OpenFeature Remote Evaluation Protocol (OFREP) over HTTP to evaluate flags.
15//! - **In-Process Resolver:** Performs evaluations locally using an embedded evaluation engine. Flag configurations can be retrieved via gRPC (sync mode).
16//! - **File Resolver:** Operates entirely from a flag definition file, updating on file changes in a best-effort manner.
17//!
18//! - **Advanced Targeting**
19//! - **Fractional Rollouts:** Uses consistent hashing (implemented via murmurhash3) to split traffic between flag variants in configurable proportions.
20//! - **Semantic Versioning:** Compare values using common operators such as '=', '!=', '<', '<=', '>', '>=', '^', and '~'.
21//! - **String Operations:** Custom operators for performing “starts_with” and “ends_with” comparisons.
22//! - **Complex Targeting Rules:** Leverages JSONLogic and custom operators to support nested conditions and dynamic evaluation.
23//!
24//! - **Caching Strategies**
25//! - Built-in support for LRU caching as well as an in-memory alternative. Flag evaluation results can be cached and later returned with a “CACHED” reason until the configuration updates.
26//!
27//! - **Connection Management**
28//! - Automatic connection establishment with configurable retries, timeout settings, and custom TLS or Unix-socket options.
29//! - Support for upstream name resolution including a custom resolver for Envoy proxy integration.
30//!
31//! ## Installation
32//! Add the dependency in your `Cargo.toml`:
33//! ```bash
34//! cargo add open-feature-flagd
35//! cargo add open-feature
36//! ```
37//! Then integrate it into your application:
38//!
39//! ```rust,no_run
40//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
41//! use open_feature::provider::FeatureProvider;
42//! use open_feature::EvaluationContext;
43//!
44//! #[tokio::main]
45//! async fn main() {
46//! // Example using the REST resolver mode.
47//! let provider = FlagdProvider::new(FlagdOptions {
48//! host: "localhost".to_string(),
49//! port: 8016,
50//! resolver_type: ResolverType::Rest,
51//! ..Default::default()
52//! }).await.unwrap();
53//!
54//! let context = EvaluationContext::default().with_targeting_key("user-123");
55//! let result = provider.resolve_bool_value("bool-flag", &context).await.unwrap();
56//! println!("Flag value: {}", result.value);
57//! }
58//! ```
59//!
60//! ## Evaluation Modes
61//! ### Remote Resolver (RPC)
62//! In RPC mode, the provider communicates with flagd via gRPC. It supports features like streaming updates, retry mechanisms, and name resolution (including Envoy).
63//!
64//! ```rust,no_run
65//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
66//! use open_feature::provider::FeatureProvider;
67//! use open_feature::EvaluationContext;
68//!
69//! #[tokio::main]
70//! async fn main() {
71//! let provider = FlagdProvider::new(FlagdOptions {
72//! host: "localhost".to_string(),
73//! port: 8013,
74//! resolver_type: ResolverType::Rpc,
75//! ..Default::default()
76//! }).await.unwrap();
77//!
78//! let context = EvaluationContext::default().with_targeting_key("user-123");
79//! let bool_result = provider.resolve_bool_value("feature-enabled", &context).await.unwrap();
80//! println!("Feature enabled: {}", bool_result.value);
81//! }
82//! ```
83//!
84//! ### REST Resolver
85//! In REST mode the provider uses the OpenFeature Remote Evaluation Protocol (OFREP) over HTTP.
86//! It is useful when gRPC is not an option.
87//! ```rust,no_run
88//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
89//! use open_feature::provider::FeatureProvider;
90//! use open_feature::EvaluationContext;
91//!
92//! #[tokio::main]
93//! async fn main() {
94//! let provider = FlagdProvider::new(FlagdOptions {
95//! host: "localhost".to_string(),
96//! port: 8016,
97//! resolver_type: ResolverType::Rest,
98//! ..Default::default()
99//! }).await.unwrap();
100//!
101//! let context = EvaluationContext::default().with_targeting_key("user-456");
102//! let result = provider.resolve_string_value("feature-variant", &context).await.unwrap();
103//! println!("Variant: {}", result.value);
104//! }
105//! ```
106//!
107//! ### In-Process Resolver
108//! In-process evaluation is performed locally. Flag configurations are sourced via gRPC sync stream.
109//! This mode supports advanced targeting operators (fractional, semver, string comparisons)
110//! using the built-in evaluation engine.
111//! ```rust,no_run
112//! use open_feature_flagd::{CacheSettings, FlagdOptions, FlagdProvider, ResolverType};
113//! use open_feature::provider::FeatureProvider;
114//! use open_feature::EvaluationContext;
115//!
116//! #[tokio::main]
117//! async fn main() {
118//! let provider = FlagdProvider::new(FlagdOptions {
119//! host: "localhost".to_string(),
120//! port: 8015,
121//! resolver_type: ResolverType::InProcess,
122//! selector: Some("my-service".to_string()),
123//! cache_settings: Some(CacheSettings::default()),
124//! ..Default::default()
125//! }).await.unwrap();
126//!
127//! let context = EvaluationContext::default()
128//! .with_targeting_key("user-abc")
129//! .with_custom_field("environment", "production")
130//! .with_custom_field("semver", "2.1.0");
131//!
132//! let dark_mode = provider.resolve_bool_value("dark-mode", &context).await.unwrap();
133//! println!("Dark mode enabled: {}", dark_mode.value);
134//! }
135//! ```
136//!
137//! ### File Mode
138//! File mode is an in-process variant where flag configurations are read from a file.
139//! This is useful for development or environments without network access.
140//! ```rust,no_run
141//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
142//! use open_feature::provider::FeatureProvider;
143//! use open_feature::EvaluationContext;
144//!
145//! #[tokio::main]
146//! async fn main() {
147//! let file_path = "./path/to/flagd-config.json".to_string();
148//! let provider = FlagdProvider::new(FlagdOptions {
149//! host: "localhost".to_string(),
150//! resolver_type: ResolverType::File,
151//! source_configuration: Some(file_path),
152//! ..Default::default()
153//! }).await.unwrap();
154//!
155//! let context = EvaluationContext::default();
156//! let result = provider.resolve_int_value("rollout-percentage", &context).await.unwrap();
157//! println!("Rollout percentage: {}", result.value);
158//! }
159//! ```
160//!
161//! ## Configuration Options
162//! Configurations can be provided as constructor options or via environment variables (with constructor options taking priority). The following options are supported:
163//!
164//! | Option | Env Variable | Type / Supported Value | Default | Compatible Resolver |
165//! |-----------------------------------------|-----------------------------------------|-----------------------------------|-------------------------------------|--------------------------------|
166//! | Host | FLAGD_HOST | string | "localhost" | RPC, REST, In-Process, File |
167//! | Port | FLAGD_PORT | number | 8013 (RPC), 8016 (REST) | RPC, REST, In-Process, File |
168//! | Target URI | FLAGD_TARGET_URI | string | "" | RPC, In-Process |
169//! | TLS | FLAGD_TLS | boolean | false | RPC, In-Process |
170//! | Socket Path | FLAGD_SOCKET_PATH | string | "" | RPC |
171//! | Certificate Path | FLAGD_SERVER_CERT_PATH | string | "" | RPC, In-Process |
172//! | Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | lru | RPC, In-Process, File |
173//! | Cache TTL (Seconds) | FLAGD_CACHE_TTL | number | 60 | RPC, In-Process, File |
174//! | Max Cache Size | FLAGD_MAX_CACHE_SIZE | number | 1000 | RPC, In-Process, File |
175//! | Offline File Path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | File |
176//! | Retry Backoff (ms) | FLAGD_RETRY_BACKOFF_MS | number | 1000 | RPC, In-Process |
177//! | Retry Backoff Maximum (ms) | FLAGD_RETRY_BACKOFF_MAX_MS | number | 120000 | RPC, In-Process |
178//! | Retry Grace Period | FLAGD_RETRY_GRACE_PERIOD | number | 5 | RPC, In-Process |
179//! | Event Stream Deadline (ms) | FLAGD_STREAM_DEADLINE_MS | number | 600000 | RPC |
180//! | Offline Poll Interval (ms) | FLAGD_OFFLINE_POLL_MS | number | 5000 | File |
181//! | Source Selector | FLAGD_SOURCE_SELECTOR | string | "" | In-Process |
182//!
183//! ## License
184//! Apache 2.0 - See [LICENSE](./../../LICENSE) for more information.
185//!
186
187pub mod cache;
188pub mod error;
189pub mod resolver;
190use crate::error::FlagdError;
191use crate::resolver::in_process::resolver::{FileResolver, InProcessResolver};
192use async_trait::async_trait;
193use open_feature::provider::{FeatureProvider, ProviderMetadata, ResolutionDetails};
194use open_feature::{
195 EvaluationContext, EvaluationContextFieldValue, EvaluationError, StructValue, Value,
196};
197use resolver::rest::RestResolver;
198use tracing::debug;
199use tracing::instrument;
200
201use std::collections::BTreeMap;
202use std::sync::Arc;
203
204pub use cache::{CacheService, CacheSettings, CacheType};
205pub use resolver::rpc::RpcResolver;
206
207// Include the generated protobuf code
208pub mod flagd {
209 pub mod evaluation {
210 pub mod v1 {
211 include!(concat!(env!("OUT_DIR"), "/flagd.evaluation.v1.rs"));
212 }
213 }
214 pub mod sync {
215 pub mod v1 {
216 include!(concat!(env!("OUT_DIR"), "/flagd.sync.v1.rs"));
217 }
218 }
219}
220
221/// Configuration options for the flagd provider
222#[derive(Debug, Clone)]
223pub struct FlagdOptions {
224 /// Host address for the service
225 pub host: String,
226 /// Port number for the service
227 pub port: u16,
228 /// Target URI for custom name resolution (e.g. "envoy://service/flagd")
229 pub target_uri: Option<String>,
230 /// Type of resolver to use
231 pub resolver_type: ResolverType,
232 /// Whether to use TLS
233 pub tls: bool,
234 /// Path to TLS certificate
235 pub cert_path: Option<String>,
236 /// Request timeout in milliseconds
237 pub deadline_ms: u32,
238 /// Cache configuration settings
239 pub cache_settings: Option<CacheSettings>,
240 /// Initial backoff duration in milliseconds for retry attempts (default: 1000ms)
241 /// Not supported in OFREP (REST) evaluation
242 pub retry_backoff_ms: u32,
243 /// Maximum backoff duration in milliseconds for retry attempts, prevents exponential backoff from growing indefinitely (default: 120000ms)
244 /// Not supported in OFREP (REST) evaluation
245 pub retry_backoff_max_ms: u32,
246 /// Maximum number of retry attempts before giving up (default: 5)
247 /// Not supported in OFREP (REST) evaluation
248 pub retry_grace_period: u32,
249 /// Source selector for filtering flag configurations
250 /// Used to scope flag sync requests in in-process evaluation
251 pub selector: Option<String>,
252 /// Unix domain socket path for connecting to flagd
253 /// When provided, this takes precedence over host:port configuration
254 /// Example: "/var/run/flagd.sock"
255 /// Only works with GRPC resolver
256 pub socket_path: Option<String>,
257 /// Source configuration for file-based resolver
258 pub source_configuration: Option<String>,
259 /// The deadline in milliseconds for event streaming operations. Set to 0 to disable.
260 /// Recommended to prevent infrastructure from killing idle connections.
261 pub stream_deadline_ms: u32,
262 /// Offline polling interval in milliseconds
263 pub offline_poll_interval_ms: Option<u32>,
264}
265/// Type of resolver to use for flag evaluation
266#[derive(Debug, Clone, PartialEq)]
267pub enum ResolverType {
268 /// Remote evaluation using gRPC connection to flagd service
269 Rpc,
270 /// Remote evaluation using REST connection to flagd service
271 Rest,
272 /// Local evaluation with embedded flag engine using gRPC connection
273 InProcess,
274 /// Local evaluation with no external dependencies
275 File,
276}
277impl Default for FlagdOptions {
278 fn default() -> Self {
279 let resolver_type = if let Ok(r) = std::env::var("FLAGD_RESOLVER") {
280 match r.to_uppercase().as_str() {
281 "RPC" => ResolverType::Rpc,
282 "REST" => ResolverType::Rest,
283 "IN-PROCESS" | "INPROCESS" => ResolverType::InProcess,
284 "FILE" | "OFFLINE" => ResolverType::File,
285 _ => ResolverType::Rpc,
286 }
287 } else {
288 ResolverType::Rpc
289 };
290
291 let port = match resolver_type {
292 ResolverType::Rpc => 8013,
293 ResolverType::InProcess => 8015,
294 _ => 8013,
295 };
296
297 let mut options = Self {
298 host: std::env::var("FLAGD_HOST").unwrap_or_else(|_| "localhost".to_string()),
299 port: std::env::var("FLAGD_PORT")
300 .ok()
301 .and_then(|p| p.parse().ok())
302 .unwrap_or(port),
303 target_uri: std::env::var("FLAGD_TARGET_URI").ok(),
304 resolver_type,
305 tls: std::env::var("FLAGD_TLS")
306 .map(|v| v.to_lowercase() == "true")
307 .unwrap_or(false),
308 cert_path: std::env::var("FLAGD_SERVER_CERT_PATH").ok(),
309 deadline_ms: std::env::var("FLAGD_DEADLINE_MS")
310 .ok()
311 .and_then(|v| v.parse().ok())
312 .unwrap_or(500),
313 retry_backoff_ms: std::env::var("FLAGD_RETRY_BACKOFF_MS")
314 .ok()
315 .and_then(|v| v.parse().ok())
316 .unwrap_or(1000),
317 retry_backoff_max_ms: std::env::var("FLAGD_RETRY_BACKOFF_MAX_MS")
318 .ok()
319 .and_then(|v| v.parse().ok())
320 .unwrap_or(120000),
321 retry_grace_period: std::env::var("FLAGD_RETRY_GRACE_PERIOD")
322 .ok()
323 .and_then(|v| v.parse().ok())
324 .unwrap_or(5),
325 stream_deadline_ms: std::env::var("FLAGD_STREAM_DEADLINE_MS")
326 .ok()
327 .and_then(|v| v.parse().ok())
328 .unwrap_or(600000),
329 socket_path: std::env::var("FLAGD_SOCKET_PATH").ok(),
330 selector: std::env::var("FLAGD_SOURCE_SELECTOR").ok(),
331 cache_settings: Some(CacheSettings::default()),
332 source_configuration: std::env::var("FLAGD_OFFLINE_FLAG_SOURCE_PATH").ok(),
333 offline_poll_interval_ms: Some(
334 std::env::var("FLAGD_OFFLINE_POLL_MS")
335 .ok()
336 .and_then(|s| s.parse().ok())
337 .unwrap_or(5000),
338 ),
339 };
340
341 if options.source_configuration.is_some() && options.resolver_type != ResolverType::Rpc {
342 options.resolver_type = ResolverType::File;
343 }
344
345 options
346 }
347}
348
349/// Main provider implementation for flagd
350#[derive(Clone)]
351pub struct FlagdProvider {
352 /// The underlying feature flag resolver
353 provider: Arc<dyn FeatureProvider + Send + Sync>,
354 /// Optional caching layer
355 cache: Option<Arc<CacheService<Value>>>,
356}
357
358impl FlagdProvider {
359 #[instrument(skip(options))]
360 pub async fn new(options: FlagdOptions) -> Result<Self, FlagdError> {
361 debug!("Initializing FlagdProvider with options: {:?}", options);
362
363 let provider: Arc<dyn FeatureProvider + Send + Sync> = match options.resolver_type {
364 ResolverType::Rpc => {
365 debug!("Using RPC resolver");
366 Arc::new(RpcResolver::new(&options).await?)
367 }
368 ResolverType::Rest => {
369 debug!("Using REST resolver");
370 Arc::new(RestResolver::new(&options))
371 }
372 ResolverType::InProcess => {
373 debug!("Using in-process resolver");
374 Arc::new(InProcessResolver::new(&options).await?)
375 }
376 ResolverType::File => {
377 debug!("Using file resolver");
378 Arc::new(
379 FileResolver::new(
380 options.source_configuration.unwrap(),
381 options.cache_settings.clone(),
382 )
383 .await?,
384 )
385 }
386 };
387
388 Ok(Self {
389 provider,
390 cache: options
391 .cache_settings
392 .map(|settings| Arc::new(CacheService::new(settings))),
393 })
394 }
395}
396
397impl std::fmt::Debug for FlagdProvider {
398 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399 f.debug_struct("FlagdProvider")
400 .field("cache", &self.cache)
401 .finish()
402 }
403}
404
405fn convert_context(context: &EvaluationContext) -> Option<prost_types::Struct> {
406 let mut fields = BTreeMap::new();
407
408 if let Some(targeting_key) = &context.targeting_key {
409 fields.insert(
410 "targetingKey".to_string(),
411 prost_types::Value {
412 kind: Some(prost_types::value::Kind::StringValue(targeting_key.clone())),
413 },
414 );
415 }
416
417 for (key, value) in &context.custom_fields {
418 let prost_value = match value {
419 EvaluationContextFieldValue::String(s) => prost_types::Value {
420 kind: Some(prost_types::value::Kind::StringValue(s.clone())),
421 },
422 EvaluationContextFieldValue::Bool(b) => prost_types::Value {
423 kind: Some(prost_types::value::Kind::BoolValue(*b)),
424 },
425 EvaluationContextFieldValue::Int(i) => prost_types::Value {
426 kind: Some(prost_types::value::Kind::NumberValue(*i as f64)),
427 },
428 EvaluationContextFieldValue::Float(f) => prost_types::Value {
429 kind: Some(prost_types::value::Kind::NumberValue(*f)),
430 },
431 EvaluationContextFieldValue::DateTime(dt) => prost_types::Value {
432 kind: Some(prost_types::value::Kind::StringValue(dt.to_string())),
433 },
434 EvaluationContextFieldValue::Struct(s) => prost_types::Value {
435 kind: Some(prost_types::value::Kind::StringValue(format!("{:?}", s))),
436 },
437 };
438 fields.insert(key.clone(), prost_value);
439 }
440
441 Some(prost_types::Struct { fields })
442}
443
444fn convert_proto_struct_to_struct_value(proto_struct: prost_types::Struct) -> StructValue {
445 let fields = proto_struct
446 .fields
447 .into_iter()
448 .map(|(key, value)| {
449 (
450 key,
451 match value.kind.unwrap() {
452 prost_types::value::Kind::NullValue(_) => Value::String(String::new()),
453 prost_types::value::Kind::NumberValue(n) => Value::Float(n),
454 prost_types::value::Kind::StringValue(s) => Value::String(s),
455 prost_types::value::Kind::BoolValue(b) => Value::Bool(b),
456 prost_types::value::Kind::StructValue(s) => Value::String(format!("{:?}", s)),
457 prost_types::value::Kind::ListValue(l) => Value::String(format!("{:?}", l)),
458 },
459 )
460 })
461 .collect();
462
463 StructValue { fields }
464}
465
466impl FlagdProvider {
467 async fn get_cached_value<T>(
468 &self,
469 flag_key: &str,
470 context: &EvaluationContext,
471 value_converter: impl Fn(Value) -> Option<T>,
472 ) -> Option<T> {
473 if let Some(cache) = &self.cache {
474 if let Some(cached_value) = cache.get(flag_key, context).await {
475 return value_converter(cached_value);
476 }
477 }
478 None
479 }
480}
481
482#[async_trait]
483impl FeatureProvider for FlagdProvider {
484 fn metadata(&self) -> &ProviderMetadata {
485 self.provider.metadata()
486 }
487
488 async fn resolve_bool_value(
489 &self,
490 flag_key: &str,
491 context: &EvaluationContext,
492 ) -> Result<ResolutionDetails<bool>, EvaluationError> {
493 if let Some(value) = self
494 .get_cached_value(flag_key, context, |v| match v {
495 Value::Bool(b) => Some(b),
496 _ => None,
497 })
498 .await
499 {
500 return Ok(ResolutionDetails::new(value));
501 }
502
503 let result = self.provider.resolve_bool_value(flag_key, context).await?;
504
505 if let Some(cache) = &self.cache {
506 cache
507 .add(flag_key, context, Value::Bool(result.value))
508 .await;
509 }
510
511 Ok(result)
512 }
513
514 async fn resolve_int_value(
515 &self,
516 flag_key: &str,
517 context: &EvaluationContext,
518 ) -> Result<ResolutionDetails<i64>, EvaluationError> {
519 if let Some(value) = self
520 .get_cached_value(flag_key, context, |v| match v {
521 Value::Int(i) => Some(i),
522 _ => None,
523 })
524 .await
525 {
526 return Ok(ResolutionDetails::new(value));
527 }
528
529 let result = self.provider.resolve_int_value(flag_key, context).await?;
530
531 if let Some(cache) = &self.cache {
532 cache.add(flag_key, context, Value::Int(result.value)).await;
533 }
534
535 Ok(result)
536 }
537
538 async fn resolve_float_value(
539 &self,
540 flag_key: &str,
541 context: &EvaluationContext,
542 ) -> Result<ResolutionDetails<f64>, EvaluationError> {
543 if let Some(value) = self
544 .get_cached_value(flag_key, context, |v| match v {
545 Value::Float(f) => Some(f),
546 _ => None,
547 })
548 .await
549 {
550 return Ok(ResolutionDetails::new(value));
551 }
552
553 let result = self.provider.resolve_float_value(flag_key, context).await?;
554
555 if let Some(cache) = &self.cache {
556 cache
557 .add(flag_key, context, Value::Float(result.value))
558 .await;
559 }
560
561 Ok(result)
562 }
563
564 async fn resolve_string_value(
565 &self,
566 flag_key: &str,
567 context: &EvaluationContext,
568 ) -> Result<ResolutionDetails<String>, EvaluationError> {
569 if let Some(value) = self
570 .get_cached_value(flag_key, context, |v| match v {
571 Value::String(s) => Some(s),
572 _ => None,
573 })
574 .await
575 {
576 return Ok(ResolutionDetails::new(value));
577 }
578
579 let result = self
580 .provider
581 .resolve_string_value(flag_key, context)
582 .await?;
583
584 if let Some(cache) = &self.cache {
585 cache
586 .add(flag_key, context, Value::String(result.value.clone()))
587 .await;
588 }
589
590 Ok(result)
591 }
592
593 async fn resolve_struct_value(
594 &self,
595 flag_key: &str,
596 context: &EvaluationContext,
597 ) -> Result<ResolutionDetails<StructValue>, EvaluationError> {
598 if let Some(value) = self
599 .get_cached_value(flag_key, context, |v| match v {
600 Value::Struct(s) => Some(s),
601 _ => None,
602 })
603 .await
604 {
605 return Ok(ResolutionDetails::new(value));
606 }
607
608 let result = self
609 .provider
610 .resolve_struct_value(flag_key, context)
611 .await?;
612
613 if let Some(cache) = &self.cache {
614 cache
615 .add(flag_key, context, Value::Struct(result.value.clone()))
616 .await;
617 }
618
619 Ok(result)
620 }
621}