1use std::collections::HashMap;
2use std::time::Duration;
3
4use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
5use serde_json::json;
6use tracing::{debug, instrument, trace, warn};
7
8use crate::endpoints::Endpoint;
9use crate::event::BatchRequest;
10use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagValue};
11use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
12use crate::{event::InnerEvent, Error, Event};
13
14use super::ClientOptions;
15
16async fn check_response(response: reqwest::Response) -> Result<(), Error> {
17 let status = response.status().as_u16();
18 let body = response
19 .text()
20 .await
21 .unwrap_or_else(|_| "Unknown error".to_string());
22
23 match Error::from_http_response(status, body) {
24 Some(err) => Err(err),
25 None => Ok(()),
26 }
27}
28
29pub struct Client {
31 options: ClientOptions,
32 client: HttpClient,
33 local_evaluator: Option<LocalEvaluator>,
34 _flag_poller: Option<AsyncFlagPoller>,
35}
36
37pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
39 let options = options.into().sanitize();
40 let client = HttpClient::builder()
41 .timeout(Duration::from_secs(options.request_timeout_seconds))
42 .build()
43 .unwrap(); let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
46 if let Some(ref personal_key) = options.personal_api_key {
47 let cache = FlagCache::new();
48
49 let config = LocalEvaluationConfig {
50 personal_api_key: personal_key.clone(),
51 project_api_key: options.api_key.clone(),
52 api_host: options.endpoints().api_host(),
53 poll_interval: Duration::from_secs(options.poll_interval_seconds),
54 request_timeout: Duration::from_secs(options.request_timeout_seconds),
55 };
56
57 let mut poller = AsyncFlagPoller::new(config, cache.clone());
58 poller.start().await;
59
60 (Some(LocalEvaluator::new(cache)), Some(poller))
61 } else {
62 warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
63 (None, None)
64 }
65 } else {
66 (None, None)
67 };
68
69 Client {
70 options,
71 client,
72 local_evaluator,
73 _flag_poller: flag_poller,
74 }
75}
76
77impl Client {
78 #[instrument(skip(self, event), level = "debug")]
80 pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
81 if self.options.is_disabled() {
82 trace!("Client is disabled, skipping capture");
83 return Ok(());
84 }
85
86 if self.options.disable_geoip {
88 event.insert_prop("$geoip_disable", true).ok();
89 }
90
91 let inner_event = InnerEvent::new(event, self.options.api_key.clone());
92
93 let payload =
94 serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
95
96 let url = self.options.endpoints().build_url(Endpoint::Capture);
97
98 let response = self
99 .client
100 .post(&url)
101 .header(CONTENT_TYPE, "application/json")
102 .body(payload)
103 .send()
104 .await
105 .map_err(|e| Error::Connection(e.to_string()))?;
106
107 check_response(response).await
108 }
109
110 pub async fn capture_batch(
114 &self,
115 events: Vec<Event>,
116 historical_migration: bool,
117 ) -> Result<(), Error> {
118 if self.options.is_disabled() {
119 return Ok(());
120 }
121
122 let disable_geoip = self.options.disable_geoip;
123 let inner_events: Vec<InnerEvent> = events
124 .into_iter()
125 .map(|mut event| {
126 if disable_geoip {
127 event.insert_prop("$geoip_disable", true).ok();
128 }
129 InnerEvent::new(event, self.options.api_key.clone())
130 })
131 .collect();
132
133 let batch_request = BatchRequest {
134 api_key: self.options.api_key.clone(),
135 historical_migration,
136 batch: inner_events,
137 };
138 let payload = serde_json::to_string(&batch_request)
139 .map_err(|e| Error::Serialization(e.to_string()))?;
140 let url = self.options.endpoints().build_url(Endpoint::Batch);
141
142 let response = self
143 .client
144 .post(&url)
145 .header(CONTENT_TYPE, "application/json")
146 .body(payload)
147 .send()
148 .await
149 .map_err(|e| Error::Connection(e.to_string()))?;
150
151 check_response(response).await
152 }
153
154 #[must_use = "feature flags result should be used"]
156 pub async fn get_feature_flags<S: Into<String>>(
157 &self,
158 distinct_id: S,
159 groups: Option<HashMap<String, String>>,
160 person_properties: Option<HashMap<String, serde_json::Value>>,
161 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
162 ) -> Result<
163 (
164 HashMap<String, FlagValue>,
165 HashMap<String, serde_json::Value>,
166 ),
167 Error,
168 > {
169 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
170
171 let mut payload = json!({
172 "api_key": self.options.api_key,
173 "distinct_id": distinct_id.into(),
174 });
175
176 if let Some(groups) = groups {
177 payload["groups"] = json!(groups);
178 }
179
180 if let Some(person_properties) = person_properties {
181 payload["person_properties"] = json!(person_properties);
182 }
183
184 if let Some(group_properties) = group_properties {
185 payload["group_properties"] = json!(group_properties);
186 }
187
188 if self.options.disable_geoip {
190 payload["disable_geoip"] = json!(true);
191 }
192
193 let response = self
194 .client
195 .post(&flags_endpoint)
196 .header(CONTENT_TYPE, "application/json")
197 .json(&payload)
198 .timeout(Duration::from_secs(
199 self.options.feature_flags_request_timeout_seconds,
200 ))
201 .send()
202 .await
203 .map_err(|e| Error::Connection(e.to_string()))?;
204
205 if !response.status().is_success() {
206 let status = response.status();
207 let text = response
208 .text()
209 .await
210 .unwrap_or_else(|_| "Unknown error".to_string());
211 return Err(Error::Connection(format!(
212 "API request failed with status {status}: {text}"
213 )));
214 }
215
216 let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
217 Error::Serialization(format!("Failed to parse feature flags response: {e}"))
218 })?;
219
220 Ok(flags_response.normalize())
221 }
222
223 #[must_use = "feature flag result should be used"]
225 #[instrument(skip_all, level = "debug")]
226 pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
227 &self,
228 key: K,
229 distinct_id: D,
230 groups: Option<HashMap<String, String>>,
231 person_properties: Option<HashMap<String, serde_json::Value>>,
232 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
233 ) -> Result<Option<FlagValue>, Error> {
234 let key_str = key.into();
235 let distinct_id_str = distinct_id.into();
236
237 if let Some(ref evaluator) = self.local_evaluator {
239 let empty = HashMap::new();
240 let props = person_properties.as_ref().unwrap_or(&empty);
241 match evaluator.evaluate_flag(&key_str, &distinct_id_str, props) {
242 Ok(Some(value)) => {
243 debug!(flag = %key_str, ?value, "Flag evaluated locally");
244 return Ok(Some(value));
245 }
246 Ok(None) => {
247 if self.options.local_evaluation_only {
248 debug!(flag = %key_str, "Flag not found locally, skipping remote fallback");
249 return Ok(None);
250 }
251 debug!(flag = %key_str, "Flag not found locally, falling back to API");
252 }
253 Err(e) => {
254 if self.options.local_evaluation_only {
255 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, skipping remote fallback");
256 return Ok(None);
257 }
258 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
259 }
260 }
261 }
262
263 trace!(flag = %key_str, "Fetching flag from API");
265 let (feature_flags, _payloads) = self
266 .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
267 .await?;
268 Ok(feature_flags.get(&key_str).cloned())
269 }
270
271 #[must_use = "feature flag enabled check result should be used"]
273 pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
274 &self,
275 key: K,
276 distinct_id: D,
277 groups: Option<HashMap<String, String>>,
278 person_properties: Option<HashMap<String, serde_json::Value>>,
279 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
280 ) -> Result<bool, Error> {
281 let flag_value = self
282 .get_feature_flag(
283 key.into(),
284 distinct_id.into(),
285 groups,
286 person_properties,
287 group_properties,
288 )
289 .await?;
290 Ok(match flag_value {
291 Some(FlagValue::Boolean(b)) => b,
292 Some(FlagValue::String(_)) => true, None => false,
294 })
295 }
296
297 #[must_use = "feature flag payload result should be used"]
299 pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
300 &self,
301 key: K,
302 distinct_id: D,
303 ) -> Result<Option<serde_json::Value>, Error> {
304 let key_str = key.into();
305 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
306
307 let mut payload = json!({
308 "api_key": self.options.api_key,
309 "distinct_id": distinct_id.into(),
310 });
311
312 if self.options.disable_geoip {
314 payload["disable_geoip"] = json!(true);
315 }
316
317 let response = self
318 .client
319 .post(&flags_endpoint)
320 .header(CONTENT_TYPE, "application/json")
321 .json(&payload)
322 .timeout(Duration::from_secs(
323 self.options.feature_flags_request_timeout_seconds,
324 ))
325 .send()
326 .await
327 .map_err(|e| Error::Connection(e.to_string()))?;
328
329 if !response.status().is_success() {
330 return Ok(None);
331 }
332
333 let flags_response: FeatureFlagsResponse = response
334 .json()
335 .await
336 .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
337
338 let (_flags, payloads) = flags_response.normalize();
339 Ok(payloads.get(&key_str).cloned())
340 }
341
342 pub fn evaluate_feature_flag_locally(
344 &self,
345 flag: &FeatureFlag,
346 distinct_id: &str,
347 person_properties: &HashMap<String, serde_json::Value>,
348 ) -> Result<FlagValue, Error> {
349 match_feature_flag(flag, distinct_id, person_properties)
350 .map_err(|e| Error::InconclusiveMatch(e.message))
351 }
352}