1use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
6use groups::MixpanelGroups;
7use people::MixpanelPeople;
8use reqwest::{Client, Url};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12use tokio::time;
13use error::Error;
14
15pub mod error;
16pub mod groups;
17pub mod people;
18mod utils;
19
20pub type Result<T> = std::result::Result<T, Error>;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct Config {
24 pub test: bool,
25 pub debug: bool,
26 pub verbose: bool,
27 pub host: String,
28 pub protocol: String,
29 pub path: String,
30 pub secret: Option<String>,
31 pub api_key: Option<String>,
32 pub geolocate: bool,
33 pub max_retries: u32,
34 pub retry_base_delay_ms: u64,
35 pub retry_max_delay_ms: u64,
36}
37
38impl Default for Config {
39 fn default() -> Self {
40 Self {
41 test: false,
42 debug: false,
43 verbose: false,
44 host: "api.mixpanel.com".to_string(),
45 protocol: "https".to_string(),
46 path: "".to_string(),
47 secret: None,
48 api_key: None,
49 geolocate: false,
50 max_retries: 3,
51 retry_base_delay_ms: 1000,
52 retry_max_delay_ms: 10000,
53 }
54 }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, Default)]
58pub struct Modifiers {
59 #[serde(rename = "$ip", skip_serializing_if = "Option::is_none")]
60 pub ip: Option<String>,
61
62 #[serde(rename = "$ignore_time", skip_serializing_if = "Option::is_none")]
63 pub ignore_time: Option<bool>,
64
65 #[serde(rename = "$time", skip_serializing_if = "Option::is_none")]
66 pub time: Option<u64>,
67
68 #[serde(rename = "$ignore_alias", skip_serializing_if = "Option::is_none")]
69 pub ignore_alias: Option<bool>,
70
71 #[serde(rename = "$latitude", skip_serializing_if = "Option::is_none")]
72 pub latitude: Option<f64>,
73
74 #[serde(rename = "$longitude", skip_serializing_if = "Option::is_none")]
75 pub longitude: Option<f64>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct Event {
80 pub event: String,
81 pub properties: HashMap<String, serde_json::Value>,
82}
83
84#[derive(Debug, Clone)]
85pub struct Mixpanel {
86 pub token: String,
87 pub config: Config,
88 pub people: MixpanelPeople,
89 pub groups: MixpanelGroups,
90 http_client: Client,
91}
92
93impl Mixpanel {
94 pub fn init(token: &str, config: Option<Config>) -> Self {
96 let config = config.unwrap_or_default();
97 let http_client = Client::builder()
98 .build()
99 .expect("Failed to create HTTP client");
100
101 let mut instance = Self {
102 token: token.to_string(),
103 config,
104 people: MixpanelPeople::default(),
105 groups: MixpanelGroups::default(),
106 http_client,
107 };
108
109 instance.people.mixpanel = Some(Box::new(instance.clone()));
110 instance.groups.mixpanel = Some(Box::new(instance.clone()));
111
112 instance
113 }
114
115 pub async fn track<S: Into<String>>(
117 &self,
118 event: S,
119 properties: Option<HashMap<String, serde_json::Value>>,
120 ) -> Result<()> {
121 let mut props = properties.unwrap_or_default();
122 props.insert("token".to_string(), self.token.clone().into());
123 props.insert("mp_lib".to_string(), "rust".into());
124 props.insert("$lib_version".to_string(), env!("CARGO_PKG_VERSION").into());
125
126 if let Some(time_value) = props.get("time") {
128 if let Some(time_num) = time_value.as_u64() {
129 props.insert("time".to_string(), time_num.into());
130 } else if let Some(time_str) = time_value.as_str() {
131 if let Ok(time_num) = time_str.parse::<u64>() {
133 props.insert("time".to_string(), time_num.into());
134 }
135 }
136 }
137
138 let data = Event {
139 event: event.into(),
140 properties: props,
141 };
142
143 if self.config.debug {
144 println!("Sending event to Mixpanel: {:?}", &data);
145 }
146
147 self.send_request("GET", "/track", &data).await
148 }
149
150 pub async fn track_batch(&self, events: Vec<Event>) -> Result<()> {
152 let events: Vec<Event> = events
154 .into_iter()
155 .map(|event| {
156 let mut props = event.properties;
157 props.insert("token".to_string(), self.token.clone().into());
158 props.insert("mp_lib".to_string(), "rust".into());
159 props.insert("$lib_version".to_string(), env!("CARGO_PKG_VERSION").into());
160
161 Event {
162 event: event.event,
163 properties: props,
164 }
165 })
166 .collect();
167
168 if self.config.debug {
169 println!("Sending batch of {} events to Mixpanel", events.len());
170 }
171
172 const MAX_BATCH_SIZE: usize = 50;
174
175 for chunk in events.chunks(MAX_BATCH_SIZE) {
176 self.send_request("POST", "/track", chunk).await?;
177 }
178
179 Ok(())
180 }
181
182 pub async fn alias<S: Into<String>>(&self, distinct_id: S, alias: S) -> Result<()> {
184 let mut properties = HashMap::new();
185 properties.insert("distinct_id".to_string(), distinct_id.into().into());
186 properties.insert("alias".to_string(), alias.into().into());
187
188 self.track("$create_alias", Some(properties)).await
189 }
190
191 pub async fn send_request<T: Serialize + ?Sized>(
193 &self,
194 method: &str,
195 endpoint: &str,
196 data: &T,
197 ) -> Result<()> {
198 let mut retries = 0;
199 let max_retries = self.config.max_retries;
200
201 loop {
202 match self.do_send_request(method, endpoint, data).await {
203 Ok(result) => return Ok(result),
204
205 Err(err) => {
206 if retries >= max_retries {
207 return Err(Error::MaxRetriesReached(format!(
208 "Failed after {} retries. Last error: {}",
209 retries, err
210 )));
211 }
212
213 let should_retry = match &err {
214 Error::HttpError(http_err) => http_err.is_connect() || http_err.is_timeout(),
215 Error::ApiServerError(_) => true,
216 Error::ApiRateLimitError(_) => true,
217 _ => false,
218 };
219
220 if !should_retry {
221 return Err(err);
222 }
223
224 let base_delay = self.config.retry_base_delay_ms;
225 let max_delay = self.config.retry_max_delay_ms;
226
227 let wait_time = match &err {
228 Error::ApiRateLimitError(Some(retry_after)) => {
229 Duration::from_secs(*retry_after)
230 },
231 _ => {
232 let delay = base_delay * (1 << retries);
233 let capped_delay = std::cmp::min(delay, max_delay);
234 Duration::from_millis(capped_delay)
235 }
236 };
237
238 if self.config.debug {
239 println!("Retrying request after error: {}. Retry {} of {}. Waiting {:?}",
240 err, retries + 1, max_retries, wait_time);
241 }
242
243 time::sleep(wait_time).await;
244 retries += 1;
245 }
246 }
247 }
248 }
249
250 async fn do_send_request<T: Serialize + ?Sized>(
252 &self,
253 method: &str,
254 endpoint: &str,
255 data: &T,
256 ) -> Result<()> {
257 let data_json = serde_json::to_string(data)?;
258 let encoded_data = BASE64.encode(data_json.as_bytes());
259
260 let mut url = Url::parse(&format!(
261 "{}://{}{}",
262 self.config.protocol, self.config.host, self.config.path
263 ))?;
264
265 let endpoint = if endpoint.starts_with('/') {
266 &endpoint[1..]
267 } else {
268 endpoint
269 };
270 url.set_path(&format!("{}{}", url.path(), endpoint));
271
272 {
273 let mut query_pairs = url.query_pairs_mut();
274
275 if self.config.geolocate {
276 query_pairs.append_pair("ip", "1");
277 } else {
278 query_pairs.append_pair("ip", "0");
279 }
280
281 if self.config.verbose {
282 query_pairs.append_pair("verbose", "1");
283 } else {
284 query_pairs.append_pair("verbose", "0");
285 }
286
287 if method.to_uppercase() == "GET" {
288 query_pairs.append_pair("data", &encoded_data);
289 }
290
291 if self.config.test {
292 query_pairs.append_pair("test", "1");
293 }
294 }
295
296 let mut request_builder = match method.to_uppercase().as_str() {
297 "GET" => self.http_client.get(url),
298 "POST" => {
299 let mut builder = self.http_client.post(url);
300 builder = builder.header("Content-Type", "application/x-www-form-urlencoded");
301 builder = builder.body(format!("data={}", encoded_data));
302 builder
303 }
304 _ => {
305 return Err(Error::ApiClientError(
306 0,
307 format!("Unsupported HTTP method: {}", method),
308 ));
309 }
310 };
311
312 if let Some(ref secret) = self.config.secret {
313 let auth_header = format!("Basic {}", BASE64.encode(format!("{}:", secret).as_bytes()));
314 request_builder = request_builder.header("Authorization", auth_header);
315 }
316
317 let response = request_builder.send().await?;
318 let status = response.status();
319 let status_code = status.as_u16();
320
321 if status.is_success() {
322 let body = response.text().await?;
323 if self.config.verbose {
324 match serde_json::from_str::<serde_json::Value>(&body) {
325 Ok(json) => {
326 if let Some(api_status) = json.get("status").and_then(|s| s.as_u64()) {
327 if api_status != 1 {
328 if let Some(error_msg) = json.get("error").and_then(|e| e.as_str())
329 {
330 return Err(Error::ApiClientError(
331 status_code,
332 error_msg.to_string(),
333 ));
334 } else {
335 return Err(Error::ApiUnexpectedResponse(format!(
336 "Response status was not 1: {}",
337 body
338 )));
339 }
340 }
341 Ok(())
342 } else {
343 Err(Error::ApiUnexpectedResponse(format!(
344 "Response missing status: {}",
345 body
346 )))
347 }
348 }
349 Err(e) => Err(Error::JsonError(e)),
350 }
351 } else if body != "1" {
352 Err(Error::ApiUnexpectedResponse(body))
353 } else {
354 Ok(())
355 }
356 } else {
357 match status_code {
358 413 => Err(Error::ApiPayloadTooLarge),
359 429 => {
360 let retry_after = response
361 .headers()
362 .get("Retry-After")
363 .and_then(|v| v.to_str().ok())
364 .and_then(|s| s.parse::<u64>().ok());
365 Err(Error::ApiRateLimitError(retry_after))
366 }
367 s if s >= 500 => Err(Error::ApiServerError(s)),
368 s if s >= 400 => {
369 let body = response.text().await.unwrap_or_else(|e| e.to_string());
370 Err(Error::ApiClientError(s, body))
371 }
372 _ => {
373 let body = response.text().await.unwrap_or_else(|e| e.to_string());
374 Err(Error::ApiHttpError(status_code, body))
375 }
376 }
377 }
378 }
379
380 pub fn now() -> u64 {
381 SystemTime::now()
382 .duration_since(UNIX_EPOCH)
383 .expect("Time went backwards")
384 .as_secs()
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Without an explicit config the client keeps the token it was given and
    /// targets the default host.
    #[test]
    fn test_init() {
        let client = Mixpanel::init("test_token", None);

        assert_eq!(client.token, "test_token");
        assert_eq!(client.config.host, "api.mixpanel.com");
    }

    /// Fields overridden in a caller-supplied config are carried into the client.
    #[test]
    fn test_custom_config() {
        let custom = Config {
            test: true,
            host: String::from("custom.example.com"),
            ..Config::default()
        };

        let client = Mixpanel::init("test_token", Some(custom));

        assert_eq!(client.config.host, "custom.example.com");
        assert!(client.config.test);
    }
}