Skip to main content

posthog_rs/client/
async_client.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
5use serde_json::json;
6use tracing::{debug, instrument, trace, warn};
7
8use crate::endpoints::{Endpoint, EndpointManager};
9use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagValue};
10use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
11use crate::{event::InnerEvent, Error, Event};
12
13use super::ClientOptions;
14
15/// A [`Client`] facilitates interactions with the PostHog API over HTTP.
16pub struct Client {
17    options: ClientOptions,
18    client: HttpClient,
19    local_evaluator: Option<LocalEvaluator>,
20    _flag_poller: Option<AsyncFlagPoller>,
21}
22
23/// This function constructs a new client using the options provided.
24pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
25    let mut options = options.into();
26    // Ensure endpoint_manager is properly initialized based on the host
27    options.endpoint_manager = EndpointManager::new(options.host.clone());
28    let client = HttpClient::builder()
29        .timeout(Duration::from_secs(options.request_timeout_seconds))
30        .build()
31        .unwrap(); // Unwrap here is as safe as `HttpClient::new`
32
33    let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
34        if let Some(ref personal_key) = options.personal_api_key {
35            let cache = FlagCache::new();
36
37            let config = LocalEvaluationConfig {
38                personal_api_key: personal_key.clone(),
39                project_api_key: options.api_key.clone(),
40                api_host: options.endpoints().api_host(),
41                poll_interval: Duration::from_secs(options.poll_interval_seconds),
42                request_timeout: Duration::from_secs(options.request_timeout_seconds),
43            };
44
45            let mut poller = AsyncFlagPoller::new(config, cache.clone());
46            poller.start().await;
47
48            (Some(LocalEvaluator::new(cache)), Some(poller))
49        } else {
50            warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
51            (None, None)
52        }
53    } else {
54        (None, None)
55    };
56
57    Client {
58        options,
59        client,
60        local_evaluator,
61        _flag_poller: flag_poller,
62    }
63}
64
65impl Client {
66    /// Capture the provided event, sending it to PostHog.
67    #[instrument(skip(self, event), level = "debug")]
68    pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
69        if self.options.is_disabled() {
70            trace!("Client is disabled, skipping capture");
71            return Ok(());
72        }
73
74        // Add geoip disable property if configured
75        if self.options.disable_geoip {
76            event.insert_prop("$geoip_disable", true).ok();
77        }
78
79        let inner_event = InnerEvent::new(event, self.options.api_key.clone());
80
81        let payload =
82            serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
83
84        let url = self.options.endpoints().build_url(Endpoint::Capture);
85
86        self.client
87            .post(&url)
88            .header(CONTENT_TYPE, "application/json")
89            .body(payload)
90            .send()
91            .await
92            .map_err(|e| Error::Connection(e.to_string()))?;
93
94        Ok(())
95    }
96
97    /// Capture a collection of events with a single request. This function may be
98    /// more performant than capturing a list of events individually.
99    pub async fn capture_batch(&self, events: Vec<Event>) -> Result<(), Error> {
100        if self.options.is_disabled() {
101            return Ok(());
102        }
103
104        let disable_geoip = self.options.disable_geoip;
105        let events: Vec<_> = events
106            .into_iter()
107            .map(|mut event| {
108                // Add geoip disable property if configured
109                if disable_geoip {
110                    event.insert_prop("$geoip_disable", true).ok();
111                }
112                InnerEvent::new(event, self.options.api_key.clone())
113            })
114            .collect();
115
116        let payload =
117            serde_json::to_string(&events).map_err(|e| Error::Serialization(e.to_string()))?;
118
119        let url = self.options.endpoints().build_url(Endpoint::Capture);
120
121        self.client
122            .post(&url)
123            .header(CONTENT_TYPE, "application/json")
124            .body(payload)
125            .send()
126            .await
127            .map_err(|e| Error::Connection(e.to_string()))?;
128
129        Ok(())
130    }
131
132    /// Get all feature flags for a user
133    #[must_use = "feature flags result should be used"]
134    pub async fn get_feature_flags<S: Into<String>>(
135        &self,
136        distinct_id: S,
137        groups: Option<HashMap<String, String>>,
138        person_properties: Option<HashMap<String, serde_json::Value>>,
139        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
140    ) -> Result<
141        (
142            HashMap<String, FlagValue>,
143            HashMap<String, serde_json::Value>,
144        ),
145        Error,
146    > {
147        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
148
149        let mut payload = json!({
150            "api_key": self.options.api_key,
151            "distinct_id": distinct_id.into(),
152        });
153
154        if let Some(groups) = groups {
155            payload["groups"] = json!(groups);
156        }
157
158        if let Some(person_properties) = person_properties {
159            payload["person_properties"] = json!(person_properties);
160        }
161
162        if let Some(group_properties) = group_properties {
163            payload["group_properties"] = json!(group_properties);
164        }
165
166        // Add geoip disable parameter if configured
167        if self.options.disable_geoip {
168            payload["disable_geoip"] = json!(true);
169        }
170
171        let response = self
172            .client
173            .post(&flags_endpoint)
174            .header(CONTENT_TYPE, "application/json")
175            .json(&payload)
176            .timeout(Duration::from_secs(
177                self.options.feature_flags_request_timeout_seconds,
178            ))
179            .send()
180            .await
181            .map_err(|e| Error::Connection(e.to_string()))?;
182
183        if !response.status().is_success() {
184            let status = response.status();
185            let text = response
186                .text()
187                .await
188                .unwrap_or_else(|_| "Unknown error".to_string());
189            return Err(Error::Connection(format!(
190                "API request failed with status {status}: {text}"
191            )));
192        }
193
194        let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
195            Error::Serialization(format!("Failed to parse feature flags response: {e}"))
196        })?;
197
198        Ok(flags_response.normalize())
199    }
200
201    /// Get a specific feature flag value for a user
202    #[must_use = "feature flag result should be used"]
203    #[instrument(skip_all, level = "debug")]
204    pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
205        &self,
206        key: K,
207        distinct_id: D,
208        groups: Option<HashMap<String, String>>,
209        person_properties: Option<HashMap<String, serde_json::Value>>,
210        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
211    ) -> Result<Option<FlagValue>, Error> {
212        let key_str = key.into();
213        let distinct_id_str = distinct_id.into();
214
215        // Try local evaluation first if available
216        if let Some(ref evaluator) = self.local_evaluator {
217            let empty = HashMap::new();
218            let props = person_properties.as_ref().unwrap_or(&empty);
219            match evaluator.evaluate_flag(&key_str, &distinct_id_str, props) {
220                Ok(Some(value)) => {
221                    debug!(flag = %key_str, ?value, "Flag evaluated locally");
222                    return Ok(Some(value));
223                }
224                Ok(None) => {
225                    debug!(flag = %key_str, "Flag not found locally, falling back to API");
226                }
227                Err(e) => {
228                    debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
229                }
230            }
231        }
232
233        // Fall back to API
234        trace!(flag = %key_str, "Fetching flag from API");
235        let (feature_flags, _payloads) = self
236            .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
237            .await?;
238        Ok(feature_flags.get(&key_str).cloned())
239    }
240
241    /// Check if a feature flag is enabled for a user
242    #[must_use = "feature flag enabled check result should be used"]
243    pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
244        &self,
245        key: K,
246        distinct_id: D,
247        groups: Option<HashMap<String, String>>,
248        person_properties: Option<HashMap<String, serde_json::Value>>,
249        group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
250    ) -> Result<bool, Error> {
251        let flag_value = self
252            .get_feature_flag(
253                key.into(),
254                distinct_id.into(),
255                groups,
256                person_properties,
257                group_properties,
258            )
259            .await?;
260        Ok(match flag_value {
261            Some(FlagValue::Boolean(b)) => b,
262            Some(FlagValue::String(_)) => true, // Variants are considered enabled
263            None => false,
264        })
265    }
266
267    /// Get a feature flag payload for a user
268    #[must_use = "feature flag payload result should be used"]
269    pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
270        &self,
271        key: K,
272        distinct_id: D,
273    ) -> Result<Option<serde_json::Value>, Error> {
274        let key_str = key.into();
275        let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
276
277        let mut payload = json!({
278            "api_key": self.options.api_key,
279            "distinct_id": distinct_id.into(),
280        });
281
282        // Add geoip disable parameter if configured
283        if self.options.disable_geoip {
284            payload["disable_geoip"] = json!(true);
285        }
286
287        let response = self
288            .client
289            .post(&flags_endpoint)
290            .header(CONTENT_TYPE, "application/json")
291            .json(&payload)
292            .timeout(Duration::from_secs(
293                self.options.feature_flags_request_timeout_seconds,
294            ))
295            .send()
296            .await
297            .map_err(|e| Error::Connection(e.to_string()))?;
298
299        if !response.status().is_success() {
300            return Ok(None);
301        }
302
303        let flags_response: FeatureFlagsResponse = response
304            .json()
305            .await
306            .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
307
308        let (_flags, payloads) = flags_response.normalize();
309        Ok(payloads.get(&key_str).cloned())
310    }
311
312    /// Evaluate a feature flag locally (requires feature flags to be loaded)
313    pub fn evaluate_feature_flag_locally(
314        &self,
315        flag: &FeatureFlag,
316        distinct_id: &str,
317        person_properties: &HashMap<String, serde_json::Value>,
318    ) -> Result<FlagValue, Error> {
319        match_feature_flag(flag, distinct_id, person_properties)
320            .map_err(|e| Error::InconclusiveMatch(e.message))
321    }
322}