Skip to main content

posthog_rs/client/
async_client.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::Duration;
4
5use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
6use serde_json::json;
7use tracing::{debug, instrument, trace, warn};
8
9use crate::endpoints::Endpoint;
10use crate::event::BatchRequest;
11use crate::feature_flag_evaluations::{
12    EvaluateFlagsOptions, EvaluatedFlagRecord, FeatureFlagEvaluations, FeatureFlagEvaluationsHost,
13    FlagCalledEventParams,
14};
15use crate::feature_flags::{
16    match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagDetail, FlagValue,
17};
18use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
19use crate::{event::InnerEvent, Error, Event};
20
21use super::ClientOptions;
22
/// Cap on the number of `distinct_id` entries in the `$feature_flag_called`
/// dedup cache. On overflow the entire map is reset (matches the JS SDK).
///
/// The limit counts outer map entries (one per `distinct_id`), not the
/// individual dedup keys stored under each of them.
const MAX_FLAG_CALLED_CACHE_SIZE: usize = 50_000;
26
27async fn check_response(response: reqwest::Response) -> Result<(), Error> {
28    let status = response.status().as_u16();
29    let body = response
30        .text()
31        .await
32        .unwrap_or_else(|_| "Unknown error".to_string());
33
34    match Error::from_http_response(status, body) {
35        Some(err) => Err(err),
36        None => Ok(()),
37    }
38}
39
/// A [`Client`] facilitates interactions with the PostHog API over HTTP.
pub struct Client {
    /// Sanitized configuration (API key, endpoints, timeouts, feature toggles).
    options: ClientOptions,
    /// HTTP client used for the requests issued by this client's methods.
    client: HttpClient,
    /// Local flag evaluator; `Some` only when local evaluation is enabled, the
    /// client is not disabled, and a personal API key is configured.
    local_evaluator: Option<LocalEvaluator>,
    /// Poller created and started together with the local evaluator. It is
    /// never read in this file; presumably it is stored so that it lives as
    /// long as the client — confirm against `AsyncFlagPoller`.
    _flag_poller: Option<AsyncFlagPoller>,
    /// Host for `$feature_flag_called` reporting, created on first use and
    /// then shared by every snapshot this client hands out.
    flag_event_host: OnceLock<Arc<dyn FeatureFlagEvaluationsHost>>,
}
48
/// Implementation of [`FeatureFlagEvaluationsHost`] that emits dedup-aware
/// `$feature_flag_called` events through a clone of the async [`Client`]'s
/// HTTP transport. The event ship is fire-and-forget: errors are logged at
/// `debug` level but do not surface to the caller, matching the JS SDK.
struct AsyncFlagEventHost {
    /// HTTP client used to post the events.
    http_client: HttpClient,
    /// Project API key attached to every shipped event.
    api_key: String,
    /// Capture endpoint URL, resolved once when the host is constructed.
    capture_url: String,
    /// When `true`, events are dropped instead of being sent.
    disabled: bool,
    /// Fallback for the `$geoip_disable` decision when the event parameters
    /// do not specify one.
    disable_geoip: bool,
    /// Dedup state: `distinct_id` -> set of dedup keys already reported.
    dedup_cache: Mutex<HashMap<String, HashSet<String>>>,
    /// Tokio runtime handle captured at host construction (which always runs
    /// inside the runtime that hosts `evaluate_flags`). This lets snapshot
    /// access methods spawn `$feature_flag_called` shipping from any thread —
    /// including ones without an entered runtime — by routing through the
    /// captured handle instead of the free `tokio::spawn` (which would panic).
    runtime: tokio::runtime::Handle,
}
67
68impl AsyncFlagEventHost {
69    fn from_options(options: &ClientOptions, http_client: HttpClient) -> Self {
70        let capture_url = options.endpoints().build_url(Endpoint::Capture);
71        Self {
72            http_client,
73            api_key: options.api_key.clone(),
74            capture_url,
75            disabled: options.is_disabled(),
76            disable_geoip: options.disable_geoip,
77            dedup_cache: Mutex::new(HashMap::new()),
78            runtime: tokio::runtime::Handle::current(),
79        }
80    }
81
82    /// Returns `true` when the helper has already shipped this
83    /// `(distinct_id, key, response)` combination and the caller should skip.
84    fn already_reported(&self, distinct_id: &str, dedup_key: &str) -> bool {
85        let mut cache = self.dedup_cache.lock().unwrap_or_else(|p| p.into_inner());
86        if let Some(seen) = cache.get(distinct_id) {
87            if seen.contains(dedup_key) {
88                return true;
89            }
90        }
91        if cache.len() >= MAX_FLAG_CALLED_CACHE_SIZE {
92            cache.clear();
93        }
94        cache
95            .entry(distinct_id.to_string())
96            .or_default()
97            .insert(dedup_key.to_string());
98        false
99    }
100
101    fn spawn_ship(&self, event: Event) {
102        if self.disabled {
103            return;
104        }
105        let inner_event = InnerEvent::new(event, self.api_key.clone());
106        let payload = match serde_json::to_string(&inner_event) {
107            Ok(p) => p,
108            Err(e) => {
109                debug!(error = %e, "failed to serialize $feature_flag_called event");
110                return;
111            }
112        };
113        let http_client = self.http_client.clone();
114        let url = self.capture_url.clone();
115        self.runtime.spawn(async move {
116            let response = match http_client
117                .post(&url)
118                .header(CONTENT_TYPE, "application/json")
119                .body(payload)
120                .send()
121                .await
122            {
123                Ok(r) => r,
124                Err(send_err) => {
125                    let message = send_err.to_string();
126                    debug!("failed to send $feature_flag_called event: {message}");
127                    return;
128                }
129            };
130            if let Err(check_err) = check_response(response).await {
131                let message = check_err.to_string();
132                debug!("$feature_flag_called event rejected by server: {message}");
133            }
134        });
135    }
136}
137
138impl FeatureFlagEvaluationsHost for AsyncFlagEventHost {
139    fn capture_flag_called_event_if_needed(&self, params: FlagCalledEventParams) {
140        let dedup_key = build_dedup_key(&params.key, params.response.as_ref());
141        if self.already_reported(&params.distinct_id, &dedup_key) {
142            return;
143        }
144
145        let mut event = Event::new(
146            "$feature_flag_called".to_string(),
147            params.distinct_id.clone(),
148        );
149        for (k, v) in params.properties {
150            if event.insert_prop(k, v).is_err() {
151                return;
152            }
153        }
154        for (group_name, group_id) in &params.groups {
155            event.add_group(group_name, group_id);
156        }
157        if params.disable_geoip.unwrap_or(self.disable_geoip) {
158            let _ = event.insert_prop("$geoip_disable", true);
159        }
160        self.spawn_ship(event);
161    }
162
163    fn log_warning(&self, message: &str) {
164        // Surface filter-helper misuse via tracing — users can silence these
165        // with their tracing-subscriber level filter (e.g. `posthog_rs=error`).
166        warn!("{message}");
167    }
168}
169
170fn build_dedup_key(flag_key: &str, response: Option<&FlagValue>) -> String {
171    let response_repr = match response {
172        Some(FlagValue::Boolean(true)) => "true".to_string(),
173        Some(FlagValue::Boolean(false)) => "false".to_string(),
174        Some(FlagValue::String(s)) => s.clone(),
175        None => "::null::".to_string(),
176    };
177    format!("{flag_key}_{response_repr}")
178}
179
180/// This function constructs a new client using the options provided.
181pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
182    let options = options.into().sanitize();
183    let client = HttpClient::builder()
184        .timeout(Duration::from_secs(options.request_timeout_seconds))
185        .build()
186        .unwrap(); // Unwrap here is as safe as `HttpClient::new`
187
188    let (local_evaluator, flag_poller) = if options.enable_local_evaluation
189        && !options.is_disabled()
190    {
191        if let Some(ref personal_key) = options.personal_api_key {
192            let cache = FlagCache::new();
193
194            let config = LocalEvaluationConfig {
195                personal_api_key: personal_key.clone(),
196                project_api_key: options.api_key.clone(),
197                api_host: options.endpoints().api_host(),
198                poll_interval: Duration::from_secs(options.poll_interval_seconds),
199                request_timeout: Duration::from_secs(options.request_timeout_seconds),
200            };
201
202            let mut poller = AsyncFlagPoller::new(config, cache.clone());
203            poller.start().await;
204
205            (Some(LocalEvaluator::new(cache)), Some(poller))
206        } else {
207            warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
208            (None, None)
209        }
210    } else {
211        (None, None)
212    };
213
214    Client {
215        options,
216        client,
217        local_evaluator,
218        _flag_poller: flag_poller,
219        flag_event_host: OnceLock::new(),
220    }
221}
222
223impl Client {
224    /// Capture the provided event, sending it to PostHog.
225    #[instrument(skip(self, event), level = "debug")]
226    pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
227        if self.options.is_disabled() {
228            trace!("Client is disabled, skipping capture");
229            return Ok(());
230        }
231
232        // Add geoip disable property if configured
233        if self.options.disable_geoip {
234            event.insert_prop("$geoip_disable", true).ok();
235        }
236
237        let inner_event = InnerEvent::new(event, self.options.api_key.clone());
238
239        let payload =
240            serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
241
242        let url = self.options.endpoints().build_url(Endpoint::Capture);
243
244        let response = self
245            .client
246            .post(&url)
247            .header(CONTENT_TYPE, "application/json")
248            .body(payload)
249            .send()
250            .await
251            .map_err(|e| Error::Connection(e.to_string()))?;
252
253        check_response(response).await
254    }
255
256    /// Capture a collection of events with a single request. Events are sent to
257    /// the `/batch/` endpoint. Set `historical_migration` to `true` to route
258    /// events to the historical ingestion topic, bypassing the main pipeline.
259    pub async fn capture_batch(
260        &self,
261        events: Vec<Event>,
262        historical_migration: bool,
263    ) -> Result<(), Error> {
264        if self.options.is_disabled() {
265            return Ok(());
266        }
267
268        let disable_geoip = self.options.disable_geoip;
269        let inner_events: Vec<InnerEvent> = events
270            .into_iter()
271            .map(|mut event| {
272                if disable_geoip {
273                    event.insert_prop("$geoip_disable", true).ok();
274                }
275                InnerEvent::new(event, self.options.api_key.clone())
276            })
277            .collect();
278
279        let batch_request = BatchRequest {
280            api_key: self.options.api_key.clone(),
281            historical_migration,
282            batch: inner_events,
283        };
284        let payload = serde_json::to_string(&batch_request)
285            .map_err(|e| Error::Serialization(e.to_string()))?;
286        let url = self.options.endpoints().build_url(Endpoint::Batch);
287
288        let response = self
289            .client
290            .post(&url)
291            .header(CONTENT_TYPE, "application/json")
292            .body(payload)
293            .send()
294            .await
295            .map_err(|e| Error::Connection(e.to_string()))?;
296
297        check_response(response).await
298    }
299
300    /// Get all feature flags for a user
301    #[must_use = "feature flags result should be used"]
302    pub async fn get_feature_flags<S: Into<String>>(
303        &self,
304        distinct_id: S,
305        groups: Option<HashMap<String, String>>,
306        person_properties: Option<HashMap<String, serde_json::Value>>,
307        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
308    ) -> Result<
309        (
310            HashMap<String, FlagValue>,
311            HashMap<String, serde_json::Value>,
312        ),
313        Error,
314    > {
315        if self.options.is_disabled() {
316            trace!("Client is disabled, skipping feature flags request");
317            return Ok((HashMap::new(), HashMap::new()));
318        }
319
320        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
321
322        let mut payload = json!({
323            "api_key": self.options.api_key,
324            "distinct_id": distinct_id.into(),
325        });
326
327        if let Some(groups) = groups {
328            payload["groups"] = json!(groups);
329        }
330
331        if let Some(person_properties) = person_properties {
332            payload["person_properties"] = json!(person_properties);
333        }
334
335        if let Some(group_properties) = group_properties {
336            payload["group_properties"] = json!(group_properties);
337        }
338
339        // Add geoip disable parameter if configured
340        if self.options.disable_geoip {
341            payload["disable_geoip"] = json!(true);
342        }
343
344        let response = self
345            .client
346            .post(&flags_endpoint)
347            .header(CONTENT_TYPE, "application/json")
348            .json(&payload)
349            .timeout(Duration::from_secs(
350                self.options.feature_flags_request_timeout_seconds,
351            ))
352            .send()
353            .await
354            .map_err(|e| Error::Connection(e.to_string()))?;
355
356        if !response.status().is_success() {
357            let status = response.status();
358            let text = response
359                .text()
360                .await
361                .unwrap_or_else(|_| "Unknown error".to_string());
362            return Err(Error::Connection(format!(
363                "API request failed with status {status}: {text}"
364            )));
365        }
366
367        let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
368            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
369        })?;
370
371        Ok(flags_response.normalize())
372    }
373
374    /// Get a specific feature flag value for a user.
375    #[must_use = "feature flag result should be used"]
376    #[instrument(skip_all, level = "debug")]
377    #[deprecated(
378        since = "0.6.0",
379        note = "Use Client::evaluate_flags() to fetch a snapshot, then call .get_flag(key) on it. \
380                The snapshot deduplicates $feature_flag_called events and supports attaching \
381                rich metadata to captured events via Event::with_flags()."
382    )]
383    pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
384        &self,
385        key: K,
386        distinct_id: D,
387        groups: Option<HashMap<String, String>>,
388        person_properties: Option<HashMap<String, serde_json::Value>>,
389        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
390    ) -> Result<Option<FlagValue>, Error> {
391        let key_str = key.into();
392        let distinct_id_str = distinct_id.into();
393
394        // Try local evaluation first if available
395        if let Some(ref evaluator) = self.local_evaluator {
396            let empty_props = HashMap::new();
397            let empty_groups: HashMap<String, String> = HashMap::new();
398            let empty_group_props: HashMap<String, HashMap<String, serde_json::Value>> =
399                HashMap::new();
400            let props = person_properties.as_ref().unwrap_or(&empty_props);
401            let groups_ref = groups.as_ref().unwrap_or(&empty_groups);
402            let group_props_ref = group_properties.as_ref().unwrap_or(&empty_group_props);
403            match evaluator.evaluate_flag(
404                &key_str,
405                &distinct_id_str,
406                props,
407                groups_ref,
408                group_props_ref,
409            ) {
410                Ok(Some(value)) => {
411                    debug!(flag = %key_str, ?value, "Flag evaluated locally");
412                    return Ok(Some(value));
413                }
414                Ok(None) => {
415                    if self.options.local_evaluation_only {
416                        debug!(flag = %key_str, "Flag not found locally, skipping remote fallback");
417                        return Ok(None);
418                    }
419                    debug!(flag = %key_str, "Flag not found locally, falling back to API");
420                }
421                Err(e) => {
422                    if self.options.local_evaluation_only {
423                        debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, skipping remote fallback");
424                        return Ok(None);
425                    }
426                    debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
427                }
428            }
429        }
430
431        // Fall back to API
432        trace!(flag = %key_str, "Fetching flag from API");
433        let (feature_flags, _payloads) = self
434            .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
435            .await?;
436        Ok(feature_flags.get(&key_str).cloned())
437    }
438
439    /// Check if a feature flag is enabled for a user.
440    #[must_use = "feature flag enabled check result should be used"]
441    #[deprecated(
442        since = "0.6.0",
443        note = "Use Client::evaluate_flags() to fetch a snapshot, then call .is_enabled(key) \
444                on it. The snapshot deduplicates $feature_flag_called events and supports \
445                attaching rich metadata to captured events via Event::with_flags()."
446    )]
447    #[allow(deprecated)] // calls deprecated get_feature_flag internally
448    pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
449        &self,
450        key: K,
451        distinct_id: D,
452        groups: Option<HashMap<String, String>>,
453        person_properties: Option<HashMap<String, serde_json::Value>>,
454        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
455    ) -> Result<bool, Error> {
456        let flag_value = self
457            .get_feature_flag(
458                key.into(),
459                distinct_id.into(),
460                groups,
461                person_properties,
462                group_properties,
463            )
464            .await?;
465        Ok(match flag_value {
466            Some(FlagValue::Boolean(b)) => b,
467            Some(FlagValue::String(_)) => true, // Variants are considered enabled
468            None => false,
469        })
470    }
471
472    /// Get a feature flag payload for a user.
473    #[must_use = "feature flag payload result should be used"]
474    #[deprecated(
475        since = "0.6.0",
476        note = "Use Client::evaluate_flags() to fetch a snapshot, then call \
477                .get_flag_payload(key) on it. Reading the payload from a snapshot is \
478                event-free, matching this method's behavior, and avoids the per-call \
479                /flags request."
480    )]
481    pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
482        &self,
483        key: K,
484        distinct_id: D,
485    ) -> Result<Option<serde_json::Value>, Error> {
486        if self.options.is_disabled() {
487            trace!("Client is disabled, skipping feature flag payload request");
488            return Ok(None);
489        }
490
491        let key_str = key.into();
492        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
493
494        let mut payload = json!({
495            "api_key": self.options.api_key,
496            "distinct_id": distinct_id.into(),
497        });
498
499        // Add geoip disable parameter if configured
500        if self.options.disable_geoip {
501            payload["disable_geoip"] = json!(true);
502        }
503
504        let response = self
505            .client
506            .post(&flags_endpoint)
507            .header(CONTENT_TYPE, "application/json")
508            .json(&payload)
509            .timeout(Duration::from_secs(
510                self.options.feature_flags_request_timeout_seconds,
511            ))
512            .send()
513            .await
514            .map_err(|e| Error::Connection(e.to_string()))?;
515
516        if !response.status().is_success() {
517            return Ok(None);
518        }
519
520        let flags_response: FeatureFlagsResponse = response
521            .json()
522            .await
523            .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
524
525        let (_flags, payloads) = flags_response.normalize();
526        Ok(payloads.get(&key_str).cloned())
527    }
528
529    /// Evaluate a feature flag locally (requires feature flags to be loaded).
530    ///
531    /// `groups` and `group_properties` are only consulted when the flag (or one
532    /// of its conditions) targets a group; pass empty maps for person flags.
533    #[allow(clippy::too_many_arguments)]
534    pub fn evaluate_feature_flag_locally(
535        &self,
536        flag: &FeatureFlag,
537        distinct_id: &str,
538        person_properties: &HashMap<String, serde_json::Value>,
539        groups: &HashMap<String, String>,
540        group_properties: &HashMap<String, HashMap<String, serde_json::Value>>,
541    ) -> Result<FlagValue, Error> {
542        let group_type_mapping = self
543            .local_evaluator
544            .as_ref()
545            .map(|ev| ev.cache().get_group_type_mapping())
546            .unwrap_or_default();
547        match_feature_flag(
548            flag,
549            distinct_id,
550            person_properties,
551            groups,
552            group_properties,
553            &group_type_mapping,
554        )
555        .map_err(|e| Error::InconclusiveMatch(e.message))
556    }
557
558    /// Evaluate every feature flag for `distinct_id` in a single round-trip,
559    /// returning a [`FeatureFlagEvaluations`] snapshot.
560    ///
561    /// Each `is_enabled` / `get_flag` call on the returned snapshot fires a
562    /// dedup-aware `$feature_flag_called` event with full metadata, and the
563    /// snapshot can be passed to [`Event::with_flags`] so a downstream
564    /// [`Client::capture`] inherits `$feature/<key>` and `$active_feature_flags`
565    /// without an extra `/flags` request.
566    ///
567    /// [`Event::with_flags`]: crate::Event::with_flags
568    pub async fn evaluate_flags<S: Into<String>>(
569        &self,
570        distinct_id: S,
571        options: EvaluateFlagsOptions,
572    ) -> Result<FeatureFlagEvaluations, Error> {
573        let distinct_id: String = distinct_id.into();
574        let host = self.flag_event_host();
575
576        if distinct_id.is_empty() || self.options.is_disabled() {
577            return Ok(FeatureFlagEvaluations::empty(host));
578        }
579
580        let mut records: HashMap<String, EvaluatedFlagRecord> = HashMap::new();
581        let mut locally_evaluated_keys: HashSet<String> = HashSet::new();
582
583        if let Some(evaluator) = &self.local_evaluator {
584            let person_props_owned = options.person_properties.clone().unwrap_or_default();
585            let groups_owned = options.groups.clone().unwrap_or_default();
586            let group_props_owned = options.group_properties.clone().unwrap_or_default();
587            let local_results = evaluator.evaluate_all_flags(
588                &distinct_id,
589                &person_props_owned,
590                &groups_owned,
591                &group_props_owned,
592            );
593            for (key, result) in local_results {
594                if let Some(filter) = &options.flag_keys {
595                    if !filter.iter().any(|k| k == &key) {
596                        continue;
597                    }
598                }
599                if let Ok(value) = result {
600                    records.insert(key.clone(), local_record(value));
601                    locally_evaluated_keys.insert(key);
602                }
603            }
604        }
605
606        let mut request_id: Option<String> = None;
607        let mut errors_while_computing = false;
608        let mut quota_limited = false;
609
610        // Skip the remote round-trip when local evaluation has already covered
611        // every requested flag. Without `flag_keys` we have to assume the caller
612        // wants every flag the project has and still hit `/flags` to discover
613        // any not loaded by the poller.
614        let local_covers_request = options
615            .flag_keys
616            .as_ref()
617            .is_some_and(|keys| keys.iter().all(|k| locally_evaluated_keys.contains(k)));
618
619        if !options.only_evaluate_locally && !local_covers_request {
620            // Don't lose successful local evaluations if `/flags` fails — degrade
621            // to a snapshot built from the local results we already have. The
622            // alternative (returning Err) wastes useful data and surprises
623            // callers who would otherwise get partial coverage.
624            match self.fetch_flag_details(&distinct_id, &options).await {
625                Ok(response) => {
626                    request_id = response.request_id;
627                    errors_while_computing = response.errors_while_computing_flags;
628                    quota_limited = response.quota_limited;
629                    for (key, detail) in response.flags {
630                        if locally_evaluated_keys.contains(&key) {
631                            continue;
632                        }
633                        records.insert(key, remote_record_from_detail(detail));
634                    }
635                }
636                Err(e) => {
637                    if records.is_empty() {
638                        return Err(e);
639                    }
640                    debug!(
641                        error = e.to_string(),
642                        local_count = records.len(),
643                        "/flags fetch failed; returning snapshot from local results only"
644                    );
645                    errors_while_computing = true;
646                }
647            }
648        }
649
650        Ok(FeatureFlagEvaluations::new(
651            host,
652            distinct_id,
653            records,
654            options.groups.unwrap_or_default(),
655            options.disable_geoip,
656            request_id,
657            None,
658            errors_while_computing,
659            quota_limited,
660        ))
661    }
662
663    fn flag_event_host(&self) -> Arc<dyn FeatureFlagEvaluationsHost> {
664        self.flag_event_host
665            .get_or_init(|| {
666                Arc::new(AsyncFlagEventHost::from_options(
667                    &self.options,
668                    self.client.clone(),
669                )) as Arc<dyn FeatureFlagEvaluationsHost>
670            })
671            .clone()
672    }
673
674    async fn fetch_flag_details(
675        &self,
676        distinct_id: &str,
677        options: &EvaluateFlagsOptions,
678    ) -> Result<DetailedFlagsResponse, Error> {
679        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
680
681        let mut payload = json!({
682            "api_key": self.options.api_key,
683            "distinct_id": distinct_id,
684        });
685        if let Some(groups) = &options.groups {
686            payload["groups"] = json!(groups);
687        }
688        if let Some(person_properties) = &options.person_properties {
689            payload["person_properties"] = json!(person_properties);
690        }
691        if let Some(group_properties) = &options.group_properties {
692            payload["group_properties"] = json!(group_properties);
693        }
694        let effective_disable_geoip = options.disable_geoip.unwrap_or(self.options.disable_geoip);
695        if effective_disable_geoip {
696            payload["disable_geoip"] = json!(true);
697        }
698        if let Some(flag_keys) = &options.flag_keys {
699            payload["flag_keys_to_evaluate"] = json!(flag_keys);
700        }
701
702        let response = self
703            .client
704            .post(&flags_endpoint)
705            .header(CONTENT_TYPE, "application/json")
706            .json(&payload)
707            .timeout(Duration::from_secs(
708                self.options.feature_flags_request_timeout_seconds,
709            ))
710            .send()
711            .await
712            .map_err(|e| Error::Connection(e.to_string()))?;
713
714        if !response.status().is_success() {
715            let status = response.status();
716            let text = response
717                .text()
718                .await
719                .unwrap_or_else(|_| "Unknown error".to_string());
720            return Err(Error::Connection(format!(
721                "API request failed with status {status}: {text}"
722            )));
723        }
724
725        let parsed = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
726            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
727        })?;
728        Ok(extract_flag_details(parsed))
729    }
730}
731
/// Normalised view of a `/flags?v=2` response surfacing the per-flag detail
/// shape needed by the snapshot path.
struct DetailedFlagsResponse {
    /// Per-flag details, keyed by flag key.
    flags: HashMap<String, FlagDetail>,
    /// Request identifier from the response; `None` for legacy responses.
    request_id: Option<String>,
    /// Whether the server reported errors while computing flags. For legacy
    /// responses this is `true` when a non-empty error list is present.
    errors_while_computing_flags: bool,
    /// Quota-limited marker from the response; always `false` for legacy
    /// responses.
    quota_limited: bool,
}
740
741fn extract_flag_details(response: FeatureFlagsResponse) -> DetailedFlagsResponse {
742    match response {
743        FeatureFlagsResponse::V2 {
744            flags,
745            request_id,
746            errors_while_computing_flags,
747            quota_limited,
748        } => DetailedFlagsResponse {
749            flags,
750            request_id,
751            errors_while_computing_flags,
752            quota_limited,
753        },
754        FeatureFlagsResponse::Legacy {
755            feature_flags,
756            feature_flag_payloads,
757            errors,
758        } => {
759            let mut flags = HashMap::new();
760            for (key, value) in feature_flags {
761                let (enabled, variant) = match value {
762                    FlagValue::Boolean(b) => (b, None),
763                    FlagValue::String(s) => (true, Some(s)),
764                };
765                let payload = feature_flag_payloads.get(&key).cloned();
766                flags.insert(
767                    key.clone(),
768                    FlagDetail {
769                        key,
770                        enabled,
771                        variant,
772                        reason: None,
773                        metadata: payload.map(|payload| crate::feature_flags::FlagMetadata {
774                            id: 0,
775                            version: 0,
776                            description: None,
777                            payload: Some(payload),
778                        }),
779                    },
780                );
781            }
782            DetailedFlagsResponse {
783                flags,
784                request_id: None,
785                errors_while_computing_flags: errors.is_some_and(|e| !e.is_empty()),
786                quota_limited: false,
787            }
788        }
789    }
790}
791
792fn local_record(value: FlagValue) -> EvaluatedFlagRecord {
793    let (enabled, variant) = match value {
794        FlagValue::Boolean(b) => (b, None),
795        FlagValue::String(s) => (true, Some(s)),
796    };
797    EvaluatedFlagRecord {
798        enabled,
799        variant,
800        // Local definitions do not surface a payload through the poller today.
801        payload: None,
802        id: None,
803        version: None,
804        reason: Some("Evaluated locally".to_string()),
805        locally_evaluated: true,
806    }
807}
808
809fn remote_record_from_detail(detail: FlagDetail) -> EvaluatedFlagRecord {
810    let metadata = detail.metadata;
811    let reason = detail
812        .reason
813        .and_then(|r| r.description.or(Some(r.code)))
814        .filter(|s| !s.is_empty());
815    let id = metadata.as_ref().map(|m| m.id);
816    let version = metadata.as_ref().map(|m| m.version);
817    let payload = metadata.and_then(|m| m.payload).map(normalize_payload);
818    EvaluatedFlagRecord {
819        enabled: detail.enabled,
820        variant: detail.variant,
821        payload,
822        id,
823        version,
824        reason,
825        locally_evaluated: false,
826    }
827}
828
829/// `metadata.payload` from `/flags?v=2` is sometimes a JSON-encoded string
830/// (e.g. `"{\"color\":\"blue\"}"`) rather than already-parsed JSON. Try to
831/// parse a `String` payload as JSON and fall back to the raw string on
832/// failure so users can branch on a uniform [`serde_json::Value`].
833fn normalize_payload(payload: serde_json::Value) -> serde_json::Value {
834    match payload {
835        serde_json::Value::String(raw) => {
836            serde_json::from_str(&raw).unwrap_or(serde_json::Value::String(raw))
837        }
838        other => other,
839    }
840}