mixpanel_rs/
lib.rs

1// mixpanel-rs: A Rust client for Mixpanel
2//
3// Inspired by the Node.js library (https://github.com/mixpanel/mixpanel-node)
4
5use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
6use groups::MixpanelGroups;
7use people::MixpanelPeople;
8use reqwest::{Client, Url};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12use tokio::time;
13use error::Error;
14
15pub mod error;
16pub mod groups;
17pub mod people;
18mod utils;
19
/// Crate-wide result alias: the standard `Result` with this crate's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct Config {
24    pub test: bool,
25    pub debug: bool,
26    pub verbose: bool,
27    pub host: String,
28    pub protocol: String,
29    pub path: String,
30    pub secret: Option<String>,
31    pub api_key: Option<String>,
32    pub geolocate: bool,
33    pub max_retries: u32,
34    pub retry_base_delay_ms: u64,
35    pub retry_max_delay_ms: u64,
36}
37
38impl Default for Config {
39    fn default() -> Self {
40        Self {
41            test: false,
42            debug: false,
43            verbose: false,
44            host: "api.mixpanel.com".to_string(),
45            protocol: "https".to_string(),
46            path: "".to_string(),
47            secret: None,
48            api_key: None,
49            geolocate: false,
50            max_retries: 3,
51            retry_base_delay_ms: 1000,
52            retry_max_delay_ms: 10000,
53        }
54    }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, Default)]
58pub struct Modifiers {
59    #[serde(rename = "$ip", skip_serializing_if = "Option::is_none")]
60    pub ip: Option<String>,
61
62    #[serde(rename = "$ignore_time", skip_serializing_if = "Option::is_none")]
63    pub ignore_time: Option<bool>,
64
65    #[serde(rename = "$time", skip_serializing_if = "Option::is_none")]
66    pub time: Option<u64>,
67
68    #[serde(rename = "$ignore_alias", skip_serializing_if = "Option::is_none")]
69    pub ignore_alias: Option<bool>,
70
71    #[serde(rename = "$latitude", skip_serializing_if = "Option::is_none")]
72    pub latitude: Option<f64>,
73
74    #[serde(rename = "$longitude", skip_serializing_if = "Option::is_none")]
75    pub longitude: Option<f64>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct Event {
80    pub event: String,
81    pub properties: HashMap<String, serde_json::Value>,
82}
83
84#[derive(Debug, Clone)]
85pub struct Mixpanel {
86    pub token: String,
87    pub config: Config,
88    pub people: MixpanelPeople,
89    pub groups: MixpanelGroups,
90    http_client: Client,
91}
92
93impl Mixpanel {
94    /// Initialize a new Mixpanel client with the given token and optional config
95    pub fn init(token: &str, config: Option<Config>) -> Self {
96        let config = config.unwrap_or_default();
97        let http_client = Client::builder()
98            .build()
99            .expect("Failed to create HTTP client");
100
101        let mut instance = Self {
102            token: token.to_string(),
103            config,
104            people: MixpanelPeople::default(),
105            groups: MixpanelGroups::default(),
106            http_client,
107        };
108
109        instance.people.mixpanel = Some(Box::new(instance.clone()));
110        instance.groups.mixpanel = Some(Box::new(instance.clone()));
111
112        instance
113    }
114
115    /// Track an event with optional properties
116    pub async fn track<S: Into<String>>(
117        &self,
118        event: S,
119        properties: Option<HashMap<String, serde_json::Value>>,
120    ) -> Result<()> {
121        let mut props = properties.unwrap_or_default();
122        props.insert("token".to_string(), self.token.clone().into());
123        props.insert("mp_lib".to_string(), "rust".into());
124        props.insert("$lib_version".to_string(), env!("CARGO_PKG_VERSION").into());
125
126        // Handle time property if it exists
127        if let Some(time_value) = props.get("time") {
128            if let Some(time_num) = time_value.as_u64() {
129                props.insert("time".to_string(), time_num.into());
130            } else if let Some(time_str) = time_value.as_str() {
131                // Try to parse as ISO string - simplified for now
132                if let Ok(time_num) = time_str.parse::<u64>() {
133                    props.insert("time".to_string(), time_num.into());
134                }
135            }
136        }
137
138        let data = Event {
139            event: event.into(),
140            properties: props,
141        };
142
143        if self.config.debug {
144            println!("Sending event to Mixpanel: {:?}", &data);
145        }
146
147        self.send_request("GET", "/track", &data).await
148    }
149
150    /// Track multiple events in a single request (batch)
151    pub async fn track_batch(&self, events: Vec<Event>) -> Result<()> {
152        // Process each event to ensure it has the required properties
153        let events: Vec<Event> = events
154            .into_iter()
155            .map(|event| {
156                let mut props = event.properties;
157                props.insert("token".to_string(), self.token.clone().into());
158                props.insert("mp_lib".to_string(), "rust".into());
159                props.insert("$lib_version".to_string(), env!("CARGO_PKG_VERSION").into());
160
161                Event {
162                    event: event.event,
163                    properties: props,
164                }
165            })
166            .collect();
167
168        if self.config.debug {
169            println!("Sending batch of {} events to Mixpanel", events.len());
170        }
171
172        // Mixpanel accepts a maximum of 50 events per request
173        const MAX_BATCH_SIZE: usize = 50;
174
175        for chunk in events.chunks(MAX_BATCH_SIZE) {
176            self.send_request("POST", "/track", chunk).await?;
177        }
178
179        Ok(())
180    }
181
182    /// Create an alias for a distinct_id
183    pub async fn alias<S: Into<String>>(&self, distinct_id: S, alias: S) -> Result<()> {
184        let mut properties = HashMap::new();
185        properties.insert("distinct_id".to_string(), distinct_id.into().into());
186        properties.insert("alias".to_string(), alias.into().into());
187
188        self.track("$create_alias", Some(properties)).await
189    }
190
191    /// Send a request to the Mixpanel API with automatic retries for certain error types
192    pub async fn send_request<T: Serialize + ?Sized>(
193        &self,
194        method: &str,
195        endpoint: &str,
196        data: &T,
197    ) -> Result<()> {
198        let mut retries = 0;
199        let max_retries = self.config.max_retries;
200        
201        loop {
202            match self.do_send_request(method, endpoint, data).await {
203                Ok(result) => return Ok(result),
204                
205                Err(err) => {
206                    if retries >= max_retries {
207                        return Err(Error::MaxRetriesReached(format!(
208                            "Failed after {} retries. Last error: {}", 
209                            retries, err
210                        )));
211                    }
212                    
213                    let should_retry = match &err {
214                        Error::HttpError(http_err) => http_err.is_connect() || http_err.is_timeout(),
215                        Error::ApiServerError(_) => true,
216                        Error::ApiRateLimitError(_) => true,
217                        _ => false,
218                    };
219                    
220                    if !should_retry {
221                        return Err(err);
222                    }
223                    
224                    let base_delay = self.config.retry_base_delay_ms;
225                    let max_delay = self.config.retry_max_delay_ms;
226                    
227                    let wait_time = match &err {
228                        Error::ApiRateLimitError(Some(retry_after)) => {
229                            Duration::from_secs(*retry_after)
230                        },
231                        _ => {
232                            let delay = base_delay * (1 << retries);
233                            let capped_delay = std::cmp::min(delay, max_delay);
234                            Duration::from_millis(capped_delay)
235                        }
236                    };
237                    
238                    if self.config.debug {
239                        println!("Retrying request after error: {}. Retry {} of {}. Waiting {:?}", 
240                                 err, retries + 1, max_retries, wait_time);
241                    }
242                    
243                    time::sleep(wait_time).await;
244                    retries += 1;
245                }
246            }
247        }
248    }
249
250    /// Internal method to send a request without retries
251    async fn do_send_request<T: Serialize + ?Sized>(
252        &self,
253        method: &str,
254        endpoint: &str,
255        data: &T,
256    ) -> Result<()> {
257        let data_json = serde_json::to_string(data)?;
258        let encoded_data = BASE64.encode(data_json.as_bytes());
259
260        let mut url = Url::parse(&format!(
261            "{}://{}{}",
262            self.config.protocol, self.config.host, self.config.path
263        ))?;
264
265        let endpoint = if endpoint.starts_with('/') {
266            &endpoint[1..]
267        } else {
268            endpoint
269        };
270        url.set_path(&format!("{}{}", url.path(), endpoint));
271
272        {
273            let mut query_pairs = url.query_pairs_mut();
274
275            if self.config.geolocate {
276                query_pairs.append_pair("ip", "1");
277            } else {
278                query_pairs.append_pair("ip", "0");
279            }
280
281            if self.config.verbose {
282                query_pairs.append_pair("verbose", "1");
283            } else {
284                query_pairs.append_pair("verbose", "0");
285            }
286
287            if method.to_uppercase() == "GET" {
288                query_pairs.append_pair("data", &encoded_data);
289            }
290
291            if self.config.test {
292                query_pairs.append_pair("test", "1");
293            }
294        }
295
296        let mut request_builder = match method.to_uppercase().as_str() {
297            "GET" => self.http_client.get(url),
298            "POST" => {
299                let mut builder = self.http_client.post(url);
300                builder = builder.header("Content-Type", "application/x-www-form-urlencoded");
301                builder = builder.body(format!("data={}", encoded_data));
302                builder
303            }
304            _ => {
305                return Err(Error::ApiClientError(
306                    0,
307                    format!("Unsupported HTTP method: {}", method),
308                ));
309            }
310        };
311
312        if let Some(ref secret) = self.config.secret {
313            let auth_header = format!("Basic {}", BASE64.encode(format!("{}:", secret).as_bytes()));
314            request_builder = request_builder.header("Authorization", auth_header);
315        }
316
317        let response = request_builder.send().await?;
318        let status = response.status();
319        let status_code = status.as_u16();
320
321        if status.is_success() {
322            let body = response.text().await?;
323            if self.config.verbose {
324                match serde_json::from_str::<serde_json::Value>(&body) {
325                    Ok(json) => {
326                        if let Some(api_status) = json.get("status").and_then(|s| s.as_u64()) {
327                            if api_status != 1 {
328                                if let Some(error_msg) = json.get("error").and_then(|e| e.as_str())
329                                {
330                                    return Err(Error::ApiClientError(
331                                        status_code,
332                                        error_msg.to_string(),
333                                    ));
334                                } else {
335                                    return Err(Error::ApiUnexpectedResponse(format!(
336                                        "Response status was not 1: {}",
337                                        body
338                                    )));
339                                }
340                            }
341                            Ok(())
342                        } else {
343                            Err(Error::ApiUnexpectedResponse(format!(
344                                "Response missing status: {}",
345                                body
346                            )))
347                        }
348                    }
349                    Err(e) => Err(Error::JsonError(e)),
350                }
351            } else if body != "1" {
352                Err(Error::ApiUnexpectedResponse(body))
353            } else {
354                Ok(())
355            }
356        } else {
357            match status_code {
358                413 => Err(Error::ApiPayloadTooLarge),
359                429 => {
360                    let retry_after = response
361                        .headers()
362                        .get("Retry-After")
363                        .and_then(|v| v.to_str().ok())
364                        .and_then(|s| s.parse::<u64>().ok());
365                    Err(Error::ApiRateLimitError(retry_after))
366                }
367                s if s >= 500 => Err(Error::ApiServerError(s)),
368                s if s >= 400 => {
369                    let body = response.text().await.unwrap_or_else(|e| e.to_string());
370                    Err(Error::ApiClientError(s, body))
371                }
372                _ => {
373                    let body = response.text().await.unwrap_or_else(|e| e.to_string());
374                    Err(Error::ApiHttpError(status_code, body))
375                }
376            }
377        }
378    }
379
380    pub fn now() -> u64 {
381        SystemTime::now()
382            .duration_since(UNIX_EPOCH)
383            .expect("Time went backwards")
384            .as_secs()
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Without an explicit configuration the client keeps the given token and
    /// falls back to the default host.
    #[test]
    fn test_init() {
        let client = Mixpanel::init("test_token", None);

        assert_eq!(client.token, "test_token");
        assert_eq!(client.config.host, "api.mixpanel.com");
    }

    /// Overridden settings are kept as given by `init`.
    #[test]
    fn test_custom_config() {
        let custom = Config {
            test: true,
            host: String::from("custom.example.com"),
            ..Config::default()
        };

        let client = Mixpanel::init("test_token", Some(custom));

        assert!(client.config.test);
        assert_eq!(client.config.host, "custom.example.com");
    }
}