Skip to main content

posthog_rs/client/
async_client.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
5use serde_json::json;
6use tracing::{debug, instrument, trace, warn};
7
8use crate::endpoints::{Endpoint, EndpointManager};
9use crate::event::BatchRequest;
10use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagValue};
11use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
12use crate::{event::InnerEvent, Error, Event};
13
14use super::ClientOptions;
15
16async fn check_response(response: reqwest::Response) -> Result<(), Error> {
17    let status = response.status().as_u16();
18    let body = response
19        .text()
20        .await
21        .unwrap_or_else(|_| "Unknown error".to_string());
22
23    match Error::from_http_response(status, body) {
24        Some(err) => Err(err),
25        None => Ok(()),
26    }
27}
28
/// A [`Client`] facilitates interactions with the PostHog API over HTTP.
pub struct Client {
    /// Configuration the client was built with (API key, endpoints, timeouts,
    /// GeoIP toggle, ...).
    options: ClientOptions,
    /// HTTP client used for every request this client sends.
    client: HttpClient,
    /// Evaluator tried first when resolving a single feature flag; `None`
    /// when local evaluation was not set up at construction time.
    local_evaluator: Option<LocalEvaluator>,
    /// Poller started at construction when local evaluation is set up. It is
    /// never read afterwards (hence the leading underscore).
    // NOTE(review): presumably stored so the poller lives as long as the
    // client — confirm against `AsyncFlagPoller`.
    _flag_poller: Option<AsyncFlagPoller>,
}
36
37/// This function constructs a new client using the options provided.
38pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
39    let mut options = options.into();
40    // Ensure endpoint_manager is properly initialized based on the host
41    options.endpoint_manager = EndpointManager::new(options.host.clone());
42    let client = HttpClient::builder()
43        .timeout(Duration::from_secs(options.request_timeout_seconds))
44        .build()
45        .unwrap(); // Unwrap here is as safe as `HttpClient::new`
46
47    let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
48        if let Some(ref personal_key) = options.personal_api_key {
49            let cache = FlagCache::new();
50
51            let config = LocalEvaluationConfig {
52                personal_api_key: personal_key.clone(),
53                project_api_key: options.api_key.clone(),
54                api_host: options.endpoints().api_host(),
55                poll_interval: Duration::from_secs(options.poll_interval_seconds),
56                request_timeout: Duration::from_secs(options.request_timeout_seconds),
57            };
58
59            let mut poller = AsyncFlagPoller::new(config, cache.clone());
60            poller.start().await;
61
62            (Some(LocalEvaluator::new(cache)), Some(poller))
63        } else {
64            warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
65            (None, None)
66        }
67    } else {
68        (None, None)
69    };
70
71    Client {
72        options,
73        client,
74        local_evaluator,
75        _flag_poller: flag_poller,
76    }
77}
78
79impl Client {
80    /// Capture the provided event, sending it to PostHog.
81    #[instrument(skip(self, event), level = "debug")]
82    pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
83        if self.options.is_disabled() {
84            trace!("Client is disabled, skipping capture");
85            return Ok(());
86        }
87
88        // Add geoip disable property if configured
89        if self.options.disable_geoip {
90            event.insert_prop("$geoip_disable", true).ok();
91        }
92
93        let inner_event = InnerEvent::new(event, self.options.api_key.clone());
94
95        let payload =
96            serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
97
98        let url = self.options.endpoints().build_url(Endpoint::Capture);
99
100        let response = self
101            .client
102            .post(&url)
103            .header(CONTENT_TYPE, "application/json")
104            .body(payload)
105            .send()
106            .await
107            .map_err(|e| Error::Connection(e.to_string()))?;
108
109        check_response(response).await
110    }
111
112    /// Capture a collection of events with a single request. Events are sent to
113    /// the `/batch/` endpoint. Set `historical_migration` to `true` to route
114    /// events to the historical ingestion topic, bypassing the main pipeline.
115    pub async fn capture_batch(
116        &self,
117        events: Vec<Event>,
118        historical_migration: bool,
119    ) -> Result<(), Error> {
120        if self.options.is_disabled() {
121            return Ok(());
122        }
123
124        let disable_geoip = self.options.disable_geoip;
125        let inner_events: Vec<InnerEvent> = events
126            .into_iter()
127            .map(|mut event| {
128                if disable_geoip {
129                    event.insert_prop("$geoip_disable", true).ok();
130                }
131                InnerEvent::new(event, self.options.api_key.clone())
132            })
133            .collect();
134
135        let batch_request = BatchRequest {
136            api_key: self.options.api_key.clone(),
137            historical_migration,
138            batch: inner_events,
139        };
140        let payload = serde_json::to_string(&batch_request)
141            .map_err(|e| Error::Serialization(e.to_string()))?;
142        let url = self.options.endpoints().build_url(Endpoint::Batch);
143
144        let response = self
145            .client
146            .post(&url)
147            .header(CONTENT_TYPE, "application/json")
148            .body(payload)
149            .send()
150            .await
151            .map_err(|e| Error::Connection(e.to_string()))?;
152
153        check_response(response).await
154    }
155
156    /// Get all feature flags for a user
157    #[must_use = "feature flags result should be used"]
158    pub async fn get_feature_flags<S: Into<String>>(
159        &self,
160        distinct_id: S,
161        groups: Option<HashMap<String, String>>,
162        person_properties: Option<HashMap<String, serde_json::Value>>,
163        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
164    ) -> Result<
165        (
166            HashMap<String, FlagValue>,
167            HashMap<String, serde_json::Value>,
168        ),
169        Error,
170    > {
171        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
172
173        let mut payload = json!({
174            "api_key": self.options.api_key,
175            "distinct_id": distinct_id.into(),
176        });
177
178        if let Some(groups) = groups {
179            payload["groups"] = json!(groups);
180        }
181
182        if let Some(person_properties) = person_properties {
183            payload["person_properties"] = json!(person_properties);
184        }
185
186        if let Some(group_properties) = group_properties {
187            payload["group_properties"] = json!(group_properties);
188        }
189
190        // Add geoip disable parameter if configured
191        if self.options.disable_geoip {
192            payload["disable_geoip"] = json!(true);
193        }
194
195        let response = self
196            .client
197            .post(&flags_endpoint)
198            .header(CONTENT_TYPE, "application/json")
199            .json(&payload)
200            .timeout(Duration::from_secs(
201                self.options.feature_flags_request_timeout_seconds,
202            ))
203            .send()
204            .await
205            .map_err(|e| Error::Connection(e.to_string()))?;
206
207        if !response.status().is_success() {
208            let status = response.status();
209            let text = response
210                .text()
211                .await
212                .unwrap_or_else(|_| "Unknown error".to_string());
213            return Err(Error::Connection(format!(
214                "API request failed with status {status}: {text}"
215            )));
216        }
217
218        let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
219            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
220        })?;
221
222        Ok(flags_response.normalize())
223    }
224
225    /// Get a specific feature flag value for a user
226    #[must_use = "feature flag result should be used"]
227    #[instrument(skip_all, level = "debug")]
228    pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
229        &self,
230        key: K,
231        distinct_id: D,
232        groups: Option<HashMap<String, String>>,
233        person_properties: Option<HashMap<String, serde_json::Value>>,
234        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
235    ) -> Result<Option<FlagValue>, Error> {
236        let key_str = key.into();
237        let distinct_id_str = distinct_id.into();
238
239        // Try local evaluation first if available
240        if let Some(ref evaluator) = self.local_evaluator {
241            let empty = HashMap::new();
242            let props = person_properties.as_ref().unwrap_or(&empty);
243            match evaluator.evaluate_flag(&key_str, &distinct_id_str, props) {
244                Ok(Some(value)) => {
245                    debug!(flag = %key_str, ?value, "Flag evaluated locally");
246                    return Ok(Some(value));
247                }
248                Ok(None) => {
249                    debug!(flag = %key_str, "Flag not found locally, falling back to API");
250                }
251                Err(e) => {
252                    debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
253                }
254            }
255        }
256
257        // Fall back to API
258        trace!(flag = %key_str, "Fetching flag from API");
259        let (feature_flags, _payloads) = self
260            .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
261            .await?;
262        Ok(feature_flags.get(&key_str).cloned())
263    }
264
265    /// Check if a feature flag is enabled for a user
266    #[must_use = "feature flag enabled check result should be used"]
267    pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
268        &self,
269        key: K,
270        distinct_id: D,
271        groups: Option<HashMap<String, String>>,
272        person_properties: Option<HashMap<String, serde_json::Value>>,
273        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
274    ) -> Result<bool, Error> {
275        let flag_value = self
276            .get_feature_flag(
277                key.into(),
278                distinct_id.into(),
279                groups,
280                person_properties,
281                group_properties,
282            )
283            .await?;
284        Ok(match flag_value {
285            Some(FlagValue::Boolean(b)) => b,
286            Some(FlagValue::String(_)) => true, // Variants are considered enabled
287            None => false,
288        })
289    }
290
291    /// Get a feature flag payload for a user
292    #[must_use = "feature flag payload result should be used"]
293    pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
294        &self,
295        key: K,
296        distinct_id: D,
297    ) -> Result<Option<serde_json::Value>, Error> {
298        let key_str = key.into();
299        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
300
301        let mut payload = json!({
302            "api_key": self.options.api_key,
303            "distinct_id": distinct_id.into(),
304        });
305
306        // Add geoip disable parameter if configured
307        if self.options.disable_geoip {
308            payload["disable_geoip"] = json!(true);
309        }
310
311        let response = self
312            .client
313            .post(&flags_endpoint)
314            .header(CONTENT_TYPE, "application/json")
315            .json(&payload)
316            .timeout(Duration::from_secs(
317                self.options.feature_flags_request_timeout_seconds,
318            ))
319            .send()
320            .await
321            .map_err(|e| Error::Connection(e.to_string()))?;
322
323        if !response.status().is_success() {
324            return Ok(None);
325        }
326
327        let flags_response: FeatureFlagsResponse = response
328            .json()
329            .await
330            .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
331
332        let (_flags, payloads) = flags_response.normalize();
333        Ok(payloads.get(&key_str).cloned())
334    }
335
336    /// Evaluate a feature flag locally (requires feature flags to be loaded)
337    pub fn evaluate_feature_flag_locally(
338        &self,
339        flag: &FeatureFlag,
340        distinct_id: &str,
341        person_properties: &HashMap<String, serde_json::Value>,
342    ) -> Result<FlagValue, Error> {
343        match_feature_flag(flag, distinct_id, person_properties)
344            .map_err(|e| Error::InconclusiveMatch(e.message))
345    }
346}