1use std::collections::HashMap;
2use std::time::Duration;
3
4use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
5use serde_json::json;
6use tracing::{debug, instrument, trace, warn};
7
8use crate::endpoints::{Endpoint, EndpointManager};
9use crate::event::BatchRequest;
10use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagValue};
11use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
12use crate::{event::InnerEvent, Error, Event};
13
14use super::ClientOptions;
15
16async fn check_response(response: reqwest::Response) -> Result<(), Error> {
17 let status = response.status().as_u16();
18 let body = response
19 .text()
20 .await
21 .unwrap_or_else(|_| "Unknown error".to_string());
22
23 match Error::from_http_response(status, body) {
24 Some(err) => Err(err),
25 None => Ok(()),
26 }
27}
28
/// Asynchronous client bundling its configuration, the HTTP transport, and the
/// optional local feature-flag evaluation state.
pub struct Client {
    /// Configuration the client was built with (API key, endpoints, timeouts,
    /// feature toggles).
    options: ClientOptions,
    /// HTTP client used for every request this client sends.
    client: HttpClient,
    /// Local flag evaluator; `None` unless local evaluation was set up by
    /// [`client`].
    local_evaluator: Option<LocalEvaluator>,
    /// Flag poller started by [`client`]. It is never read in this block.
    /// NOTE(review): presumably held only to keep the poller alive for the
    /// lifetime of the client — confirm against `AsyncFlagPoller`.
    _flag_poller: Option<AsyncFlagPoller>,
}
36
37pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
39 let mut options = options.into();
40 options.endpoint_manager = EndpointManager::new(options.host.clone());
42 let client = HttpClient::builder()
43 .timeout(Duration::from_secs(options.request_timeout_seconds))
44 .build()
45 .unwrap(); let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
48 if let Some(ref personal_key) = options.personal_api_key {
49 let cache = FlagCache::new();
50
51 let config = LocalEvaluationConfig {
52 personal_api_key: personal_key.clone(),
53 project_api_key: options.api_key.clone(),
54 api_host: options.endpoints().api_host(),
55 poll_interval: Duration::from_secs(options.poll_interval_seconds),
56 request_timeout: Duration::from_secs(options.request_timeout_seconds),
57 };
58
59 let mut poller = AsyncFlagPoller::new(config, cache.clone());
60 poller.start().await;
61
62 (Some(LocalEvaluator::new(cache)), Some(poller))
63 } else {
64 warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
65 (None, None)
66 }
67 } else {
68 (None, None)
69 };
70
71 Client {
72 options,
73 client,
74 local_evaluator,
75 _flag_poller: flag_poller,
76 }
77}
78
79impl Client {
80 #[instrument(skip(self, event), level = "debug")]
82 pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
83 if self.options.is_disabled() {
84 trace!("Client is disabled, skipping capture");
85 return Ok(());
86 }
87
88 if self.options.disable_geoip {
90 event.insert_prop("$geoip_disable", true).ok();
91 }
92
93 let inner_event = InnerEvent::new(event, self.options.api_key.clone());
94
95 let payload =
96 serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
97
98 let url = self.options.endpoints().build_url(Endpoint::Capture);
99
100 let response = self
101 .client
102 .post(&url)
103 .header(CONTENT_TYPE, "application/json")
104 .body(payload)
105 .send()
106 .await
107 .map_err(|e| Error::Connection(e.to_string()))?;
108
109 check_response(response).await
110 }
111
112 pub async fn capture_batch(
116 &self,
117 events: Vec<Event>,
118 historical_migration: bool,
119 ) -> Result<(), Error> {
120 if self.options.is_disabled() {
121 return Ok(());
122 }
123
124 let disable_geoip = self.options.disable_geoip;
125 let inner_events: Vec<InnerEvent> = events
126 .into_iter()
127 .map(|mut event| {
128 if disable_geoip {
129 event.insert_prop("$geoip_disable", true).ok();
130 }
131 InnerEvent::new(event, self.options.api_key.clone())
132 })
133 .collect();
134
135 let batch_request = BatchRequest {
136 api_key: self.options.api_key.clone(),
137 historical_migration,
138 batch: inner_events,
139 };
140 let payload = serde_json::to_string(&batch_request)
141 .map_err(|e| Error::Serialization(e.to_string()))?;
142 let url = self.options.endpoints().build_url(Endpoint::Batch);
143
144 let response = self
145 .client
146 .post(&url)
147 .header(CONTENT_TYPE, "application/json")
148 .body(payload)
149 .send()
150 .await
151 .map_err(|e| Error::Connection(e.to_string()))?;
152
153 check_response(response).await
154 }
155
156 #[must_use = "feature flags result should be used"]
158 pub async fn get_feature_flags<S: Into<String>>(
159 &self,
160 distinct_id: S,
161 groups: Option<HashMap<String, String>>,
162 person_properties: Option<HashMap<String, serde_json::Value>>,
163 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
164 ) -> Result<
165 (
166 HashMap<String, FlagValue>,
167 HashMap<String, serde_json::Value>,
168 ),
169 Error,
170 > {
171 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
172
173 let mut payload = json!({
174 "api_key": self.options.api_key,
175 "distinct_id": distinct_id.into(),
176 });
177
178 if let Some(groups) = groups {
179 payload["groups"] = json!(groups);
180 }
181
182 if let Some(person_properties) = person_properties {
183 payload["person_properties"] = json!(person_properties);
184 }
185
186 if let Some(group_properties) = group_properties {
187 payload["group_properties"] = json!(group_properties);
188 }
189
190 if self.options.disable_geoip {
192 payload["disable_geoip"] = json!(true);
193 }
194
195 let response = self
196 .client
197 .post(&flags_endpoint)
198 .header(CONTENT_TYPE, "application/json")
199 .json(&payload)
200 .timeout(Duration::from_secs(
201 self.options.feature_flags_request_timeout_seconds,
202 ))
203 .send()
204 .await
205 .map_err(|e| Error::Connection(e.to_string()))?;
206
207 if !response.status().is_success() {
208 let status = response.status();
209 let text = response
210 .text()
211 .await
212 .unwrap_or_else(|_| "Unknown error".to_string());
213 return Err(Error::Connection(format!(
214 "API request failed with status {status}: {text}"
215 )));
216 }
217
218 let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
219 Error::Serialization(format!("Failed to parse feature flags response: {e}"))
220 })?;
221
222 Ok(flags_response.normalize())
223 }
224
225 #[must_use = "feature flag result should be used"]
227 #[instrument(skip_all, level = "debug")]
228 pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
229 &self,
230 key: K,
231 distinct_id: D,
232 groups: Option<HashMap<String, String>>,
233 person_properties: Option<HashMap<String, serde_json::Value>>,
234 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
235 ) -> Result<Option<FlagValue>, Error> {
236 let key_str = key.into();
237 let distinct_id_str = distinct_id.into();
238
239 if let Some(ref evaluator) = self.local_evaluator {
241 let empty = HashMap::new();
242 let props = person_properties.as_ref().unwrap_or(&empty);
243 match evaluator.evaluate_flag(&key_str, &distinct_id_str, props) {
244 Ok(Some(value)) => {
245 debug!(flag = %key_str, ?value, "Flag evaluated locally");
246 return Ok(Some(value));
247 }
248 Ok(None) => {
249 debug!(flag = %key_str, "Flag not found locally, falling back to API");
250 }
251 Err(e) => {
252 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
253 }
254 }
255 }
256
257 trace!(flag = %key_str, "Fetching flag from API");
259 let (feature_flags, _payloads) = self
260 .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
261 .await?;
262 Ok(feature_flags.get(&key_str).cloned())
263 }
264
265 #[must_use = "feature flag enabled check result should be used"]
267 pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
268 &self,
269 key: K,
270 distinct_id: D,
271 groups: Option<HashMap<String, String>>,
272 person_properties: Option<HashMap<String, serde_json::Value>>,
273 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
274 ) -> Result<bool, Error> {
275 let flag_value = self
276 .get_feature_flag(
277 key.into(),
278 distinct_id.into(),
279 groups,
280 person_properties,
281 group_properties,
282 )
283 .await?;
284 Ok(match flag_value {
285 Some(FlagValue::Boolean(b)) => b,
286 Some(FlagValue::String(_)) => true, None => false,
288 })
289 }
290
291 #[must_use = "feature flag payload result should be used"]
293 pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
294 &self,
295 key: K,
296 distinct_id: D,
297 ) -> Result<Option<serde_json::Value>, Error> {
298 let key_str = key.into();
299 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
300
301 let mut payload = json!({
302 "api_key": self.options.api_key,
303 "distinct_id": distinct_id.into(),
304 });
305
306 if self.options.disable_geoip {
308 payload["disable_geoip"] = json!(true);
309 }
310
311 let response = self
312 .client
313 .post(&flags_endpoint)
314 .header(CONTENT_TYPE, "application/json")
315 .json(&payload)
316 .timeout(Duration::from_secs(
317 self.options.feature_flags_request_timeout_seconds,
318 ))
319 .send()
320 .await
321 .map_err(|e| Error::Connection(e.to_string()))?;
322
323 if !response.status().is_success() {
324 return Ok(None);
325 }
326
327 let flags_response: FeatureFlagsResponse = response
328 .json()
329 .await
330 .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
331
332 let (_flags, payloads) = flags_response.normalize();
333 Ok(payloads.get(&key_str).cloned())
334 }
335
336 pub fn evaluate_feature_flag_locally(
338 &self,
339 flag: &FeatureFlag,
340 distinct_id: &str,
341 person_properties: &HashMap<String, serde_json::Value>,
342 ) -> Result<FlagValue, Error> {
343 match_feature_flag(flag, distinct_id, person_properties)
344 .map_err(|e| Error::InconclusiveMatch(e.message))
345 }
346}