1use std::collections::HashMap;
2use std::time::Duration;
3
4use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
5use serde_json::json;
6use tracing::{debug, instrument, trace, warn};
7
8use crate::endpoints::{Endpoint, EndpointManager};
9use crate::feature_flags::{match_feature_flag, FeatureFlag, FeatureFlagsResponse, FlagValue};
10use crate::local_evaluation::{AsyncFlagPoller, FlagCache, LocalEvaluationConfig, LocalEvaluator};
11use crate::{event::InnerEvent, Error, Event};
12
13use super::ClientOptions;
14
15pub struct Client {
17 options: ClientOptions,
18 client: HttpClient,
19 local_evaluator: Option<LocalEvaluator>,
20 _flag_poller: Option<AsyncFlagPoller>,
21}
22
23pub async fn client<C: Into<ClientOptions>>(options: C) -> Client {
25 let mut options = options.into();
26 options.endpoint_manager = EndpointManager::new(options.host.clone());
28 let client = HttpClient::builder()
29 .timeout(Duration::from_secs(options.request_timeout_seconds))
30 .build()
31 .unwrap(); let (local_evaluator, flag_poller) = if options.enable_local_evaluation {
34 if let Some(ref personal_key) = options.personal_api_key {
35 let cache = FlagCache::new();
36
37 let config = LocalEvaluationConfig {
38 personal_api_key: personal_key.clone(),
39 project_api_key: options.api_key.clone(),
40 api_host: options.endpoints().api_host(),
41 poll_interval: Duration::from_secs(options.poll_interval_seconds),
42 request_timeout: Duration::from_secs(options.request_timeout_seconds),
43 };
44
45 let mut poller = AsyncFlagPoller::new(config, cache.clone());
46 poller.start().await;
47
48 (Some(LocalEvaluator::new(cache)), Some(poller))
49 } else {
50 warn!("Local evaluation enabled but personal_api_key not set, falling back to API evaluation");
51 (None, None)
52 }
53 } else {
54 (None, None)
55 };
56
57 Client {
58 options,
59 client,
60 local_evaluator,
61 _flag_poller: flag_poller,
62 }
63}
64
65impl Client {
66 #[instrument(skip(self, event), level = "debug")]
68 pub async fn capture(&self, mut event: Event) -> Result<(), Error> {
69 if self.options.is_disabled() {
70 trace!("Client is disabled, skipping capture");
71 return Ok(());
72 }
73
74 if self.options.disable_geoip {
76 event.insert_prop("$geoip_disable", true).ok();
77 }
78
79 let inner_event = InnerEvent::new(event, self.options.api_key.clone());
80
81 let payload =
82 serde_json::to_string(&inner_event).map_err(|e| Error::Serialization(e.to_string()))?;
83
84 let url = self.options.endpoints().build_url(Endpoint::Capture);
85
86 self.client
87 .post(&url)
88 .header(CONTENT_TYPE, "application/json")
89 .body(payload)
90 .send()
91 .await
92 .map_err(|e| Error::Connection(e.to_string()))?;
93
94 Ok(())
95 }
96
97 pub async fn capture_batch(&self, events: Vec<Event>) -> Result<(), Error> {
100 if self.options.is_disabled() {
101 return Ok(());
102 }
103
104 let disable_geoip = self.options.disable_geoip;
105 let events: Vec<_> = events
106 .into_iter()
107 .map(|mut event| {
108 if disable_geoip {
110 event.insert_prop("$geoip_disable", true).ok();
111 }
112 InnerEvent::new(event, self.options.api_key.clone())
113 })
114 .collect();
115
116 let payload =
117 serde_json::to_string(&events).map_err(|e| Error::Serialization(e.to_string()))?;
118
119 let url = self.options.endpoints().build_url(Endpoint::Capture);
120
121 self.client
122 .post(&url)
123 .header(CONTENT_TYPE, "application/json")
124 .body(payload)
125 .send()
126 .await
127 .map_err(|e| Error::Connection(e.to_string()))?;
128
129 Ok(())
130 }
131
132 #[must_use = "feature flags result should be used"]
134 pub async fn get_feature_flags<S: Into<String>>(
135 &self,
136 distinct_id: S,
137 groups: Option<HashMap<String, String>>,
138 person_properties: Option<HashMap<String, serde_json::Value>>,
139 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
140 ) -> Result<
141 (
142 HashMap<String, FlagValue>,
143 HashMap<String, serde_json::Value>,
144 ),
145 Error,
146 > {
147 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
148
149 let mut payload = json!({
150 "api_key": self.options.api_key,
151 "distinct_id": distinct_id.into(),
152 });
153
154 if let Some(groups) = groups {
155 payload["groups"] = json!(groups);
156 }
157
158 if let Some(person_properties) = person_properties {
159 payload["person_properties"] = json!(person_properties);
160 }
161
162 if let Some(group_properties) = group_properties {
163 payload["group_properties"] = json!(group_properties);
164 }
165
166 if self.options.disable_geoip {
168 payload["disable_geoip"] = json!(true);
169 }
170
171 let response = self
172 .client
173 .post(&flags_endpoint)
174 .header(CONTENT_TYPE, "application/json")
175 .json(&payload)
176 .timeout(Duration::from_secs(
177 self.options.feature_flags_request_timeout_seconds,
178 ))
179 .send()
180 .await
181 .map_err(|e| Error::Connection(e.to_string()))?;
182
183 if !response.status().is_success() {
184 let status = response.status();
185 let text = response
186 .text()
187 .await
188 .unwrap_or_else(|_| "Unknown error".to_string());
189 return Err(Error::Connection(format!(
190 "API request failed with status {status}: {text}"
191 )));
192 }
193
194 let flags_response = response.json::<FeatureFlagsResponse>().await.map_err(|e| {
195 Error::Serialization(format!("Failed to parse feature flags response: {e}"))
196 })?;
197
198 Ok(flags_response.normalize())
199 }
200
201 #[must_use = "feature flag result should be used"]
203 #[instrument(skip_all, level = "debug")]
204 pub async fn get_feature_flag<K: Into<String>, D: Into<String>>(
205 &self,
206 key: K,
207 distinct_id: D,
208 groups: Option<HashMap<String, String>>,
209 person_properties: Option<HashMap<String, serde_json::Value>>,
210 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
211 ) -> Result<Option<FlagValue>, Error> {
212 let key_str = key.into();
213 let distinct_id_str = distinct_id.into();
214
215 if let Some(ref evaluator) = self.local_evaluator {
217 let empty = HashMap::new();
218 let props = person_properties.as_ref().unwrap_or(&empty);
219 match evaluator.evaluate_flag(&key_str, &distinct_id_str, props) {
220 Ok(Some(value)) => {
221 debug!(flag = %key_str, ?value, "Flag evaluated locally");
222 return Ok(Some(value));
223 }
224 Ok(None) => {
225 debug!(flag = %key_str, "Flag not found locally, falling back to API");
226 }
227 Err(e) => {
228 debug!(flag = %key_str, error = %e.message, "Inconclusive local evaluation, falling back to API");
229 }
230 }
231 }
232
233 trace!(flag = %key_str, "Fetching flag from API");
235 let (feature_flags, _payloads) = self
236 .get_feature_flags(distinct_id_str, groups, person_properties, group_properties)
237 .await?;
238 Ok(feature_flags.get(&key_str).cloned())
239 }
240
241 #[must_use = "feature flag enabled check result should be used"]
243 pub async fn is_feature_enabled<K: Into<String>, D: Into<String>>(
244 &self,
245 key: K,
246 distinct_id: D,
247 groups: Option<HashMap<String, String>>,
248 person_properties: Option<HashMap<String, serde_json::Value>>,
249 group_properties: Option<HashMap<String, HashMap<String, serde_json::Value>>>,
250 ) -> Result<bool, Error> {
251 let flag_value = self
252 .get_feature_flag(
253 key.into(),
254 distinct_id.into(),
255 groups,
256 person_properties,
257 group_properties,
258 )
259 .await?;
260 Ok(match flag_value {
261 Some(FlagValue::Boolean(b)) => b,
262 Some(FlagValue::String(_)) => true, None => false,
264 })
265 }
266
267 #[must_use = "feature flag payload result should be used"]
269 pub async fn get_feature_flag_payload<K: Into<String>, D: Into<String>>(
270 &self,
271 key: K,
272 distinct_id: D,
273 ) -> Result<Option<serde_json::Value>, Error> {
274 let key_str = key.into();
275 let flags_endpoint = self.options.endpoints().build_url(Endpoint::Flags);
276
277 let mut payload = json!({
278 "api_key": self.options.api_key,
279 "distinct_id": distinct_id.into(),
280 });
281
282 if self.options.disable_geoip {
284 payload["disable_geoip"] = json!(true);
285 }
286
287 let response = self
288 .client
289 .post(&flags_endpoint)
290 .header(CONTENT_TYPE, "application/json")
291 .json(&payload)
292 .timeout(Duration::from_secs(
293 self.options.feature_flags_request_timeout_seconds,
294 ))
295 .send()
296 .await
297 .map_err(|e| Error::Connection(e.to_string()))?;
298
299 if !response.status().is_success() {
300 return Ok(None);
301 }
302
303 let flags_response: FeatureFlagsResponse = response
304 .json()
305 .await
306 .map_err(|e| Error::Serialization(format!("Failed to parse response: {e}")))?;
307
308 let (_flags, payloads) = flags_response.normalize();
309 Ok(payloads.get(&key_str).cloned())
310 }
311
312 pub fn evaluate_feature_flag_locally(
314 &self,
315 flag: &FeatureFlag,
316 distinct_id: &str,
317 person_properties: &HashMap<String, serde_json::Value>,
318 ) -> Result<FlagValue, Error> {
319 match_feature_flag(flag, distinct_id, person_properties)
320 .map_err(|e| Error::InconclusiveMatch(e.message))
321 }
322}