open_feature_flagd/lib.rs
1//! [Generated by cargo-readme: `cargo readme --no-title > README.md`]::
2//! # flagd Provider for OpenFeature
3//!
4//! A Rust implementation of the OpenFeature provider for flagd, enabling dynamic
5//! feature flag evaluation in your applications.
6//!
7//! This provider supports multiple evaluation modes, advanced targeting rules, caching strategies,
8//! and connection management. It is designed to work seamlessly with the OpenFeature SDK and the flagd service.
9//!
10//! ## Core Features
11//!
12//! - **Multiple Evaluation Modes**
13//! - **RPC Resolver (Remote Evaluation):** Uses gRPC to perform flag evaluations remotely at a flagd instance. Supports bi-directional streaming, retry backoff, and custom name resolution (including Envoy support).
14//! - **REST Resolver:** Uses the OpenFeature Remote Evaluation Protocol (OFREP) over HTTP to evaluate flags.
15//! - **In-Process Resolver:** Performs evaluations locally using an embedded evaluation engine. Flag configurations can be retrieved via gRPC (sync mode).
16//! - **File Resolver:** Operates entirely from a flag definition file, updating on file changes in a best-effort manner.
17//!
18//! - **Advanced Targeting**
19//! - **Fractional Rollouts:** Uses consistent hashing (implemented via murmurhash3) to split traffic between flag variants in configurable proportions.
20//! - **Semantic Versioning:** Compare values using common operators such as '=', '!=', '<', '<=', '>', '>=', '^', and '~'.
21//! - **String Operations:** Custom operators for performing “starts_with” and “ends_with” comparisons.
22//! - **Complex Targeting Rules:** Leverages JSONLogic and custom operators to support nested conditions and dynamic evaluation.
23//!
24//! - **Caching Strategies**
25//! - Built-in support for LRU caching as well as an in-memory alternative. Flag evaluation results can be cached and later returned with a “CACHED” reason until the configuration updates.
26//!
27//! - **Connection Management**
28//! - Automatic connection establishment with configurable retries, timeout settings, and custom TLS or Unix-socket options.
29//! - Support for upstream name resolution including a custom resolver for Envoy proxy integration.
30//!
31//! ## Installation
32//! Add the dependency in your `Cargo.toml`:
33//! ```toml
34//! [dependencies]
35//! open-feature-flagd = "0.0.1"
36//! open-feature = "0.2"
37//! ```
38//! Then integrate it into your application:
39//!
40//! ```rust,no_run
41//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
42//! use open_feature::provider::FeatureProvider;
43//! use open_feature::EvaluationContext;
44//!
45//! #[tokio::main]
46//! async fn main() {
47//! // Example using the REST resolver mode.
48//! let provider = FlagdProvider::new(FlagdOptions {
49//! host: "localhost".to_string(),
50//! port: 8016,
51//! resolver_type: ResolverType::Rest,
52//! ..Default::default()
53//! }).await.unwrap();
54//!
55//! let context = EvaluationContext::default().with_targeting_key("user-123");
56//! let result = provider.resolve_bool_value("bool-flag", &context).await.unwrap();
57//! println!("Flag value: {}", result.value);
58//! }
59//! ```
60//!
61//! ## Evaluation Modes
62//! ### Remote Resolver (RPC)
63//! In RPC mode, the provider communicates with flagd via gRPC. It supports features like streaming updates, retry mechanisms, and name resolution (including Envoy).
64//!
65//! ```rust,no_run
66//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
67//! use open_feature::provider::FeatureProvider;
68//! use open_feature::EvaluationContext;
69//!
70//! #[tokio::main]
71//! async fn main() {
72//! let provider = FlagdProvider::new(FlagdOptions {
73//! host: "localhost".to_string(),
74//! port: 8013,
75//! resolver_type: ResolverType::Rpc,
76//! ..Default::default()
77//! }).await.unwrap();
78//!
79//! let context = EvaluationContext::default().with_targeting_key("user-123");
80//! let bool_result = provider.resolve_bool_value("feature-enabled", &context).await.unwrap();
81//! println!("Feature enabled: {}", bool_result.value);
82//! }
83//! ```
84//!
85//! ### REST Resolver
86//! In REST mode the provider uses the OpenFeature Remote Evaluation Protocol (OFREP) over HTTP.
87//! It is useful when gRPC is not an option.
88//! ```rust,no_run
89//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
90//! use open_feature::provider::FeatureProvider;
91//! use open_feature::EvaluationContext;
92//!
93//! #[tokio::main]
94//! async fn main() {
95//! let provider = FlagdProvider::new(FlagdOptions {
96//! host: "localhost".to_string(),
97//! port: 8016,
98//! resolver_type: ResolverType::Rest,
99//! ..Default::default()
100//! }).await.unwrap();
101//!
102//! let context = EvaluationContext::default().with_targeting_key("user-456");
103//! let result = provider.resolve_string_value("feature-variant", &context).await.unwrap();
104//! println!("Variant: {}", result.value);
105//! }
106//! ```
107//!
108//! ### In-Process Resolver
109//! In-process evaluation is performed locally. Flag configurations are sourced via gRPC sync stream.
110//! This mode supports advanced targeting operators (fractional, semver, string comparisons)
111//! using the built-in evaluation engine.
112//! ```rust,no_run
113//! use open_feature_flagd::{CacheSettings, FlagdOptions, FlagdProvider, ResolverType};
114//! use open_feature::provider::FeatureProvider;
115//! use open_feature::EvaluationContext;
116//!
117//! #[tokio::main]
118//! async fn main() {
119//! let provider = FlagdProvider::new(FlagdOptions {
120//! host: "localhost".to_string(),
121//! port: 8015,
122//! resolver_type: ResolverType::InProcess,
123//! selector: Some("my-service".to_string()),
124//! cache_settings: Some(CacheSettings::default()),
125//! ..Default::default()
126//! }).await.unwrap();
127//!
128//! let context = EvaluationContext::default()
129//! .with_targeting_key("user-abc")
130//! .with_custom_field("environment", "production")
131//! .with_custom_field("semver", "2.1.0");
132//!
133//! let dark_mode = provider.resolve_bool_value("dark-mode", &context).await.unwrap();
134//! println!("Dark mode enabled: {}", dark_mode.value);
135//! }
136//! ```
137//!
138//! ### File Mode
139//! File mode is an in-process variant where flag configurations are read from a file.
140//! This is useful for development or environments without network access.
141//! ```rust,no_run
142//! use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
143//! use open_feature::provider::FeatureProvider;
144//! use open_feature::EvaluationContext;
145//!
146//! #[tokio::main]
147//! async fn main() {
148//! let file_path = "./path/to/flagd-config.json".to_string();
149//! let provider = FlagdProvider::new(FlagdOptions {
150//! host: "localhost".to_string(),
151//! resolver_type: ResolverType::File,
152//! source_configuration: Some(file_path),
153//! ..Default::default()
154//! }).await.unwrap();
155//!
156//! let context = EvaluationContext::default();
157//! let result = provider.resolve_int_value("rollout-percentage", &context).await.unwrap();
158//! println!("Rollout percentage: {}", result.value);
159//! }
160//! ```
161//!
162//! ## Configuration Options
163//! Configurations can be provided as constructor options or via environment variables (with constructor options taking priority). The following options are supported:
164//!
165//! | Option | Env Variable | Type / Supported Value | Default | Compatible Resolver |
166//! |-----------------------------------------|-----------------------------------------|-----------------------------------|-------------------------------------|--------------------------------|
167//! | Host | FLAGD_HOST | string | "localhost" | RPC, REST, In-Process, File |
168//! | Port | FLAGD_PORT | number | 8013 (RPC), 8016 (REST) | RPC, REST, In-Process, File |
169//! | Target URI | FLAGD_TARGET_URI | string | "" | RPC, In-Process |
170//! | TLS | FLAGD_TLS | boolean | false | RPC, In-Process |
171//! | Socket Path | FLAGD_SOCKET_PATH | string | "" | RPC |
172//! | Certificate Path | FLAGD_SERVER_CERT_PATH | string | "" | RPC, In-Process |
173//! | Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | lru | RPC, In-Process, File |
174//! | Cache TTL (Seconds) | FLAGD_CACHE_TTL | number | 60 | RPC, In-Process, File |
175//! | Max Cache Size | FLAGD_MAX_CACHE_SIZE | number | 1000 | RPC, In-Process, File |
176//! | Offline File Path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | File |
177//! | Retry Backoff (ms) | FLAGD_RETRY_BACKOFF_MS | number | 1000 | RPC, In-Process |
178//! | Retry Backoff Maximum (ms) | FLAGD_RETRY_BACKOFF_MAX_MS | number | 120000 | RPC, In-Process |
179//! | Retry Grace Period | FLAGD_RETRY_GRACE_PERIOD | number | 5 | RPC, In-Process |
180//! | Event Stream Deadline (ms) | FLAGD_STREAM_DEADLINE_MS | number | 600000 | RPC |
181//! | Offline Poll Interval (ms) | FLAGD_OFFLINE_POLL_MS | number | 5000 | File |
182//! | Source Selector | FLAGD_SOURCE_SELECTOR | string | "" | In-Process |
183//!
184//! ## License
185//! Apache 2.0 - See [LICENSE](./../../LICENSE) for more information.
186//!
187
188pub mod cache;
189pub mod error;
190pub mod resolver;
191use crate::error::FlagdError;
192use crate::resolver::in_process::resolver::{FileResolver, InProcessResolver};
193use async_trait::async_trait;
194use open_feature::provider::{FeatureProvider, ProviderMetadata, ResolutionDetails};
195use open_feature::{
196 EvaluationContext, EvaluationContextFieldValue, EvaluationError, StructValue, Value,
197};
198use resolver::rest::RestResolver;
199use tracing::debug;
200use tracing::instrument;
201
202use std::collections::BTreeMap;
203use std::sync::Arc;
204
205pub use cache::{CacheService, CacheSettings, CacheType};
206pub use resolver::rpc::RpcResolver;
207
208// Include the generated protobuf code
209pub mod flagd {
210 pub mod evaluation {
211 pub mod v1 {
212 include!(concat!(env!("OUT_DIR"), "/flagd.evaluation.v1.rs"));
213 }
214 }
215 pub mod sync {
216 pub mod v1 {
217 include!(concat!(env!("OUT_DIR"), "/flagd.sync.v1.rs"));
218 }
219 }
220}
221
222/// Configuration options for the flagd provider
223#[derive(Debug, Clone)]
224pub struct FlagdOptions {
225 /// Host address for the service
226 pub host: String,
227 /// Port number for the service
228 pub port: u16,
229 /// Target URI for custom name resolution (e.g. "envoy://service/flagd")
230 pub target_uri: Option<String>,
231 /// Type of resolver to use
232 pub resolver_type: ResolverType,
233 /// Whether to use TLS
234 pub tls: bool,
235 /// Path to TLS certificate
236 pub cert_path: Option<String>,
237 /// Request timeout in milliseconds
238 pub deadline_ms: u32,
239 /// Cache configuration settings
240 pub cache_settings: Option<CacheSettings>,
241 /// Initial backoff duration in milliseconds for retry attempts (default: 1000ms)
242 /// Not supported in OFREP (REST) evaluation
243 pub retry_backoff_ms: u32,
244 /// Maximum backoff duration in milliseconds for retry attempts, prevents exponential backoff from growing indefinitely (default: 120000ms)
245 /// Not supported in OFREP (REST) evaluation
246 pub retry_backoff_max_ms: u32,
247 /// Maximum number of retry attempts before giving up (default: 5)
248 /// Not supported in OFREP (REST) evaluation
249 pub retry_grace_period: u32,
250 /// Source selector for filtering flag configurations
251 /// Used to scope flag sync requests in in-process evaluation
252 pub selector: Option<String>,
253 /// Unix domain socket path for connecting to flagd
254 /// When provided, this takes precedence over host:port configuration
255 /// Example: "/var/run/flagd.sock"
256 /// Only works with GRPC resolver
257 pub socket_path: Option<String>,
258 /// Source configuration for file-based resolver
259 pub source_configuration: Option<String>,
260 /// The deadline in milliseconds for event streaming operations. Set to 0 to disable.
261 /// Recommended to prevent infrastructure from killing idle connections.
262 pub stream_deadline_ms: u32,
263 /// Offline polling interval in milliseconds
264 pub offline_poll_interval_ms: Option<u32>,
265}
266/// Type of resolver to use for flag evaluation
267#[derive(Debug, Clone, PartialEq)]
268pub enum ResolverType {
269 /// Remote evaluation using gRPC connection to flagd service
270 Rpc,
271 /// Remote evaluation using REST connection to flagd service
272 Rest,
273 /// Local evaluation with embedded flag engine using gRPC connection
274 InProcess,
275 /// Local evaluation with no external dependencies
276 File,
277}
278impl Default for FlagdOptions {
279 fn default() -> Self {
280 let resolver_type = if let Ok(r) = std::env::var("FLAGD_RESOLVER") {
281 match r.to_uppercase().as_str() {
282 "RPC" => ResolverType::Rpc,
283 "REST" => ResolverType::Rest,
284 "IN-PROCESS" | "INPROCESS" => ResolverType::InProcess,
285 "FILE" | "OFFLINE" => ResolverType::File,
286 _ => ResolverType::Rpc,
287 }
288 } else {
289 ResolverType::Rpc
290 };
291
292 let port = match resolver_type {
293 ResolverType::Rpc => 8013,
294 ResolverType::InProcess => 8015,
295 _ => 8013,
296 };
297
298 let mut options = Self {
299 host: std::env::var("FLAGD_HOST").unwrap_or_else(|_| "localhost".to_string()),
300 port: std::env::var("FLAGD_PORT")
301 .ok()
302 .and_then(|p| p.parse().ok())
303 .unwrap_or(port),
304 target_uri: std::env::var("FLAGD_TARGET_URI").ok(),
305 resolver_type,
306 tls: std::env::var("FLAGD_TLS")
307 .map(|v| v.to_lowercase() == "true")
308 .unwrap_or(false),
309 cert_path: std::env::var("FLAGD_SERVER_CERT_PATH").ok(),
310 deadline_ms: std::env::var("FLAGD_DEADLINE_MS")
311 .ok()
312 .and_then(|v| v.parse().ok())
313 .unwrap_or(500),
314 retry_backoff_ms: std::env::var("FLAGD_RETRY_BACKOFF_MS")
315 .ok()
316 .and_then(|v| v.parse().ok())
317 .unwrap_or(1000),
318 retry_backoff_max_ms: std::env::var("FLAGD_RETRY_BACKOFF_MAX_MS")
319 .ok()
320 .and_then(|v| v.parse().ok())
321 .unwrap_or(120000),
322 retry_grace_period: std::env::var("FLAGD_RETRY_GRACE_PERIOD")
323 .ok()
324 .and_then(|v| v.parse().ok())
325 .unwrap_or(5),
326 stream_deadline_ms: std::env::var("FLAGD_STREAM_DEADLINE_MS")
327 .ok()
328 .and_then(|v| v.parse().ok())
329 .unwrap_or(600000),
330 socket_path: std::env::var("FLAGD_SOCKET_PATH").ok(),
331 selector: std::env::var("FLAGD_SOURCE_SELECTOR").ok(),
332 cache_settings: Some(CacheSettings::default()),
333 source_configuration: std::env::var("FLAGD_OFFLINE_FLAG_SOURCE_PATH").ok(),
334 offline_poll_interval_ms: Some(
335 std::env::var("FLAGD_OFFLINE_POLL_MS")
336 .ok()
337 .and_then(|s| s.parse().ok())
338 .unwrap_or(5000),
339 ),
340 };
341
342 if options.source_configuration.is_some() && options.resolver_type != ResolverType::Rpc {
343 options.resolver_type = ResolverType::File;
344 }
345
346 options
347 }
348}
349
350/// Main provider implementation for flagd
351#[derive(Clone)]
352pub struct FlagdProvider {
353 /// The underlying feature flag resolver
354 provider: Arc<dyn FeatureProvider + Send + Sync>,
355 /// Optional caching layer
356 cache: Option<Arc<CacheService<Value>>>,
357}
358
359impl FlagdProvider {
360 #[instrument(skip(options))]
361 pub async fn new(options: FlagdOptions) -> Result<Self, FlagdError> {
362 debug!("Initializing FlagdProvider with options: {:?}", options);
363
364 let provider: Arc<dyn FeatureProvider + Send + Sync> = match options.resolver_type {
365 ResolverType::Rpc => {
366 debug!("Using RPC resolver");
367 Arc::new(RpcResolver::new(&options).await?)
368 }
369 ResolverType::Rest => {
370 debug!("Using REST resolver");
371 Arc::new(RestResolver::new(&options))
372 }
373 ResolverType::InProcess => {
374 debug!("Using in-process resolver");
375 Arc::new(InProcessResolver::new(&options).await?)
376 }
377 ResolverType::File => {
378 debug!("Using file resolver");
379 Arc::new(
380 FileResolver::new(
381 options.source_configuration.unwrap(),
382 options.cache_settings.clone(),
383 )
384 .await?,
385 )
386 }
387 };
388
389 Ok(Self {
390 provider,
391 cache: options
392 .cache_settings
393 .map(|settings| Arc::new(CacheService::new(settings))),
394 })
395 }
396}
397
398impl std::fmt::Debug for FlagdProvider {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 f.debug_struct("FlagdProvider")
401 .field("cache", &self.cache)
402 .finish()
403 }
404}
405
406fn convert_context(context: &EvaluationContext) -> Option<prost_types::Struct> {
407 let mut fields = BTreeMap::new();
408
409 if let Some(targeting_key) = &context.targeting_key {
410 fields.insert(
411 "targetingKey".to_string(),
412 prost_types::Value {
413 kind: Some(prost_types::value::Kind::StringValue(targeting_key.clone())),
414 },
415 );
416 }
417
418 for (key, value) in &context.custom_fields {
419 let prost_value = match value {
420 EvaluationContextFieldValue::String(s) => prost_types::Value {
421 kind: Some(prost_types::value::Kind::StringValue(s.clone())),
422 },
423 EvaluationContextFieldValue::Bool(b) => prost_types::Value {
424 kind: Some(prost_types::value::Kind::BoolValue(*b)),
425 },
426 EvaluationContextFieldValue::Int(i) => prost_types::Value {
427 kind: Some(prost_types::value::Kind::NumberValue(*i as f64)),
428 },
429 EvaluationContextFieldValue::Float(f) => prost_types::Value {
430 kind: Some(prost_types::value::Kind::NumberValue(*f)),
431 },
432 EvaluationContextFieldValue::DateTime(dt) => prost_types::Value {
433 kind: Some(prost_types::value::Kind::StringValue(dt.to_string())),
434 },
435 EvaluationContextFieldValue::Struct(s) => prost_types::Value {
436 kind: Some(prost_types::value::Kind::StringValue(format!("{:?}", s))),
437 },
438 };
439 fields.insert(key.clone(), prost_value);
440 }
441
442 Some(prost_types::Struct { fields })
443}
444
445fn convert_proto_struct_to_struct_value(proto_struct: prost_types::Struct) -> StructValue {
446 let fields = proto_struct
447 .fields
448 .into_iter()
449 .map(|(key, value)| {
450 (
451 key,
452 match value.kind.unwrap() {
453 prost_types::value::Kind::NullValue(_) => Value::String(String::new()),
454 prost_types::value::Kind::NumberValue(n) => Value::Float(n),
455 prost_types::value::Kind::StringValue(s) => Value::String(s),
456 prost_types::value::Kind::BoolValue(b) => Value::Bool(b),
457 prost_types::value::Kind::StructValue(s) => Value::String(format!("{:?}", s)),
458 prost_types::value::Kind::ListValue(l) => Value::String(format!("{:?}", l)),
459 },
460 )
461 })
462 .collect();
463
464 StructValue { fields }
465}
466
467impl FlagdProvider {
468 async fn get_cached_value<T>(
469 &self,
470 flag_key: &str,
471 context: &EvaluationContext,
472 value_converter: impl Fn(Value) -> Option<T>,
473 ) -> Option<T> {
474 if let Some(cache) = &self.cache {
475 if let Some(cached_value) = cache.get(flag_key, context).await {
476 return value_converter(cached_value);
477 }
478 }
479 None
480 }
481}
482
483#[async_trait]
484impl FeatureProvider for FlagdProvider {
485 fn metadata(&self) -> &ProviderMetadata {
486 self.provider.metadata()
487 }
488
489 async fn resolve_bool_value(
490 &self,
491 flag_key: &str,
492 context: &EvaluationContext,
493 ) -> Result<ResolutionDetails<bool>, EvaluationError> {
494 if let Some(value) = self
495 .get_cached_value(flag_key, context, |v| match v {
496 Value::Bool(b) => Some(b),
497 _ => None,
498 })
499 .await
500 {
501 return Ok(ResolutionDetails::new(value));
502 }
503
504 let result = self.provider.resolve_bool_value(flag_key, context).await?;
505
506 if let Some(cache) = &self.cache {
507 cache
508 .add(flag_key, context, Value::Bool(result.value))
509 .await;
510 }
511
512 Ok(result)
513 }
514
515 async fn resolve_int_value(
516 &self,
517 flag_key: &str,
518 context: &EvaluationContext,
519 ) -> Result<ResolutionDetails<i64>, EvaluationError> {
520 if let Some(value) = self
521 .get_cached_value(flag_key, context, |v| match v {
522 Value::Int(i) => Some(i),
523 _ => None,
524 })
525 .await
526 {
527 return Ok(ResolutionDetails::new(value));
528 }
529
530 let result = self.provider.resolve_int_value(flag_key, context).await?;
531
532 if let Some(cache) = &self.cache {
533 cache.add(flag_key, context, Value::Int(result.value)).await;
534 }
535
536 Ok(result)
537 }
538
539 async fn resolve_float_value(
540 &self,
541 flag_key: &str,
542 context: &EvaluationContext,
543 ) -> Result<ResolutionDetails<f64>, EvaluationError> {
544 if let Some(value) = self
545 .get_cached_value(flag_key, context, |v| match v {
546 Value::Float(f) => Some(f),
547 _ => None,
548 })
549 .await
550 {
551 return Ok(ResolutionDetails::new(value));
552 }
553
554 let result = self.provider.resolve_float_value(flag_key, context).await?;
555
556 if let Some(cache) = &self.cache {
557 cache
558 .add(flag_key, context, Value::Float(result.value))
559 .await;
560 }
561
562 Ok(result)
563 }
564
565 async fn resolve_string_value(
566 &self,
567 flag_key: &str,
568 context: &EvaluationContext,
569 ) -> Result<ResolutionDetails<String>, EvaluationError> {
570 if let Some(value) = self
571 .get_cached_value(flag_key, context, |v| match v {
572 Value::String(s) => Some(s),
573 _ => None,
574 })
575 .await
576 {
577 return Ok(ResolutionDetails::new(value));
578 }
579
580 let result = self
581 .provider
582 .resolve_string_value(flag_key, context)
583 .await?;
584
585 if let Some(cache) = &self.cache {
586 cache
587 .add(flag_key, context, Value::String(result.value.clone()))
588 .await;
589 }
590
591 Ok(result)
592 }
593
594 async fn resolve_struct_value(
595 &self,
596 flag_key: &str,
597 context: &EvaluationContext,
598 ) -> Result<ResolutionDetails<StructValue>, EvaluationError> {
599 if let Some(value) = self
600 .get_cached_value(flag_key, context, |v| match v {
601 Value::Struct(s) => Some(s),
602 _ => None,
603 })
604 .await
605 {
606 return Ok(ResolutionDetails::new(value));
607 }
608
609 let result = self
610 .provider
611 .resolve_struct_value(flag_key, context)
612 .await?;
613
614 if let Some(cache) = &self.cache {
615 cache
616 .add(flag_key, context, Value::Struct(result.value.clone()))
617 .await;
618 }
619
620 Ok(result)
621 }
622}