Skip to main content

posthog_rs/client/
async_client.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::Duration;
4
5use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
6use serde_json::json;
7use tracing::{debug, instrument, trace, warn};
8
9use crate::endpoints::Endpoint;
10use crate::event::BatchRequest;
11use crate::feature_flag_evaluations::{
12    EvaluateFlagsOptions, EvaluatedFlagRecord, FeatureFlagEvaluations, FeatureFlagEvaluationsHost,
13    FlagCalledEventParams,
14};
15use crate::feature_flags::{
16    match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagDetail, FlagValue,
17};
18use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
19use crate::{event::InnerEvent, Error, Event};
20
21use super::ClientOptions;
22
23/// Cap on the number of `distinct_id` entries in the `$feature_flag_called`
24/// dedup cache. On overflow the entire map is reset (matches the JS SDK).
25const MAX_FLAG_CALLED_CACHE_SIZE: usize = 50_000;
26
27async fn check_response(response: reqwest::Response) -> Result<(), Error> {
28    let status = response.status().as_u16();
29    let body = response
30        .text()
31        .await
32        .unwrap_or_else(|_| "Unknown error".to_string());
33
34    match Error::from_http_response(status, body) {
35        Some(err) => Err(err),
36        None => Ok(()),
37    }
38}
39
40/// A [`Client`] facilitates interactions with the PostHog API over HTTP.
41pub struct Client {
42    options: ClientOptions,
43    client: HttpClient,
44    local_evaluator: Option<LocalEvaluator>,
45    _flag_poller: Option<AsyncFlagPoller>,
46    flag_event_host: OnceLock<Arc<dyn FeatureFlagEvaluationsHost>>,
47}
48
49/// Implementation of [`FeatureFlagEvaluationsHost`] that emits dedup-aware
50/// `$feature_flag_called` events through a clone of the async [`Client`]'s
51/// HTTP transport. The event ship is fire-and-forget: errors are logged at
52/// `debug` level but do not surface to the caller, matching the JS SDK.
53struct AsyncFlagEventHost {
54    http_client: HttpClient,
55    api_key: String,
56    capture_url: String,
57    disabled: bool,
58    disable_geoip: bool,
59    dedup_cache: Mutex<HashMap<String, HashSet<String>>>,
60    /// Tokio runtime handle captured at host construction (which always runs
61    /// inside the runtime that hosts `evaluate_flags`). This lets snapshot
62    /// access methods spawn `$feature_flag_called` shipping from any thread —
63    /// including ones without an entered runtime — by routing through the
64    /// captured handle instead of the free `tokio::spawn` (which would panic).
65    runtime: tokio::runtime::Handle,
66}
67
68impl AsyncFlagEventHost {
69    fn from_options(options: &ClientOptions, http_client: HttpClient) -> Self {
70        let capture_url = options.endpoints().build_url(Endpoint::Capture);
71        Self {
72            http_client,
73            api_key: options.api_key.clone(),
74            capture_url,
75            disabled: options.is_disabled(),
76            disable_geoip: options.disable_geoip,
77            dedup_cache: Mutex::new(HashMap::new()),
78            runtime: tokio::runtime::Handle::current(),
79        }
80    }
81
82    /// Returns `true` when the helper has already shipped this
83    /// `(distinct_id, key, response)` combination and the caller should skip.
84    fn already_reported(&self, distinct_id: &str, dedup_key: &str) -> bool {
85        let mut cache = self.dedup_cache.lock().unwrap_or_else(|p| p.into_inner());
86        if let Some(seen) = cache.get(distinct_id) {
87            if seen.contains(dedup_key) {
88                return true;
89            }
90        }
91        if cache.len() >= MAX_FLAG_CALLED_CACHE_SIZE {
92            cache.clear();
93        }
94        cache
95            .entry(distinct_id.to_string())
96            .or_default()
97            .insert(dedup_key.to_string());
98        false
99    }
100
101    fn spawn_ship(&self, event: Event) {
102        if self.disabled {
103            return;
104        }
105        let inner_event = InnerEvent::new(event, self.api_key.clone());
106        let payload = match serde_json::to_string(&inner_event) {
107            Ok(p) => p,
108            Err(e) => {
109                debug!(error = %e, "failed to serialize $feature_flag_called event");
110                return;
111            }
112        };
113        let http_client = self.http_client.clone();
114        let url = self.capture_url.clone();
115        self.runtime.spawn(async move {
116            let response = match http_client
117                .post(&url)
118                .header(CONTENT_TYPE, "application/json")
119                .body(payload)
120                .send()
121                .await
122            {
123                Ok(r) => r,
124                Err(send_err) => {
125                    let message = send_err.to_string();
126                    debug!("failed to send $feature_flag_called event: {message}");
127                    return;
128                }
129            };
130            if let Err(check_err) = check_response(response).await {
131                let message = check_err.to_string();
132                debug!("$feature_flag_called event rejected by server: {message}");
133            }
134        });
135    }
136}
137
138impl FeatureFlagEvaluationsHost for AsyncFlagEventHost {
139    fn capture_flag_called_event_if_needed(&self, params: FlagCalledEventParams) {
140        let dedup_key = build_dedup_key(&params.key, params.response.as_ref());
141        if self.already_reported(&params.distinct_id, &dedup_key) {
142            return;
143        }
144
145        let mut event = Event::new(
146            "$feature_flag_called".to_string(),
147            params.distinct_id.clone(),
148        );
149        for (k, v) in params.properties {
150            if event.insert_prop(k, v).is_err() {
151                return;
152            }
153        }
154        for (group_name, group_id) in &params.groups {
155            event.add_group(group_name, group_id);
156        }
157        if params.disable_geoip.unwrap_or(self.disable_geoip) {
158            let _ = event.insert_prop("$geoip_disable", true);
159        }
160        self.spawn_ship(event);
161    }
162
163    fn log_warning(&self, message: &str) {
164        // Surface filter-helper misuse via tracing — users can silence these
165        // with their tracing-subscriber level filter (e.g. `posthog_rs=error`).
166        warn!("{message}");
167    }
168}
169
170fn build_dedup_key(flag_key: &str, response: Option<&FlagValue>) -> String {
171    let response_repr = match response {
172        Some(FlagValue::Boolean(true)) => "true".to_string(),
173        Some(FlagValue::Boolean(false)) => "false".to_string(),
174        Some(FlagValue::String(s)) => s.clone(),
175        None => "::null::".to_string(),
176    };
177    format!("{flag_key}_{response_repr}")
178}
179
180/// This function constructs a new client using the options provided.
181pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
182    let options = options.into().sanitize();
183    let client = HttpClient::builder()
184        .timeout(Duration::from_secs(options.request_timeout_seconds))
185        .build()
186        .unwrap(); // Unwrap here is as safe as `HttpClient::new`
187
188    let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
189        if let Some(ref personal_key) = options.personal_api_key {
190            let cache = FlagCache::new();
191
192            let config = LocalEvaluationConfig {
193                personal_api_key: personal_key.clone(),
194                project_api_key: options.api_key.clone(),
195                api_host: options.endpoints().api_host(),
196                poll_interval: Duration::from_secs(options.poll_interval_seconds),
197                request_timeout: Duration::from_secs(options.request_timeout_seconds),
198            };
199
200            let mut poller = AsyncFlagPoller::new(config, cache.clone());
201            poller.start().await;
202
203            (Some(LocalEvaluator::new(cache)), Some(poller))
204        } else {
205            warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
206            (None, None)
207        }
208    } else {
209        (None, None)
210    };
211
212    Client {
213        options,
214        client,
215        local_evaluator,
216        _flag_poller: flag_poller,
217        flag_event_host: OnceLock::new(),
218    }
219}
220
221impl Client {
222    /// Capture the provided event, sending it to PostHog.
223    #[instrument(skip(self, event), level = "debug")]
224    pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
225        if self.options.is_disabled() {
226            trace!("Client is disabled, skipping capture");
227            return Ok(());
228        }
229
230        // Add geoip disable property if configured
231        if self.options.disable_geoip {
232            event.insert_prop("$geoip_disable", true).ok();
233        }
234
235        let inner_event = InnerEvent::new(event, self.options.api_key.clone());
236
237        let payload =
238            serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
239
240        let url = self.options.endpoints().build_url(Endpoint::Capture);
241
242        let response = self
243            .client
244            .post(&url)
245            .header(CONTENT_TYPE, "application/json")
246            .body(payload)
247            .send()
248            .await
249            .map_err(|e| Error::Connection(e.to_string()))?;
250
251        check_response(response).await
252    }
253
254    /// Capture a collection of events with a single request. Events are sent to
255    /// the `/batch/` endpoint. Set `historical_migration` to `true` to route
256    /// events to the historical ingestion topic, bypassing the main pipeline.
257    pub async fn capture_batch(
258        &self,
259        events: Vec<Event>,
260        historical_migration: bool,
261    ) -> Result<(), Error> {
262        if self.options.is_disabled() {
263            return Ok(());
264        }
265
266        let disable_geoip = self.options.disable_geoip;
267        let inner_events: Vec<InnerEvent> = events
268            .into_iter()
269            .map(|mut event| {
270                if disable_geoip {
271                    event.insert_prop("$geoip_disable", true).ok();
272                }
273                InnerEvent::new(event, self.options.api_key.clone())
274            })
275            .collect();
276
277        let batch_request = BatchRequest {
278            api_key: self.options.api_key.clone(),
279            historical_migration,
280            batch: inner_events,
281        };
282        let payload = serde_json::to_string(&batch_request)
283            .map_err(|e| Error::Serialization(e.to_string()))?;
284        let url = self.options.endpoints().build_url(Endpoint::Batch);
285
286        let response = self
287            .client
288            .post(&url)
289            .header(CONTENT_TYPE, "application/json")
290            .body(payload)
291            .send()
292            .await
293            .map_err(|e| Error::Connection(e.to_string()))?;
294
295        check_response(response).await
296    }
297
298    /// Get all feature flags for a user
299    #[must_use = "feature flags result should be used"]
300    pub async fn get_feature_flags<S: Into<String>>(
301        &self,
302        distinct_id: S,
303        groups: Option<HashMap<String, String>>,
304        person_properties: Option<HashMap<String, serde_json::Value>>,
305        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
306    ) -> Result<
307        (
308            HashMap<String, FlagValue>,
309            HashMap<String, serde_json::Value>,
310        ),
311        Error,
312    > {
313        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
314
315        let mut payload = json!({
316            "api_key": self.options.api_key,
317            "distinct_id": distinct_id.into(),
318        });
319
320        if let Some(groups) = groups {
321            payload["groups"] = json!(groups);
322        }
323
324        if let Some(person_properties) = person_properties {
325            payload["person_properties"] = json!(person_properties);
326        }
327
328        if let Some(group_properties) = group_properties {
329            payload["group_properties"] = json!(group_properties);
330        }
331
332        // Add geoip disable parameter if configured
333        if self.options.disable_geoip {
334            payload["disable_geoip"] = json!(true);
335        }
336
337        let response = self
338            .client
339            .post(&flags_endpoint)
340            .header(CONTENT_TYPE, "application/json")
341            .json(&payload)
342            .timeout(Duration::from_secs(
343                self.options.feature_flags_request_timeout_seconds,
344            ))
345            .send()
346            .await
347            .map_err(|e| Error::Connection(e.to_string()))?;
348
349        if !response.status().is_success() {
350            let status = response.status();
351            let text = response
352                .text()
353                .await
354                .unwrap_or_else(|_| "Unknown error".to_string());
355            return Err(Error::Connection(format!(
356                "API request failed with status {status}: {text}"
357            )));
358        }
359
360        let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
361            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
362        })?;
363
364        Ok(flags_response.normalize())
365    }
366
367    /// Get a specific feature flag value for a user.
368    #[must_use = "feature flag result should be used"]
369    #[instrument(skip_all, level = "debug")]
370    #[deprecated(
371        since = "0.6.0",
372        note = "Use Client::evaluate_flags() to fetch a snapshot, then call .get_flag(key) on it. \
373                The snapshot deduplicates $feature_flag_called events and supports attaching \
374                rich metadata to captured events via Event::with_flags()."
375    )]
376    pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
377        &self,
378        key: K,
379        distinct_id: D,
380        groups: Option<HashMap<String, String>>,
381        person_properties: Option<HashMap<String, serde_json::Value>>,
382        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
383    ) -> Result<Option<FlagValue>, Error> {
384        let key_str = key.into();
385        let distinct_id_str = distinct_id.into();
386
387        // Try local evaluation first if available
388        if let Some(ref evaluator) = self.local_evaluator {
389            let empty_props = HashMap::new();
390            let empty_groups: HashMap<String, String> = HashMap::new();
391            let empty_group_props: HashMap<String, HashMap<String, serde_json::Value>> =
392                HashMap::new();
393            let props = person_properties.as_ref().unwrap_or(&empty_props);
394            let groups_ref = groups.as_ref().unwrap_or(&empty_groups);
395            let group_props_ref = group_properties.as_ref().unwrap_or(&empty_group_props);
396            match evaluator.evaluate_flag(
397                &key_str,
398                &distinct_id_str,
399                props,
400                groups_ref,
401                group_props_ref,
402            ) {
403                Ok(Some(value)) => {
404                    debug!(flag = %key_str, ?value, "Flag evaluated locally");
405                    return Ok(Some(value));
406                }
407                Ok(None) => {
408                    if self.options.local_evaluation_only {
409                        debug!(flag = %key_str, "Flag not found locally, skipping remote fallback");
410                        return Ok(None);
411                    }
412                    debug!(flag = %key_str, "Flag not found locally, falling back to API");
413                }
414                Err(e) => {
415                    if self.options.local_evaluation_only {
416                        debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, skipping remote fallback");
417                        return Ok(None);
418                    }
419                    debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
420                }
421            }
422        }
423
424        // Fall back to API
425        trace!(flag = %key_str, "Fetching flag from API");
426        let (feature_flags, _payloads) = self
427            .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
428            .await?;
429        Ok(feature_flags.get(&key_str).cloned())
430    }
431
432    /// Check if a feature flag is enabled for a user.
433    #[must_use = "feature flag enabled check result should be used"]
434    #[deprecated(
435        since = "0.6.0",
436        note = "Use Client::evaluate_flags() to fetch a snapshot, then call .is_enabled(key) \
437                on it. The snapshot deduplicates $feature_flag_called events and supports \
438                attaching rich metadata to captured events via Event::with_flags()."
439    )]
440    #[allow(deprecated)] // calls deprecated get_feature_flag internally
441    pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
442        &self,
443        key: K,
444        distinct_id: D,
445        groups: Option<HashMap<String, String>>,
446        person_properties: Option<HashMap<String, serde_json::Value>>,
447        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
448    ) -> Result<bool, Error> {
449        let flag_value = self
450            .get_feature_flag(
451                key.into(),
452                distinct_id.into(),
453                groups,
454                person_properties,
455                group_properties,
456            )
457            .await?;
458        Ok(match flag_value {
459            Some(FlagValue::Boolean(b)) => b,
460            Some(FlagValue::String(_)) => true, // Variants are considered enabled
461            None => false,
462        })
463    }
464
465    /// Get a feature flag payload for a user.
466    #[must_use = "feature flag payload result should be used"]
467    #[deprecated(
468        since = "0.6.0",
469        note = "Use Client::evaluate_flags() to fetch a snapshot, then call \
470                .get_flag_payload(key) on it. Reading the payload from a snapshot is \
471                event-free, matching this method's behavior, and avoids the per-call \
472                /flags request."
473    )]
474    pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
475        &self,
476        key: K,
477        distinct_id: D,
478    ) -> Result<Option<serde_json::Value>, Error> {
479        let key_str = key.into();
480        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
481
482        let mut payload = json!({
483            "api_key": self.options.api_key,
484            "distinct_id": distinct_id.into(),
485        });
486
487        // Add geoip disable parameter if configured
488        if self.options.disable_geoip {
489            payload["disable_geoip"] = json!(true);
490        }
491
492        let response = self
493            .client
494            .post(&flags_endpoint)
495            .header(CONTENT_TYPE, "application/json")
496            .json(&payload)
497            .timeout(Duration::from_secs(
498                self.options.feature_flags_request_timeout_seconds,
499            ))
500            .send()
501            .await
502            .map_err(|e| Error::Connection(e.to_string()))?;
503
504        if !response.status().is_success() {
505            return Ok(None);
506        }
507
508        let flags_response: FeatureFlagsResponse = response
509            .json()
510            .await
511            .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
512
513        let (_flags, payloads) = flags_response.normalize();
514        Ok(payloads.get(&key_str).cloned())
515    }
516
517    /// Evaluate a feature flag locally (requires feature flags to be loaded).
518    ///
519    /// `groups` and `group_properties` are only consulted when the flag (or one
520    /// of its conditions) targets a group; pass empty maps for person flags.
521    #[allow(clippy::too_many_arguments)]
522    pub fn evaluate_feature_flag_locally(
523        &self,
524        flag: &FeatureFlag,
525        distinct_id: &str,
526        person_properties: &HashMap<String, serde_json::Value>,
527        groups: &HashMap<String, String>,
528        group_properties: &HashMap<String, HashMap<String, serde_json::Value>>,
529    ) -> Result<FlagValue, Error> {
530        let group_type_mapping = self
531            .local_evaluator
532            .as_ref()
533            .map(|ev| ev.cache().get_group_type_mapping())
534            .unwrap_or_default();
535        match_feature_flag(
536            flag,
537            distinct_id,
538            person_properties,
539            groups,
540            group_properties,
541            &group_type_mapping,
542        )
543        .map_err(|e| Error::InconclusiveMatch(e.message))
544    }
545
546    /// Evaluate every feature flag for `distinct_id` in a single round-trip,
547    /// returning a [`FeatureFlagEvaluations`] snapshot.
548    ///
549    /// Each `is_enabled` / `get_flag` call on the returned snapshot fires a
550    /// dedup-aware `$feature_flag_called` event with full metadata, and the
551    /// snapshot can be passed to [`Event::with_flags`] so a downstream
552    /// [`Client::capture`] inherits `$feature/<key>` and `$active_feature_flags`
553    /// without an extra `/flags` request.
554    ///
555    /// [`Event::with_flags`]: crate::Event::with_flags
556    pub async fn evaluate_flags<S: Into<String>>(
557        &self,
558        distinct_id: S,
559        options: EvaluateFlagsOptions,
560    ) -> Result<FeatureFlagEvaluations, Error> {
561        let distinct_id: String = distinct_id.into();
562        let host = self.flag_event_host();
563
564        if distinct_id.is_empty() || self.options.is_disabled() {
565            return Ok(FeatureFlagEvaluations::empty(host));
566        }
567
568        let mut records: HashMap<String, EvaluatedFlagRecord> = HashMap::new();
569        let mut locally_evaluated_keys: HashSet<String> = HashSet::new();
570
571        if let Some(evaluator) = &self.local_evaluator {
572            let person_props_owned = options.person_properties.clone().unwrap_or_default();
573            let groups_owned = options.groups.clone().unwrap_or_default();
574            let group_props_owned = options.group_properties.clone().unwrap_or_default();
575            let local_results = evaluator.evaluate_all_flags(
576                &distinct_id,
577                &person_props_owned,
578                &groups_owned,
579                &group_props_owned,
580            );
581            for (key, result) in local_results {
582                if let Some(filter) = &options.flag_keys {
583                    if !filter.iter().any(|k| k == &key) {
584                        continue;
585                    }
586                }
587                if let Ok(value) = result {
588                    records.insert(key.clone(), local_record(value));
589                    locally_evaluated_keys.insert(key);
590                }
591            }
592        }
593
594        let mut request_id: Option<String> = None;
595        let mut errors_while_computing = false;
596        let mut quota_limited = false;
597
598        // Skip the remote round-trip when local evaluation has already covered
599        // every requested flag. Without `flag_keys` we have to assume the caller
600        // wants every flag the project has and still hit `/flags` to discover
601        // any not loaded by the poller.
602        let local_covers_request = options
603            .flag_keys
604            .as_ref()
605            .is_some_and(|keys| keys.iter().all(|k| locally_evaluated_keys.contains(k)));
606
607        if !options.only_evaluate_locally && !local_covers_request {
608            // Don't lose successful local evaluations if `/flags` fails — degrade
609            // to a snapshot built from the local results we already have. The
610            // alternative (returning Err) wastes useful data and surprises
611            // callers who would otherwise get partial coverage.
612            match self.fetch_flag_details(&distinct_id, &options).await {
613                Ok(response) => {
614                    request_id = response.request_id;
615                    errors_while_computing = response.errors_while_computing_flags;
616                    quota_limited = response.quota_limited;
617                    for (key, detail) in response.flags {
618                        if locally_evaluated_keys.contains(&key) {
619                            continue;
620                        }
621                        records.insert(key, remote_record_from_detail(detail));
622                    }
623                }
624                Err(e) => {
625                    if records.is_empty() {
626                        return Err(e);
627                    }
628                    debug!(
629                        error = e.to_string(),
630                        local_count = records.len(),
631                        "/flags fetch failed; returning snapshot from local results only"
632                    );
633                    errors_while_computing = true;
634                }
635            }
636        }
637
638        Ok(FeatureFlagEvaluations::new(
639            host,
640            distinct_id,
641            records,
642            options.groups.unwrap_or_default(),
643            options.disable_geoip,
644            request_id,
645            None,
646            errors_while_computing,
647            quota_limited,
648        ))
649    }
650
651    fn flag_event_host(&self) -> Arc<dyn FeatureFlagEvaluationsHost> {
652        self.flag_event_host
653            .get_or_init(|| {
654                Arc::new(AsyncFlagEventHost::from_options(
655                    &self.options,
656                    self.client.clone(),
657                )) as Arc<dyn FeatureFlagEvaluationsHost>
658            })
659            .clone()
660    }
661
662    async fn fetch_flag_details(
663        &self,
664        distinct_id: &str,
665        options: &EvaluateFlagsOptions,
666    ) -> Result<DetailedFlagsResponse, Error> {
667        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
668
669        let mut payload = json!({
670            "api_key": self.options.api_key,
671            "distinct_id": distinct_id,
672        });
673        if let Some(groups) = &options.groups {
674            payload["groups"] = json!(groups);
675        }
676        if let Some(person_properties) = &options.person_properties {
677            payload["person_properties"] = json!(person_properties);
678        }
679        if let Some(group_properties) = &options.group_properties {
680            payload["group_properties"] = json!(group_properties);
681        }
682        let effective_disable_geoip = options.disable_geoip.unwrap_or(self.options.disable_geoip);
683        if effective_disable_geoip {
684            payload["disable_geoip"] = json!(true);
685        }
686        if let Some(flag_keys) = &options.flag_keys {
687            payload["flag_keys_to_evaluate"] = json!(flag_keys);
688        }
689
690        let response = self
691            .client
692            .post(&flags_endpoint)
693            .header(CONTENT_TYPE, "application/json")
694            .json(&payload)
695            .timeout(Duration::from_secs(
696                self.options.feature_flags_request_timeout_seconds,
697            ))
698            .send()
699            .await
700            .map_err(|e| Error::Connection(e.to_string()))?;
701
702        if !response.status().is_success() {
703            let status = response.status();
704            let text = response
705                .text()
706                .await
707                .unwrap_or_else(|_| "Unknown error".to_string());
708            return Err(Error::Connection(format!(
709                "API request failed with status {status}: {text}"
710            )));
711        }
712
713        let parsed = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
714            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
715        })?;
716        Ok(extract_flag_details(parsed))
717    }
718}
719
720/// Normalised view of a `/flags?v=2` response surfacing the per-flag detail
721/// shape needed by the snapshot path.
722struct DetailedFlagsResponse {
723    flags: HashMap<String, FlagDetail>,
724    request_id: Option<String>,
725    errors_while_computing_flags: bool,
726    quota_limited: bool,
727}
728
729fn extract_flag_details(response: FeatureFlagsResponse) -> DetailedFlagsResponse {
730    match response {
731        FeatureFlagsResponse::V2 {
732            flags,
733            request_id,
734            errors_while_computing_flags,
735            quota_limited,
736        } => DetailedFlagsResponse {
737            flags,
738            request_id,
739            errors_while_computing_flags,
740            quota_limited,
741        },
742        FeatureFlagsResponse::Legacy {
743            feature_flags,
744            feature_flag_payloads,
745            errors,
746        } => {
747            let mut flags = HashMap::new();
748            for (key, value) in feature_flags {
749                let (enabled, variant) = match value {
750                    FlagValue::Boolean(b) => (b, None),
751                    FlagValue::String(s) => (true, Some(s)),
752                };
753                let payload = feature_flag_payloads.get(&key).cloned();
754                flags.insert(
755                    key.clone(),
756                    FlagDetail {
757                        key,
758                        enabled,
759                        variant,
760                        reason: None,
761                        metadata: payload.map(|payload| crate::feature_flags::FlagMetadata {
762                            id: 0,
763                            version: 0,
764                            description: None,
765                            payload: Some(payload),
766                        }),
767                    },
768                );
769            }
770            DetailedFlagsResponse {
771                flags,
772                request_id: None,
773                errors_while_computing_flags: errors.is_some_and(|e| !e.is_empty()),
774                quota_limited: false,
775            }
776        }
777    }
778}
779
780fn local_record(value: FlagValue) -> EvaluatedFlagRecord {
781    let (enabled, variant) = match value {
782        FlagValue::Boolean(b) => (b, None),
783        FlagValue::String(s) => (true, Some(s)),
784    };
785    EvaluatedFlagRecord {
786        enabled,
787        variant,
788        // Local definitions do not surface a payload through the poller today.
789        payload: None,
790        id: None,
791        version: None,
792        reason: Some("Evaluated locally".to_string()),
793        locally_evaluated: true,
794    }
795}
796
797fn remote_record_from_detail(detail: FlagDetail) -> EvaluatedFlagRecord {
798    let metadata = detail.metadata;
799    let reason = detail
800        .reason
801        .and_then(|r| r.description.or(Some(r.code)))
802        .filter(|s| !s.is_empty());
803    let id = metadata.as_ref().map(|m| m.id);
804    let version = metadata.as_ref().map(|m| m.version);
805    let payload = metadata.and_then(|m| m.payload).map(normalize_payload);
806    EvaluatedFlagRecord {
807        enabled: detail.enabled,
808        variant: detail.variant,
809        payload,
810        id,
811        version,
812        reason,
813        locally_evaluated: false,
814    }
815}
816
817/// `metadata.payload` from `/flags?v=2` is sometimes a JSON-encoded string
818/// (e.g. `"{\"color\":\"blue\"}"`) rather than already-parsed JSON. Try to
819/// parse a `String` payload as JSON and fall back to the raw string on
820/// failure so users can branch on a uniform [`serde_json::Value`].
821fn normalize_payload(payload: serde_json::Value) -> serde_json::Value {
822    match payload {
823        serde_json::Value::String(raw) => {
824            serde_json::from_str(&raw).unwrap_or(serde_json::Value::String(raw))
825        }
826        other => other,
827    }
828}