Skip to main content

posthog_rs/client/
async_client.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::Duration;
4
5use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
6use serde_json::json;
7use tracing::{debug, instrument, trace, warn};
8
9use crate::endpoints::Endpoint;
10use crate::event::BatchRequest;
11use crate::feature_flag_evaluations::{
12    EvaluateFlagsOptions, EvaluatedFlagRecord, FeatureFlagEvaluations, FeatureFlagEvaluationsHost,
13    FlagCalledEventParams,
14};
15use crate::feature_flags::{
16    match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagDetail, FlagValue,
17};
18use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
19use crate::{event::InnerEvent, Error, Event};
20
21use super::ClientOptions;
22
/// Cap on the number of `distinct_id` entries in the `$feature_flag_called`
/// dedup cache. On overflow the entire map is reset (matches the JS SDK).
///
/// The limit is compared against the number of distinct ids in the map only;
/// the set of dedup keys kept per id is not bounded by this constant.
const MAX_FLAG_CALLED_CACHE_SIZE: usize = 50_000;
26
27async fn check_response(response: reqwest::Response) -> Result<(), Error> {
28    let status = response.status().as_u16();
29    let body = response
30        .text()
31        .await
32        .unwrap_or_else(|_| "Unknown error".to_string());
33
34    match Error::from_http_response(status, body) {
35        Some(err) => Err(err),
36        None => Ok(()),
37    }
38}
39
/// A [`Client`] facilitates interactions with the PostHog API over HTTP.
///
/// Instances are created through the `client()` constructor function in this
/// module.
pub struct Client {
    /// Sanitized configuration this client was built from.
    options: ClientOptions,
    /// HTTP transport used for the capture, batch and flags requests.
    client: HttpClient,
    /// Local flag evaluator. `Some` only when local evaluation is enabled,
    /// the client is not disabled, and a personal API key was configured.
    local_evaluator: Option<LocalEvaluator>,
    /// Poller started together with the local evaluator. It is never read
    /// after construction; presumably it is stored so background polling
    /// lives as long as the client — confirm against `AsyncFlagPoller`.
    _flag_poller: Option<AsyncFlagPoller>,
    /// Host used by flag snapshots to emit `$feature_flag_called` events.
    /// Created lazily the first time `evaluate_flags` asks for it.
    flag_event_host: OnceLock<Arc<dyn FeatureFlagEvaluationsHost>>,
}
48
/// Implementation of [`FeatureFlagEvaluationsHost`] that emits dedup-aware
/// `$feature_flag_called` events through a clone of the async [`Client`]'s
/// HTTP transport. The event ship is fire-and-forget: errors are logged at
/// `debug` level but do not surface to the caller, matching the JS SDK.
struct AsyncFlagEventHost {
    /// Clone of the owning client's HTTP transport.
    http_client: HttpClient,
    /// Project API key attached to every shipped event.
    api_key: String,
    /// Fully built capture endpoint URL, computed once at construction.
    capture_url: String,
    /// Snapshot of the client's disabled state; when `true` nothing is sent.
    disabled: bool,
    /// Client-level geoip default, used when the event params do not carry
    /// their own `disable_geoip` override.
    disable_geoip: bool,
    /// Dedup keys already reported, grouped by `distinct_id`.
    dedup_cache: Mutex<HashMap<String, HashSet<String>>>,
    /// Tokio runtime handle captured at host construction (which always runs
    /// inside the runtime that hosts `evaluate_flags`). This lets snapshot
    /// access methods spawn `$feature_flag_called` shipping from any thread —
    /// including ones without an entered runtime — by routing through the
    /// captured handle instead of the free `tokio::spawn` (which would panic).
    runtime: tokio::runtime::Handle,
}
67
68impl AsyncFlagEventHost {
69    fn from_options(options: &ClientOptions, http_client: HttpClient) -> Self {
70        let capture_url = options.endpoints().build_url(Endpoint::Capture);
71        Self {
72            http_client,
73            api_key: options.api_key.clone(),
74            capture_url,
75            disabled: options.is_disabled(),
76            disable_geoip: options.disable_geoip,
77            dedup_cache: Mutex::new(HashMap::new()),
78            runtime: tokio::runtime::Handle::current(),
79        }
80    }
81
82    /// Returns `true` when the helper has already shipped this
83    /// `(distinct_id, key, response)` combination and the caller should skip.
84    fn already_reported(&self, distinct_id: &str, dedup_key: &str) -> bool {
85        let mut cache = self.dedup_cache.lock().unwrap_or_else(|p| p.into_inner());
86        if let Some(seen) = cache.get(distinct_id) {
87            if seen.contains(dedup_key) {
88                return true;
89            }
90        }
91        if cache.len() >= MAX_FLAG_CALLED_CACHE_SIZE {
92            cache.clear();
93        }
94        cache
95            .entry(distinct_id.to_string())
96            .or_default()
97            .insert(dedup_key.to_string());
98        false
99    }
100
101    fn spawn_ship(&self, event: Event) {
102        if self.disabled {
103            return;
104        }
105        let inner_event = InnerEvent::new(event, self.api_key.clone());
106        let payload = match serde_json::to_string(&inner_event) {
107            Ok(p) => p,
108            Err(e) => {
109                debug!(error = %e, "failed to serialize $feature_flag_called event");
110                return;
111            }
112        };
113        let http_client = self.http_client.clone();
114        let url = self.capture_url.clone();
115        self.runtime.spawn(async move {
116            let response = match http_client
117                .post(&url)
118                .header(CONTENT_TYPE, "application/json")
119                .body(payload)
120                .send()
121                .await
122            {
123                Ok(r) => r,
124                Err(send_err) => {
125                    let message = send_err.to_string();
126                    debug!("failed to send $feature_flag_called event: {message}");
127                    return;
128                }
129            };
130            if let Err(check_err) = check_response(response).await {
131                let message = check_err.to_string();
132                debug!("$feature_flag_called event rejected by server: {message}");
133            }
134        });
135    }
136}
137
138impl FeatureFlagEvaluationsHost for AsyncFlagEventHost {
139    fn capture_flag_called_event_if_needed(&self, params: FlagCalledEventParams) {
140        let dedup_key = build_dedup_key(&params.key, params.response.as_ref(), &params.groups);
141        if self.already_reported(&params.distinct_id, &dedup_key) {
142            return;
143        }
144
145        let mut event = Event::new(
146            "$feature_flag_called".to_string(),
147            params.distinct_id.clone(),
148        );
149        for (k, v) in params.properties {
150            if event.insert_prop(k, v).is_err() {
151                return;
152            }
153        }
154        for (group_name, group_id) in &params.groups {
155            event.add_group(group_name, group_id);
156        }
157        if params.disable_geoip.unwrap_or(self.disable_geoip) {
158            let _ = event.insert_prop("$geoip_disable", true);
159        }
160        self.spawn_ship(event);
161    }
162
163    fn log_warning(&self, message: &str) {
164        // Surface filter-helper misuse via tracing — users can silence these
165        // with their tracing-subscriber level filter (e.g. `posthog_rs=error`).
166        warn!("{message}");
167    }
168}
169
170fn build_dedup_key(
171    flag_key: &str,
172    response: Option<&FlagValue>,
173    groups: &HashMap<String, String>,
174) -> String {
175    let response_repr = match response {
176        Some(FlagValue::Boolean(true)) => "true".to_string(),
177        Some(FlagValue::Boolean(false)) => "false".to_string(),
178        Some(FlagValue::String(s)) => s.clone(),
179        None => "::null::".to_string(),
180    };
181    if groups.is_empty() {
182        format!("{flag_key}_{response_repr}")
183    } else {
184        // Canonicalize so two equal group maps with different insertion orders
185        // produce the same dedup key — necessary for group-scoped flags to fire
186        // exactly once per distinct group context.
187        let mut sorted: Vec<(&String, &String)> = groups.iter().collect();
188        sorted.sort_by(|a, b| a.0.cmp(b.0));
189        let groups_repr: String = sorted
190            .iter()
191            .map(|(k, v)| format!("{}={}", pct(k), pct(v)))
192            .collect::<Vec<_>>()
193            .join(";");
194        format!("{flag_key}_{response_repr}_{groups_repr}")
195    }
196}
197
/// Escapes the characters that act as delimiters in the dedup key's group
/// segment: `%` becomes `%25`, `=` becomes `%3D` and `;` becomes `%3B`.
/// Every other character is copied through unchanged.
fn pct(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '%' => escaped.push_str("%25"),
            '=' => escaped.push_str("%3D"),
            ';' => escaped.push_str("%3B"),
            other => escaped.push(other),
        }
    }
    escaped
}
203
204/// This function constructs a new client using the options provided.
205pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
206    let options = options.into().sanitize();
207    let client = HttpClient::builder()
208        .timeout(Duration::from_secs(options.request_timeout_seconds))
209        .build()
210        .unwrap(); // Unwrap here is as safe as `HttpClient::new`
211
212    let (local_evaluator, flag_poller) = if options.enable_local_evaluation
213        && !options.is_disabled()
214    {
215        if let Some(ref personal_key) = options.personal_api_key {
216            let cache = FlagCache::new();
217
218            let config = LocalEvaluationConfig {
219                personal_api_key: personal_key.clone(),
220                project_api_key: options.api_key.clone(),
221                api_host: options.endpoints().api_host(),
222                poll_interval: Duration::from_secs(options.poll_interval_seconds),
223                request_timeout: Duration::from_secs(options.request_timeout_seconds),
224            };
225
226            let mut poller = AsyncFlagPoller::new(config, cache.clone());
227            poller.start().await;
228
229            (Some(LocalEvaluator::new(cache)), Some(poller))
230        } else {
231            warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
232            (None, None)
233        }
234    } else {
235        (None, None)
236    };
237
238    Client {
239        options,
240        client,
241        local_evaluator,
242        _flag_poller: flag_poller,
243        flag_event_host: OnceLock::new(),
244    }
245}
246
247impl Client {
248    /// Capture the provided event, sending it to PostHog.
249    #[instrument(skip(self, event), level = "debug")]
250    pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
251        if self.options.is_disabled() {
252            trace!("Client is disabled, skipping capture");
253            return Ok(());
254        }
255
256        // Add geoip disable property if configured
257        if self.options.disable_geoip {
258            event.insert_prop("$geoip_disable", true).ok();
259        }
260
261        let inner_event = InnerEvent::new(event, self.options.api_key.clone());
262
263        let payload =
264            serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
265
266        let url = self.options.endpoints().build_url(Endpoint::Capture);
267
268        let response = self
269            .client
270            .post(&url)
271            .header(CONTENT_TYPE, "application/json")
272            .body(payload)
273            .send()
274            .await
275            .map_err(|e| Error::Connection(e.to_string()))?;
276
277        check_response(response).await
278    }
279
280    /// Capture a collection of events with a single request. Events are sent to
281    /// the `/batch/` endpoint. Set `historical_migration` to `true` to route
282    /// events to the historical ingestion topic, bypassing the main pipeline.
283    pub async fn capture_batch(
284        &self,
285        events: Vec<Event>,
286        historical_migration: bool,
287    ) -> Result<(), Error> {
288        if self.options.is_disabled() {
289            return Ok(());
290        }
291
292        let disable_geoip = self.options.disable_geoip;
293        let inner_events: Vec<InnerEvent> = events
294            .into_iter()
295            .map(|mut event| {
296                if disable_geoip {
297                    event.insert_prop("$geoip_disable", true).ok();
298                }
299                InnerEvent::new(event, self.options.api_key.clone())
300            })
301            .collect();
302
303        let batch_request = BatchRequest {
304            api_key: self.options.api_key.clone(),
305            historical_migration,
306            batch: inner_events,
307        };
308        let payload = serde_json::to_string(&batch_request)
309            .map_err(|e| Error::Serialization(e.to_string()))?;
310        let url = self.options.endpoints().build_url(Endpoint::Batch);
311
312        let response = self
313            .client
314            .post(&url)
315            .header(CONTENT_TYPE, "application/json")
316            .body(payload)
317            .send()
318            .await
319            .map_err(|e| Error::Connection(e.to_string()))?;
320
321        check_response(response).await
322    }
323
324    /// Get all feature flags for a user
325    #[must_use = "feature flags result should be used"]
326    pub async fn get_feature_flags<S: Into<String>>(
327        &self,
328        distinct_id: S,
329        groups: Option<HashMap<String, String>>,
330        person_properties: Option<HashMap<String, serde_json::Value>>,
331        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
332    ) -> Result<
333        (
334            HashMap<String, FlagValue>,
335            HashMap<String, serde_json::Value>,
336        ),
337        Error,
338    > {
339        if self.options.is_disabled() {
340            trace!("Client is disabled, skipping feature flags request");
341            return Ok((HashMap::new(), HashMap::new()));
342        }
343
344        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
345
346        let mut payload = json!({
347            "api_key": self.options.api_key,
348            "distinct_id": distinct_id.into(),
349        });
350
351        if let Some(groups) = groups {
352            payload["groups"] = json!(groups);
353        }
354
355        if let Some(person_properties) = person_properties {
356            payload["person_properties"] = json!(person_properties);
357        }
358
359        if let Some(group_properties) = group_properties {
360            payload["group_properties"] = json!(group_properties);
361        }
362
363        // Add geoip disable parameter if configured
364        if self.options.disable_geoip {
365            payload["disable_geoip"] = json!(true);
366        }
367
368        let response = self
369            .client
370            .post(&flags_endpoint)
371            .header(CONTENT_TYPE, "application/json")
372            .json(&payload)
373            .timeout(Duration::from_secs(
374                self.options.feature_flags_request_timeout_seconds,
375            ))
376            .send()
377            .await
378            .map_err(|e| Error::Connection(e.to_string()))?;
379
380        if !response.status().is_success() {
381            let status = response.status();
382            let text = response
383                .text()
384                .await
385                .unwrap_or_else(|_| "Unknown error".to_string());
386            return Err(Error::Connection(format!(
387                "API request failed with status {status}: {text}"
388            )));
389        }
390
391        let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
392            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
393        })?;
394
395        Ok(flags_response.normalize())
396    }
397
398    /// Get a specific feature flag value for a user.
399    #[must_use = "feature flag result should be used"]
400    #[instrument(skip_all, level = "debug")]
401    #[deprecated(
402        since = "0.6.0",
403        note = "Use Client::evaluate_flags() to fetch a snapshot, then call .get_flag(key) on it. \
404                The snapshot deduplicates $feature_flag_called events and supports attaching \
405                rich metadata to captured events via Event::with_flags()."
406    )]
407    pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
408        &self,
409        key: K,
410        distinct_id: D,
411        groups: Option<HashMap<String, String>>,
412        person_properties: Option<HashMap<String, serde_json::Value>>,
413        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
414    ) -> Result<Option<FlagValue>, Error> {
415        let key_str = key.into();
416        let distinct_id_str = distinct_id.into();
417
418        // Try local evaluation first if available
419        if let Some(ref evaluator) = self.local_evaluator {
420            let empty_props = HashMap::new();
421            let empty_groups: HashMap<String, String> = HashMap::new();
422            let empty_group_props: HashMap<String, HashMap<String, serde_json::Value>> =
423                HashMap::new();
424            let props = person_properties.as_ref().unwrap_or(&empty_props);
425            let groups_ref = groups.as_ref().unwrap_or(&empty_groups);
426            let group_props_ref = group_properties.as_ref().unwrap_or(&empty_group_props);
427            match evaluator.evaluate_flag(
428                &key_str,
429                &distinct_id_str,
430                props,
431                groups_ref,
432                group_props_ref,
433            ) {
434                Ok(Some(value)) => {
435                    debug!(flag = %key_str, ?value, "Flag evaluated locally");
436                    return Ok(Some(value));
437                }
438                Ok(None) => {
439                    if self.options.local_evaluation_only {
440                        debug!(flag = %key_str, "Flag not found locally, skipping remote fallback");
441                        return Ok(None);
442                    }
443                    debug!(flag = %key_str, "Flag not found locally, falling back to API");
444                }
445                Err(e) => {
446                    if self.options.local_evaluation_only {
447                        debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, skipping remote fallback");
448                        return Ok(None);
449                    }
450                    debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
451                }
452            }
453        }
454
455        // Fall back to API
456        trace!(flag = %key_str, "Fetching flag from API");
457        let (feature_flags, _payloads) = self
458            .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
459            .await?;
460        Ok(feature_flags.get(&key_str).cloned())
461    }
462
463    /// Check if a feature flag is enabled for a user.
464    #[must_use = "feature flag enabled check result should be used"]
465    #[deprecated(
466        since = "0.6.0",
467        note = "Use Client::evaluate_flags() to fetch a snapshot, then call .is_enabled(key) \
468                on it. The snapshot deduplicates $feature_flag_called events and supports \
469                attaching rich metadata to captured events via Event::with_flags()."
470    )]
471    #[allow(deprecated)] // calls deprecated get_feature_flag internally
472    pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
473        &self,
474        key: K,
475        distinct_id: D,
476        groups: Option<HashMap<String, String>>,
477        person_properties: Option<HashMap<String, serde_json::Value>>,
478        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
479    ) -> Result<bool, Error> {
480        let flag_value = self
481            .get_feature_flag(
482                key.into(),
483                distinct_id.into(),
484                groups,
485                person_properties,
486                group_properties,
487            )
488            .await?;
489        Ok(match flag_value {
490            Some(FlagValue::Boolean(b)) => b,
491            Some(FlagValue::String(_)) => true, // Variants are considered enabled
492            None => false,
493        })
494    }
495
496    /// Get a feature flag payload for a user.
497    #[must_use = "feature flag payload result should be used"]
498    #[deprecated(
499        since = "0.6.0",
500        note = "Use Client::evaluate_flags() to fetch a snapshot, then call \
501                .get_flag_payload(key) on it. Reading the payload from a snapshot is \
502                event-free, matching this method's behavior, and avoids the per-call \
503                /flags request."
504    )]
505    pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
506        &self,
507        key: K,
508        distinct_id: D,
509    ) -> Result<Option<serde_json::Value>, Error> {
510        if self.options.is_disabled() {
511            trace!("Client is disabled, skipping feature flag payload request");
512            return Ok(None);
513        }
514
515        let key_str = key.into();
516        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
517
518        let mut payload = json!({
519            "api_key": self.options.api_key,
520            "distinct_id": distinct_id.into(),
521        });
522
523        // Add geoip disable parameter if configured
524        if self.options.disable_geoip {
525            payload["disable_geoip"] = json!(true);
526        }
527
528        let response = self
529            .client
530            .post(&flags_endpoint)
531            .header(CONTENT_TYPE, "application/json")
532            .json(&payload)
533            .timeout(Duration::from_secs(
534                self.options.feature_flags_request_timeout_seconds,
535            ))
536            .send()
537            .await
538            .map_err(|e| Error::Connection(e.to_string()))?;
539
540        if !response.status().is_success() {
541            return Ok(None);
542        }
543
544        let flags_response: FeatureFlagsResponse = response
545            .json()
546            .await
547            .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
548
549        let (_flags, payloads) = flags_response.normalize();
550        Ok(payloads.get(&key_str).cloned())
551    }
552
553    /// Evaluate a feature flag locally (requires feature flags to be loaded).
554    ///
555    /// `groups` and `group_properties` are only consulted when the flag (or one
556    /// of its conditions) targets a group; pass empty maps for person flags.
557    #[allow(clippy::too_many_arguments)]
558    pub fn evaluate_feature_flag_locally(
559        &self,
560        flag: &FeatureFlag,
561        distinct_id: &str,
562        person_properties: &HashMap<String, serde_json::Value>,
563        groups: &HashMap<String, String>,
564        group_properties: &HashMap<String, HashMap<String, serde_json::Value>>,
565    ) -> Result<FlagValue, Error> {
566        let group_type_mapping = self
567            .local_evaluator
568            .as_ref()
569            .map(|ev| ev.cache().get_group_type_mapping())
570            .unwrap_or_default();
571        match_feature_flag(
572            flag,
573            distinct_id,
574            person_properties,
575            groups,
576            group_properties,
577            &group_type_mapping,
578        )
579        .map_err(|e| Error::InconclusiveMatch(e.message))
580    }
581
    /// Evaluate every feature flag for `distinct_id` in a single round-trip,
    /// returning a [`FeatureFlagEvaluations`] snapshot.
    ///
    /// Each `is_enabled` / `get_flag` call on the returned snapshot fires a
    /// dedup-aware `$feature_flag_called` event with full metadata, and the
    /// snapshot can be passed to [`Event::with_flags`] so a downstream
    /// [`Client::capture`] inherits `$feature/<key>` and `$active_feature_flags`
    /// without an extra `/flags` request.
    ///
    /// Local results (when a local evaluator exists) take precedence: a flag
    /// that was evaluated locally is never overwritten by the remote response.
    /// An empty `distinct_id` or a disabled client yields an empty snapshot.
    ///
    /// # Errors
    ///
    /// Returns the `/flags` error only when the remote fetch fails *and* no
    /// flag was evaluated locally; otherwise the snapshot is built from the
    /// local results and flagged with `errors_while_computing`.
    ///
    /// [`Event::with_flags`]: crate::Event::with_flags
    pub async fn evaluate_flags<S: Into<String>>(
        &self,
        distinct_id: S,
        options: EvaluateFlagsOptions,
    ) -> Result<FeatureFlagEvaluations, Error> {
        let distinct_id: String = distinct_id.into();
        // The host is needed even for the empty snapshot returned below.
        let host = self.flag_event_host();

        if distinct_id.is_empty() || self.options.is_disabled() {
            return Ok(FeatureFlagEvaluations::empty(host));
        }

        let mut records: HashMap<String, EvaluatedFlagRecord> = HashMap::new();
        // Keys decided locally; used to skip remote duplicates and to decide
        // whether the remote call is needed at all.
        let mut locally_evaluated_keys: HashSet<String> = HashSet::new();

        if let Some(evaluator) = &self.local_evaluator {
            let person_props_owned = options.person_properties.clone().unwrap_or_default();
            let groups_owned = options.groups.clone().unwrap_or_default();
            let group_props_owned = options.group_properties.clone().unwrap_or_default();
            let local_results = evaluator.evaluate_all_flags(
                &distinct_id,
                &person_props_owned,
                &groups_owned,
                &group_props_owned,
            );
            for (key, result) in local_results {
                // Honour the caller's key filter, if any.
                if let Some(filter) = &options.flag_keys {
                    if !filter.iter().any(|k| k == &key) {
                        continue;
                    }
                }
                // Local evaluations that returned `Err` are left for the
                // remote response to fill in.
                if let Ok(value) = result {
                    records.insert(key.clone(), local_record(value));
                    locally_evaluated_keys.insert(key);
                }
            }
        }

        let mut request_id: Option<String> = None;
        let mut errors_while_computing = false;
        let mut quota_limited = false;

        // Skip the remote round-trip when local evaluation has already covered
        // every requested flag. Without `flag_keys` we have to assume the caller
        // wants every flag the project has and still hit `/flags` to discover
        // any not loaded by the poller.
        let local_covers_request = options
            .flag_keys
            .as_ref()
            .is_some_and(|keys| keys.iter().all(|k| locally_evaluated_keys.contains(k)));

        // NOTE(review): unlike `get_feature_flag`, this path does not consult
        // `self.options.local_evaluation_only`; only the per-call
        // `only_evaluate_locally` option suppresses the remote request —
        // confirm that this is intentional.
        if !options.only_evaluate_locally && !local_covers_request {
            // Don't lose successful local evaluations if `/flags` fails — degrade
            // to a snapshot built from the local results we already have. The
            // alternative (returning Err) wastes useful data and surprises
            // callers who would otherwise get partial coverage.
            match self.fetch_flag_details(&distinct_id, &options).await {
                Ok(response) => {
                    request_id = response.request_id;
                    errors_while_computing = response.errors_while_computing_flags;
                    quota_limited = response.quota_limited;
                    for (key, detail) in response.flags {
                        // Local results win over remote ones for the same key.
                        if locally_evaluated_keys.contains(&key) {
                            continue;
                        }
                        records.insert(key, remote_record_from_detail(detail));
                    }
                }
                Err(e) => {
                    // Nothing to fall back on: surface the remote failure.
                    if records.is_empty() {
                        return Err(e);
                    }
                    debug!(
                        error = e.to_string(),
                        local_count = records.len(),
                        "/flags fetch failed; returning snapshot from local results only"
                    );
                    errors_while_computing = true;
                }
            }
        }

        Ok(FeatureFlagEvaluations::new(
            host,
            distinct_id,
            records,
            options.groups.unwrap_or_default(),
            options.disable_geoip,
            request_id,
            None,
            errors_while_computing,
            quota_limited,
        ))
    }
