1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::Duration;
4
5use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
6use serde_json::json;
7use tracing::{debug, instrument, trace, warn};
8
9use crate::endpoints::Endpoint;
10use crate::event::BatchRequest;
11use crate::feature_flag_evaluations::{
12 EvaluateFlagsOptions, EvaluatedFlagRecord, FeatureFlagEvaluations, FeatureFlagEvaluationsHost,
13 FlagCalledEventParams,
14};
15use crate::feature_flags::{
16 match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagDetail, FlagValue,
17};
18use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
19use crate::{event::InnerEvent, Error, Event};
20
21use super::ClientOptions;
22
23const MAX_FLAG_CALLED_CACHE_SIZE: usize = 50_000;
26
27async fn check_response(response: reqwest::Response) -> Result<(), Error> {
28 let status = response.status().as_u16();
29 let body = response
30 .text()
31 .await
32 .unwrap_or_else(|_| "Unknown error".to_string());
33
34 match Error::from_http_response(status, body) {
35 Some(err) => Err(err),
36 None => Ok(()),
37 }
38}
39
40pub struct Client {
42 options: ClientOptions,
43 client: HttpClient,
44 local_evaluator: Option<LocalEvaluator>,
45 _flag_poller: Option<AsyncFlagPoller>,
46 flag_event_host: OnceLock<Arc<dyn FeatureFlagEvaluationsHost>>,
47}
48
49struct AsyncFlagEventHost {
54 http_client: HttpClient,
55 api_key: String,
56 capture_url: String,
57 disabled: bool,
58 disable_geoip: bool,
59 dedup_cache: Mutex<HashMap<String, HashSet<String>>>,
60 runtime: tokio::runtime::Handle,
66}
67
68impl AsyncFlagEventHost {
69 fn from_options(options: &ClientOptions, http_client: HttpClient) -> Self {
70 let capture_url = options.endpoints().build_url(Endpoint::Capture);
71 Self {
72 http_client,
73 api_key: options.api_key.clone(),
74 capture_url,
75 disabled: options.is_disabled(),
76 disable_geoip: options.disable_geoip,
77 dedup_cache: Mutex::new(HashMap::new()),
78 runtime: tokio::runtime::Handle::current(),
79 }
80 }
81
82 fn already_reported(&self, distinct_id: &str, dedup_key: &str) -> bool {
85 let mut cache = self.dedup_cache.lock().unwrap_or_else(|p| p.into_inner());
86 if let Some(seen) = cache.get(distinct_id) {
87 if seen.contains(dedup_key) {
88 return true;
89 }
90 }
91 if cache.len() >= MAX_FLAG_CALLED_CACHE_SIZE {
92 cache.clear();
93 }
94 cache
95 .entry(distinct_id.to_string())
96 .or_default()
97 .insert(dedup_key.to_string());
98 false
99 }
100
101 fn spawn_ship(&self, event: Event) {
102 if self.disabled {
103 return;
104 }
105 let inner_event = InnerEvent::new(event, self.api_key.clone());
106 let payload = match serde_json::to_string(&inner_event) {
107 Ok(p) => p,
108 Err(e) => {
109 debug!(error = %e, "failed to serialize $feature_flag_called event");
110 return;
111 }
112 };
113 let http_client = self.http_client.clone();
114 let url = self.capture_url.clone();
115 self.runtime.spawn(async move {
116 let response = match http_client
117 .post(&url)
118 .header(CONTENT_TYPE, "application/json")
119 .body(payload)
120 .send()
121 .await
122 {
123 Ok(r) => r,
124 Err(send_err) => {
125 let message = send_err.to_string();
126 debug!("failed to send $feature_flag_called event: {message}");
127 return;
128 }
129 };
130 if let Err(check_err) = check_response(response).await {
131 let message = check_err.to_string();
132 debug!("$feature_flag_called event rejected by server: {message}");
133 }
134 });
135 }
136}
137
138impl FeatureFlagEvaluationsHost for AsyncFlagEventHost {
139 fn capture_flag_called_event_if_needed(&self, params: FlagCalledEventParams) {
140 let dedup_key = build_dedup_key(¶ms.key, params.response.as_ref());
141 if self.already_reported(¶ms.distinct_id, &dedup_key) {
142 return;
143 }
144
145 let mut event = Event::new(
146 "$feature_flag_called".to_string(),
147 params.distinct_id.clone(),
148 );
149 for (k, v) in params.properties {
150 if event.insert_prop(k, v).is_err() {
151 return;
152 }
153 }
154 for (group_name, group_id) in ¶ms.groups {
155 event.add_group(group_name, group_id);
156 }
157 if params.disable_geoip.unwrap_or(self.disable_geoip) {
158 let _ = event.insert_prop("$geoip_disable", true);
159 }
160 self.spawn_ship(event);
161 }
162
163 fn log_warning(&self, message: &str) {
164 warn!("{message}");
167 }
168}
169
170fn build_dedup_key(flag_key: &str, response: Option<&FlagValue>) -> String {
171 let response_repr = match response {
172 Some(FlagValue::Boolean(true)) => "true".to_string(),
173 Some(FlagValue::Boolean(false)) => "false".to_string(),
174 Some(FlagValue::String(s)) => s.clone(),
175 None => "::null::".to_string(),
176 };
177 format!("{flag_key}_{response_repr}")
178}
179
180pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
182 let options = options.into().sanitize();
183 let client = HttpClient::builder()
184 .timeout(Duration::from_secs(options.request_timeout_seconds))
185 .build()
186 .unwrap(); let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
189 if let Some(ref personal_key) = options.personal_api_key {
190 let cache = FlagCache::new();
191
192 let config = LocalEvaluationConfig {
193 personal_api_key: personal_key.clone(),
194 project_api_key: options.api_key.clone(),
195 api_host: options.endpoints().api_host(),
196 poll_interval: Duration::from_secs(options.poll_interval_seconds),
197 request_timeout: Duration::from_secs(options.request_timeout_seconds),
198 };
199
200 let mut poller = AsyncFlagPoller::new(config, cache.clone());
201 poller.start().await;
202
203 (Some(LocalEvaluator::new(cache)), Some(poller))
204 } else {
205 warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
206 (None, None)
207 }
208 } else {
209 (None, None)
210 };
211
212 Client {
213 options,
214 client,
215 local_evaluator,
216 _flag_poller: flag_poller,
217 flag_event_host: OnceLock::new(),
218 }
219}
220
221impl Client {
222 #[instrument(skip(self, event), level = "debug")]
224 pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
225 if self.options.is_disabled() {
226 trace!("Client is disabled, skipping capture");
227 return Ok(());
228 }
229
230 if self.options.disable_geoip {
232 event.insert_prop("$geoip_disable", true).ok();
233 }
234
235 let inner_event = InnerEvent::new(event, self.options.api_key.clone());
236
237 let payload =
238 serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
239
240 let url = self.options.endpoints().build_url(Endpoint::Capture);
241
242 let response = self
243 .client
244 .post(&url)
245 .header(CONTENT_TYPE, "application/json")
246 .body(payload)
247 .send()
248 .await
249 .map_err(|e| Error::Connection(e.to_string()))?;
250
251 check_response(response).await
252 }
253
254 pub async fn capture_batch(
258 &self,
259 events: Vec<Event>,
260 historical_migration: bool,
261 ) -> Result<(), Error> {
262 if self.options.is_disabled() {
263 return Ok(());
264 }
265
266 let disable_geoip = self.options.disable_geoip;
267 let inner_events: Vec<InnerEvent> = events
268 .into_iter()
269 .map(|mut event| {
270 if disable_geoip {
271 event.insert_prop("$geoip_disable", true).ok();
272 }
273 InnerEvent::new(event, self.options.api_key.clone())
274 })
275 .collect();
276
277 let batch_request = BatchRequest {
278 api_key: self.options.api_key.clone(),
279 historical_migration,
280 batch: inner_events,
281 };
282 let payload = serde_json::to_string(&batch_request)
283 .map_err(|e| Error::Serialization(e.to_string()))?;
284 let url = self.options.endpoints().build_url(Endpoint::Batch);
285
286 let response = self
287 .client
288 .post(&url)
289 .header(CONTENT_TYPE, "application/json")
290 .body(payload)
291 .send()
292 .await
293 .map_err(|e| Error::Connection(e.to_string()))?;
294
295 check_response(response).await
296 }
297
298 #[must_use = "feature flags result should be used"]
300 pub async fn get_feature_flags<S: Into<String>>(
301 &self,
302 distinct_id: S,
303 groups: Option<HashMap<String, String>>,
304 person_properties: Option<HashMap<String, serde_json::Value>>,
305 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
306 ) -> Result<
307 (
308 HashMap<String, FlagValue>,
309 HashMap<String, serde_json::Value>,
310 ),
311 Error,
312 > {
313 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
314
315 let mut payload = json!({
316 "api_key": self.options.api_key,
317 "distinct_id": distinct_id.into(),
318 });
319
320 if let Some(groups) = groups {
321 payload["groups"] = json!(groups);
322 }
323
324 if let Some(person_properties) = person_properties {
325 payload["person_properties"] = json!(person_properties);
326 }
327
328 if let Some(group_properties) = group_properties {
329 payload["group_properties"] = json!(group_properties);
330 }
331
332 if self.options.disable_geoip {
334 payload["disable_geoip"] = json!(true);
335 }
336
337 let response = self
338 .client
339 .post(&flags_endpoint)
340 .header(CONTENT_TYPE, "application/json")
341 .json(&payload)
342 .timeout(Duration::from_secs(
343 self.options.feature_flags_request_timeout_seconds,
344 ))
345 .send()
346 .await
347 .map_err(|e| Error::Connection(e.to_string()))?;
348
349 if !response.status().is_success() {
350 let status = response.status();
351 let text = response
352 .text()
353 .await
354 .unwrap_or_else(|_| "Unknown error".to_string());
355 return Err(Error::Connection(format!(
356 "API request failed with status {status}: {text}"
357 )));
358 }
359
360 let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
361 Error::Serialization(format!("Failed to parse feature flags response: {e}"))
362 })?;
363
364 Ok(flags_response.normalize())
365 }
366
367 #[must_use = "feature flag result should be used"]
369 #[instrument(skip_all, level = "debug")]
370 #[deprecated(
371 since = "0.6.0",
372 note = "Use Client::evaluate_flags() to fetch a snapshot, then call .get_flag(key) on it. \
373 The snapshot deduplicates $feature_flag_called events and supports attaching \
374 rich metadata to captured events via Event::with_flags()."
375 )]
376 pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
377 &self,
378 key: K,
379 distinct_id: D,
380 groups: Option<HashMap<String, String>>,
381 person_properties: Option<HashMap<String, serde_json::Value>>,
382 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
383 ) -> Result<Option<FlagValue>, Error> {
384 let key_str = key.into();
385 let distinct_id_str = distinct_id.into();
386
387 if let Some(ref evaluator) = self.local_evaluator {
389 let empty = HashMap::new();
390 let props = person_properties.as_ref().unwrap_or(&empty);
391 match evaluator.evaluate_flag(&key_str, &distinct_id_str, props) {
392 Ok(Some(value)) => {
393 debug!(flag = %key_str, ?value, "Flag evaluated locally");
394 return Ok(Some(value));
395 }
396 Ok(None) => {
397 if self.options.local_evaluation_only {
398 debug!(flag = %key_str, "Flag not found locally, skipping remote fallback");
399 return Ok(None);
400 }
401 debug!(flag = %key_str, "Flag not found locally, falling back to API");
402 }
403 Err(e) => {
404 if self.options.local_evaluation_only {
405 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, skipping remote fallback");
406 return Ok(None);
407 }
408 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
409 }
410 }
411 }
412
413 trace!(flag = %key_str, "Fetching flag from API");
415 let (feature_flags, _payloads) = self
416 .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
417 .await?;
418 Ok(feature_flags.get(&key_str).cloned())
419 }
420
421 #[must_use = "feature flag enabled check result should be used"]
423 #[deprecated(
424 since = "0.6.0",
425 note = "Use Client::evaluate_flags() to fetch a snapshot, then call .is_enabled(key) \
426 on it. The snapshot deduplicates $feature_flag_called events and supports \
427 attaching rich metadata to captured events via Event::with_flags()."
428 )]
429 #[allow(deprecated)] pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
431 &self,
432 key: K,
433 distinct_id: D,
434 groups: Option<HashMap<String, String>>,
435 person_properties: Option<HashMap<String, serde_json::Value>>,
436 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
437 ) -> Result<bool, Error> {
438 let flag_value = self
439 .get_feature_flag(
440 key.into(),
441 distinct_id.into(),
442 groups,
443 person_properties,
444 group_properties,
445 )
446 .await?;
447 Ok(match flag_value {
448 Some(FlagValue::Boolean(b)) => b,
449 Some(FlagValue::String(_)) => true, None => false,
451 })
452 }
453
454 #[must_use = "feature flag payload result should be used"]
456 #[deprecated(
457 since = "0.6.0",
458 note = "Use Client::evaluate_flags() to fetch a snapshot, then call \
459 .get_flag_payload(key) on it. Reading the payload from a snapshot is \
460 event-free, matching this method's behavior, and avoids the per-call \
461 /flags request."
462 )]
463 pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
464 &self,
465 key: K,
466 distinct_id: D,
467 ) -> Result<Option<serde_json::Value>, Error> {
468 let key_str = key.into();
469 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
470
471 let mut payload = json!({
472 "api_key": self.options.api_key,
473 "distinct_id": distinct_id.into(),
474 });
475
476 if self.options.disable_geoip {
478 payload["disable_geoip"] = json!(true);
479 }
480
481 let response = self
482 .client
483 .post(&flags_endpoint)
484 .header(CONTENT_TYPE, "application/json")
485 .json(&payload)
486 .timeout(Duration::from_secs(
487 self.options.feature_flags_request_timeout_seconds,
488 ))
489 .send()
490 .await
491 .map_err(|e| Error::Connection(e.to_string()))?;
492
493 if !response.status().is_success() {
494 return Ok(None);
495 }
496
497 let flags_response: FeatureFlagsResponse = response
498 .json()
499 .await
500 .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
501
502 let (_flags, payloads) = flags_response.normalize();
503 Ok(payloads.get(&key_str).cloned())
504 }
505
506 pub fn evaluate_feature_flag_locally(
508 &self,
509 flag: &FeatureFlag,
510 distinct_id: &str,
511 person_properties: &HashMap<String, serde_json::Value>,
512 ) -> Result<FlagValue, Error> {
513 match_feature_flag(flag, distinct_id, person_properties)
514 .map_err(|e| Error::InconclusiveMatch(e.message))
515 }
516
517 pub async fn evaluate_flags<S: Into<String>>(
528 &self,
529 distinct_id: S,
530 options: EvaluateFlagsOptions,
531 ) -> Result<FeatureFlagEvaluations, Error> {
532 let distinct_id: String = distinct_id.into();
533 let host = self.flag_event_host();
534
535 if distinct_id.is_empty() || self.options.is_disabled() {
536 return Ok(FeatureFlagEvaluations::empty(host));
537 }
538
539 let mut records: HashMap<String, EvaluatedFlagRecord> = HashMap::new();
540 let mut locally_evaluated_keys: HashSet<String> = HashSet::new();
541
542 if let Some(evaluator) = &self.local_evaluator {
543 let person_props_owned = options.person_properties.clone().unwrap_or_default();
544 let local_results = evaluator.evaluate_all_flags(&distinct_id, &person_props_owned);
545 for (key, result) in local_results {
546 if let Some(filter) = &options.flag_keys {
547 if !filter.iter().any(|k| k == &key) {
548 continue;
549 }
550 }
551 if let Ok(value) = result {
552 records.insert(key.clone(), local_record(value));
553 locally_evaluated_keys.insert(key);
554 }
555 }
556 }
557
558 let mut request_id: Option<String> = None;
559 let mut errors_while_computing = false;
560 let mut quota_limited = false;
561
562 let local_covers_request = options
567 .flag_keys
568 .as_ref()
569 .is_some_and(|keys| keys.iter().all(|k| locally_evaluated_keys.contains(k)));
570
571 if !options.only_evaluate_locally && !local_covers_request {
572 match self.fetch_flag_details(&distinct_id, &options).await {
577 Ok(response) => {
578 request_id = response.request_id;
579 errors_while_computing = response.errors_while_computing_flags;
580 quota_limited = response.quota_limited;
581 for (key, detail) in response.flags {
582 if locally_evaluated_keys.contains(&key) {
583 continue;
584 }
585 records.insert(key, remote_record_from_detail(detail));
586 }
587 }
588 Err(e) => {
589 if records.is_empty() {
590 return Err(e);
591 }
592 debug!(
593 error = e.to_string(),
594 local_count = records.len(),
595 "/flags fetch failed; returning snapshot from local results only"
596 );
597 errors_while_computing = true;
598 }
599 }
600 }
601
602 Ok(FeatureFlagEvaluations::new(
603 host,
604 distinct_id,
605 records,
606 options.groups.unwrap_or_default(),
607 options.disable_geoip,
608 request_id,
609 None,
610 errors_while_computing,
611 quota_limited,
612 ))
613 }
614
615 fn flag_event_host(&self) -> Arc<dyn FeatureFlagEvaluationsHost> {
616 self.flag_event_host
617 .get_or_init(|| {
618 Arc::new(AsyncFlagEventHost::from_options(
619 &self.options,
620 self.client.clone(),
621 )) as Arc<dyn FeatureFlagEvaluationsHost>
622 })
623 .clone()
624 }
625
626 async fn fetch_flag_details(
627 &self,
628 distinct_id: &str,
629 options: &EvaluateFlagsOptions,
630 ) -> Result<DetailedFlagsResponse, Error> {
631 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
632
633 let mut payload = json!({
634 "api_key": self.options.api_key,
635 "distinct_id": distinct_id,
636 });
637 if let Some(groups) = &options.groups {
638 payload["groups"] = json!(groups);
639 }
640 if let Some(person_properties) = &options.person_properties {
641 payload["person_properties"] = json!(person_properties);
642 }
643 if let Some(group_properties) = &options.group_properties {
644 payload["group_properties"] = json!(group_properties);
645 }
646 let effective_disable_geoip = options.disable_geoip.unwrap_or(self.options.disable_geoip);
647 if effective_disable_geoip {
648 payload["disable_geoip"] = json!(true);
649 }
650 if let Some(flag_keys) = &options.flag_keys {
651 payload["flag_keys_to_evaluate"] = json!(flag_keys);
652 }
653
654 let response = self
655 .client
656 .post(&flags_endpoint)
657 .header(CONTENT_TYPE, "application/json")
658 .json(&payload)
659 .timeout(Duration::from_secs(
660 self.options.feature_flags_request_timeout_seconds,
661 ))
662 .send()
663 .await
664 .map_err(|e| Error::Connection(e.to_string()))?;
665
666 if !response.status().is_success() {
667 let status = response.status();
668 let text = response
669 .text()
670 .await
671 .unwrap_or_else(|_| "Unknown error".to_string());
672 return Err(Error::Connection(format!(
673 "API request failed with status {status}: {text}"
674 )));
675 }
676
677 let parsed = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
678 Error::Serialization(format!("Failed to parse feature flags response: {e}"))
679 })?;
680 Ok(extract_flag_details(parsed))
681 }
682}
683
684struct DetailedFlagsResponse {
687 flags: HashMap<String, FlagDetail>,
688 request_id: Option<String>,
689 errors_while_computing_flags: bool,
690 quota_limited: bool,
691}
692
693fn extract_flag_details(response: FeatureFlagsResponse) -> DetailedFlagsResponse {
694 match response {
695 FeatureFlagsResponse::V2 {
696 flags,
697 request_id,
698 errors_while_computing_flags,
699 quota_limited,
700 } => DetailedFlagsResponse {
701 flags,
702 request_id,
703 errors_while_computing_flags,
704 quota_limited,
705 },
706 FeatureFlagsResponse::Legacy {
707 feature_flags,
708 feature_flag_payloads,
709 errors,
710 } => {
711 let mut flags = HashMap::new();
712 for (key, value) in feature_flags {
713 let (enabled, variant) = match value {
714 FlagValue::Boolean(b) => (b, None),
715 FlagValue::String(s) => (true, Some(s)),
716 };
717 let payload = feature_flag_payloads.get(&key).cloned();
718 flags.insert(
719 key.clone(),
720 FlagDetail {
721 key,
722 enabled,
723 variant,
724 reason: None,
725 metadata: payload.map(|payload| crate::feature_flags::FlagMetadata {
726 id: 0,
727 version: 0,
728 description: None,
729 payload: Some(payload),
730 }),
731 },
732 );
733 }
734 DetailedFlagsResponse {
735 flags,
736 request_id: None,
737 errors_while_computing_flags: errors.is_some_and(|e| !e.is_empty()),
738 quota_limited: false,
739 }
740 }
741 }
742}
743
744fn local_record(value: FlagValue) -> EvaluatedFlagRecord {
745 let (enabled, variant) = match value {
746 FlagValue::Boolean(b) => (b, None),
747 FlagValue::String(s) => (true, Some(s)),
748 };
749 EvaluatedFlagRecord {
750 enabled,
751 variant,
752 payload: None,
754 id: None,
755 version: None,
756 reason: Some("Evaluated locally".to_string()),
757 locally_evaluated: true,
758 }
759}
760
761fn remote_record_from_detail(detail: FlagDetail) -> EvaluatedFlagRecord {
762 let metadata = detail.metadata;
763 let reason = detail
764 .reason
765 .and_then(|r| r.description.or(Some(r.code)))
766 .filter(|s| !s.is_empty());
767 let id = metadata.as_ref().map(|m| m.id);
768 let version = metadata.as_ref().map(|m| m.version);
769 let payload = metadata.and_then(|m| m.payload).map(normalize_payload);
770 EvaluatedFlagRecord {
771 enabled: detail.enabled,
772 variant: detail.variant,
773 payload,
774 id,
775 version,
776 reason,
777 locally_evaluated: false,
778 }
779}
780
781fn normalize_payload(payload: serde_json::Value) -> serde_json::Value {
786 match payload {
787 serde_json::Value::String(raw) => {
788 serde_json::from_str(&raw).unwrap_or(serde_json::Value::String(raw))
789 }
790 other => other,
791 }
792}