Skip to main content

posthog_rs/client/
async_client.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
5use serde_json::json;
6use tracing::{debug, instrument, trace, warn};
7
8use crate::endpoints::Endpoint;
9use crate::event::BatchRequest;
10use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagValue};
11use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
12use crate::{event::InnerEvent, Error, Event};
13
14use super::ClientOptions;
15
16async fn check_response(response: reqwest::Response) -> Result<(), Error> {
17    let status = response.status().as_u16();
18    let body = response
19        .text()
20        .await
21        .unwrap_or_else(|_| "Unknown error".to_string());
22
23    match Error::from_http_response(status, body) {
24        Some(err) => Err(err),
25        None => Ok(()),
26    }
27}
28
29/// A [`Client`] facilitates interactions with the PostHog API over HTTP.
30pub struct Client {
31    options: ClientOptions,
32    client: HttpClient,
33    local_evaluator: Option<LocalEvaluator>,
34    _flag_poller: Option<AsyncFlagPoller>,
35}
36
37/// This function constructs a new client using the options provided.
38pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
39    let options = options.into().sanitize();
40    let client = HttpClient::builder()
41        .timeout(Duration::from_secs(options.request_timeout_seconds))
42        .build()
43        .unwrap(); // Unwrap here is as safe as `HttpClient::new`
44
45    let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
46        if let Some(ref personal_key) = options.personal_api_key {
47            let cache = FlagCache::new();
48
49            let config = LocalEvaluationConfig {
50                personal_api_key: personal_key.clone(),
51                project_api_key: options.api_key.clone(),
52                api_host: options.endpoints().api_host(),
53                poll_interval: Duration::from_secs(options.poll_interval_seconds),
54                request_timeout: Duration::from_secs(options.request_timeout_seconds),
55            };
56
57            let mut poller = AsyncFlagPoller::new(config, cache.clone());
58            poller.start().await;
59
60            (Some(LocalEvaluator::new(cache)), Some(poller))
61        } else {
62            warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
63            (None, None)
64        }
65    } else {
66        (None, None)
67    };
68
69    Client {
70        options,
71        client,
72        local_evaluator,
73        _flag_poller: flag_poller,
74    }
75}
76
77impl Client {
78    /// Capture the provided event, sending it to PostHog.
79    #[instrument(skip(self, event), level = "debug")]
80    pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
81        if self.options.is_disabled() {
82            trace!("Client is disabled, skipping capture");
83            return Ok(());
84        }
85
86        // Add geoip disable property if configured
87        if self.options.disable_geoip {
88            event.insert_prop("$geoip_disable", true).ok();
89        }
90
91        let inner_event = InnerEvent::new(event, self.options.api_key.clone());
92
93        let payload =
94            serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
95
96        let url = self.options.endpoints().build_url(Endpoint::Capture);
97
98        let response = self
99            .client
100            .post(&url)
101            .header(CONTENT_TYPE, "application/json")
102            .body(payload)
103            .send()
104            .await
105            .map_err(|e| Error::Connection(e.to_string()))?;
106
107        check_response(response).await
108    }
109
110    /// Capture a collection of events with a single request. Events are sent to
111    /// the `/batch/` endpoint. Set `historical_migration` to `true` to route
112    /// events to the historical ingestion topic, bypassing the main pipeline.
113    pub async fn capture_batch(
114        &self,
115        events: Vec<Event>,
116        historical_migration: bool,
117    ) -> Result<(), Error> {
118        if self.options.is_disabled() {
119            return Ok(());
120        }
121
122        let disable_geoip = self.options.disable_geoip;
123        let inner_events: Vec<InnerEvent> = events
124            .into_iter()
125            .map(|mut event| {
126                if disable_geoip {
127                    event.insert_prop("$geoip_disable", true).ok();
128                }
129                InnerEvent::new(event, self.options.api_key.clone())
130            })
131            .collect();
132
133        let batch_request = BatchRequest {
134            api_key: self.options.api_key.clone(),
135            historical_migration,
136            batch: inner_events,
137        };
138        let payload = serde_json::to_string(&batch_request)
139            .map_err(|e| Error::Serialization(e.to_string()))?;
140        let url = self.options.endpoints().build_url(Endpoint::Batch);
141
142        let response = self
143            .client
144            .post(&url)
145            .header(CONTENT_TYPE, "application/json")
146            .body(payload)
147            .send()
148            .await
149            .map_err(|e| Error::Connection(e.to_string()))?;
150
151        check_response(response).await
152    }
153
154    /// Get all feature flags for a user
155    #[must_use = "feature flags result should be used"]
156    pub async fn get_feature_flags<S: Into<String>>(
157        &self,
158        distinct_id: S,
159        groups: Option<HashMap<String, String>>,
160        person_properties: Option<HashMap<String, serde_json::Value>>,
161        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
162    ) -> Result<
163        (
164            HashMap<String, FlagValue>,
165            HashMap<String, serde_json::Value>,
166        ),
167        Error,
168    > {
169        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
170
171        let mut payload = json!({
172            "api_key": self.options.api_key,
173            "distinct_id": distinct_id.into(),
174        });
175
176        if let Some(groups) = groups {
177            payload["groups"] = json!(groups);
178        }
179
180        if let Some(person_properties) = person_properties {
181            payload["person_properties"] = json!(person_properties);
182        }
183
184        if let Some(group_properties) = group_properties {
185            payload["group_properties"] = json!(group_properties);
186        }
187
188        // Add geoip disable parameter if configured
189        if self.options.disable_geoip {
190            payload["disable_geoip"] = json!(true);
191        }
192
193        let response = self
194            .client
195            .post(&flags_endpoint)
196            .header(CONTENT_TYPE, "application/json")
197            .json(&payload)
198            .timeout(Duration::from_secs(
199                self.options.feature_flags_request_timeout_seconds,
200            ))
201            .send()
202            .await
203            .map_err(|e| Error::Connection(e.to_string()))?;
204
205        if !response.status().is_success() {
206            let status = response.status();
207            let text = response
208                .text()
209                .await
210                .unwrap_or_else(|_| "Unknown error".to_string());
211            return Err(Error::Connection(format!(
212                "API request failed with status {status}: {text}"
213            )));
214        }
215
216        let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
217            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
218        })?;
219
220        Ok(flags_response.normalize())
221    }
222
223    /// Get a specific feature flag value for a user
224    #[must_use = "feature flag result should be used"]
225    #[instrument(skip_all, level = "debug")]
226    pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
227        &self,
228        key: K,
229        distinct_id: D,
230        groups: Option<HashMap<String, String>>,
231        person_properties: Option<HashMap<String, serde_json::Value>>,
232        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
233    ) -> Result<Option<FlagValue>, Error> {
234        let key_str = key.into();
235        let distinct_id_str = distinct_id.into();
236
237        // Try local evaluation first if available
238        if let Some(ref evaluator) = self.local_evaluator {
239            let empty = HashMap::new();
240            let props = person_properties.as_ref().unwrap_or(&empty);
241            match evaluator.evaluate_flag(&key_str, &distinct_id_str, props) {
242                Ok(Some(value)) => {
243                    debug!(flag = %key_str, ?value, "Flag evaluated locally");
244                    return Ok(Some(value));
245                }
246                Ok(None) => {
247                    if self.options.local_evaluation_only {
248                        debug!(flag = %key_str, "Flag not found locally, skipping remote fallback");
249                        return Ok(None);
250                    }
251                    debug!(flag = %key_str, "Flag not found locally, falling back to API");
252                }
253                Err(e) => {
254                    if self.options.local_evaluation_only {
255                        debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, skipping remote fallback");
256                        return Ok(None);
257                    }
258                    debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
259                }
260            }
261        }
262
263        // Fall back to API
264        trace!(flag = %key_str, "Fetching flag from API");
265        let (feature_flags, _payloads) = self
266            .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
267            .await?;
268        Ok(feature_flags.get(&key_str).cloned())
269    }
270
271    /// Check if a feature flag is enabled for a user
272    #[must_use = "feature flag enabled check result should be used"]
273    pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
274        &self,
275        key: K,
276        distinct_id: D,
277        groups: Option<HashMap<String, String>>,
278        person_properties: Option<HashMap<String, serde_json::Value>>,
279        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
280    ) -> Result<bool, Error> {
281        let flag_value = self
282            .get_feature_flag(
283                key.into(),
284                distinct_id.into(),
285                groups,
286                person_properties,
287                group_properties,
288            )
289            .await?;
290        Ok(match flag_value {
291            Some(FlagValue::Boolean(b)) => b,
292            Some(FlagValue::String(_)) => true, // Variants are considered enabled
293            None => false,
294        })
295    }
296
297    /// Get a feature flag payload for a user
298    #[must_use = "feature flag payload result should be used"]
299    pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
300        &self,
301        key: K,
302        distinct_id: D,
303    ) -> Result<Option<serde_json::Value>, Error> {
304        let key_str = key.into();
305        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
306
307        let mut payload = json!({
308            "api_key": self.options.api_key,
309            "distinct_id": distinct_id.into(),
310        });
311
312        // Add geoip disable parameter if configured
313        if self.options.disable_geoip {
314            payload["disable_geoip"] = json!(true);
315        }
316
317        let response = self
318            .client
319            .post(&flags_endpoint)
320            .header(CONTENT_TYPE, "application/json")
321            .json(&payload)
322            .timeout(Duration::from_secs(
323                self.options.feature_flags_request_timeout_seconds,
324            ))
325            .send()
326            .await
327            .map_err(|e| Error::Connection(e.to_string()))?;
328
329        if !response.status().is_success() {
330            return Ok(None);
331        }
332
333        let flags_response: FeatureFlagsResponse = response
334            .json()
335            .await
336            .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
337
338        let (_flags, payloads) = flags_response.normalize();
339        Ok(payloads.get(&key_str).cloned())
340    }
341
342    /// Evaluate a feature flag locally (requires feature flags to be loaded)
343    pub fn evaluate_feature_flag_locally(
344        &self,
345        flag: &FeatureFlag,
346        distinct_id: &str,
347        person_properties: &HashMap<String, serde_json::Value>,
348    ) -> Result<FlagValue, Error> {
349        match_feature_flag(flag, distinct_id, person_properties)
350            .map_err(|e| Error::InconclusiveMatch(e.message))
351    }
352}