686
687    fn flag_event_host(&self) -> Arc<dyn FeatureFlagEvaluationsHost> {
688        self.flag_event_host
689            .get_or_init(|| {
690                Arc::new(AsyncFlagEventHost::from_options(
691                    &self.options,
692                    self.client.clone(),
693                )) as Arc<dyn FeatureFlagEvaluationsHost>
694            })
695            .clone()
696    }
697
698    async fn fetch_flag_details(
699        &self,
700        distinct_id: &str,
701        options: &EvaluateFlagsOptions,
702    ) -> Result<DetailedFlagsResponse, Error> {
703        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
704
705        let mut payload = json!({
706            "api_key": self.options.api_key,
707            "distinct_id": distinct_id,
708        });
709        if let Some(groups) = &options.groups {
710            payload["groups"] = json!(groups);
711        }
712        if let Some(person_properties) = &options.person_properties {
713            payload["person_properties"] = json!(person_properties);
714        }
715        if let Some(group_properties) = &options.group_properties {
716            payload["group_properties"] = json!(group_properties);
717        }
718        let effective_disable_geoip = options.disable_geoip.unwrap_or(self.options.disable_geoip);
719        if effective_disable_geoip {
720            payload["disable_geoip"] = json!(true);
721        }
722        if let Some(flag_keys) = &options.flag_keys {
723            payload["flag_keys_to_evaluate"] = json!(flag_keys);
724        }
725
726        let response = self
727            .client
728            .post(&flags_endpoint)
729            .header(CONTENT_TYPE, "application/json")
730            .json(&payload)
731            .timeout(Duration::from_secs(
732                self.options.feature_flags_request_timeout_seconds,
733            ))
734            .send()
735            .await
736            .map_err(|e| Error::Connection(e.to_string()))?;
737
738        if !response.status().is_success() {
739            let status = response.status();
740            let text = response
741                .text()
742                .await
743                .unwrap_or_else(|_| "Unknown error".to_string());
744            return Err(Error::Connection(format!(
745                "API request failed with status {status}: {text}"
746            )));
747        }
748
749        let parsed = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
750            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
751        })?;
752        Ok(extract_flag_details(parsed))
753    }
754}
755
/// Normalised view of a `/flags?v=2` response surfacing the per-flag detail
/// shape needed by the snapshot path.
///
/// Legacy responses are converted into the same shape by
/// `extract_flag_details`.
struct DetailedFlagsResponse {
    /// Per-flag details keyed by flag key.
    flags: HashMap<String, FlagDetail>,
    /// Request id reported by the server; always `None` for legacy responses.
    request_id: Option<String>,
    /// Whether the server reported errors while computing flags. For legacy
    /// responses this is `true` when the `errors` list is present and non-empty.
    errors_while_computing_flags: bool,
    /// Quota-limited marker from the response; always `false` for legacy
    /// responses.
    quota_limited: bool,
}
764
765fn extract_flag_details(response: FeatureFlagsResponse) -> DetailedFlagsResponse {
766    match response {
767        FeatureFlagsResponse::V2 {
768            flags,
769            request_id,
770            errors_while_computing_flags,
771            quota_limited,
772        } => DetailedFlagsResponse {
773            flags,
774            request_id,
775            errors_while_computing_flags,
776            quota_limited,
777        },
778        FeatureFlagsResponse::Legacy {
779            feature_flags,
780            feature_flag_payloads,
781            errors,
782        } => {
783            let mut flags = HashMap::new();
784            for (key, value) in feature_flags {
785                let (enabled, variant) = match value {
786                    FlagValue::Boolean(b) => (b, None),
787                    FlagValue::String(s) => (true, Some(s)),
788                };
789                let payload = feature_flag_payloads.get(&key).cloned();
790                flags.insert(
791                    key.clone(),
792                    FlagDetail {
793                        key,
794                        enabled,
795                        variant,
796                        reason: None,
797                        metadata: payload.map(|payload| crate::feature_flags::FlagMetadata {
798                            id: 0,
799                            version: 0,
800                            description: None,
801                            payload: Some(payload),
802                        }),
803                    },
804                );
805            }
806            DetailedFlagsResponse {
807                flags,
808                request_id: None,
809                errors_while_computing_flags: errors.is_some_and(|e| !e.is_empty()),
810                quota_limited: false,
811            }
812        }
813    }
814}
815
816fn local_record(value: FlagValue) -> EvaluatedFlagRecord {
817    let (enabled, variant) = match value {
818        FlagValue::Boolean(b) => (b, None),
819        FlagValue::String(s) => (true, Some(s)),
820    };
821    EvaluatedFlagRecord {
822        enabled,
823        variant,
824        // Local definitions do not surface a payload through the poller today.
825        payload: None,
826        id: None,
827        version: None,
828        reason: Some("Evaluated locally".to_string()),
829        locally_evaluated: true,
830    }
831}
832
833fn remote_record_from_detail(detail: FlagDetail) -> EvaluatedFlagRecord {
834    let metadata = detail.metadata;
835    let reason = detail
836        .reason
837        .and_then(|r| r.description.or(Some(r.code)))
838        .filter(|s| !s.is_empty());
839    let id = metadata.as_ref().map(|m| m.id);
840    let version = metadata.as_ref().map(|m| m.version);
841    let payload = metadata.and_then(|m| m.payload).map(normalize_payload);
842    EvaluatedFlagRecord {
843        enabled: detail.enabled,
844        variant: detail.variant,
845        payload,
846        id,
847        version,
848        reason,
849        locally_evaluated: false,
850    }
851}
852
853/// `metadata.payload` from `/flags?v=2` is sometimes a JSON-encoded string
854/// (e.g. `"{\"color\":\"blue\"}"`) rather than already-parsed JSON. Try to
855/// parse a `String` payload as JSON and fall back to the raw string on
856/// failure so users can branch on a uniform [`serde_json::Value`].
857fn normalize_payload(payload: serde_json::Value) -> serde_json::Value {
858    match payload {
859        serde_json::Value::String(raw) => {
860            serde_json::from_str(&raw).unwrap_or(serde_json::Value::String(raw))
861        }
862        other => other,
863    }
864}