telemetry_kit/sync/
client.rs

1//! Sync client for pushing events to telemetry-kit.dev
2
3use super::{
4    auth::HmacAuth, config::SyncConfig, retry::RetryStrategy, ErrorResponse, SyncResponse,
5};
6use crate::error::{Result, TelemetryError};
7use crate::event::EventBatch;
8use chrono::Utc;
9use reqwest::{header::HeaderMap, Client as HttpClient, StatusCode};
10use std::time::Duration;
11use uuid::Uuid;
12
13const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
14const SCHEMA_VERSION: &str = "1.0.0";
15
16/// Sync client for pushing events to the server
17pub struct SyncClient {
18    config: SyncConfig,
19    auth: HmacAuth,
20    http_client: HttpClient,
21    retry_strategy: RetryStrategy,
22}
23
24impl SyncClient {
25    /// Create a new sync client
26    pub fn new(config: SyncConfig) -> Result<Self> {
27        let auth = HmacAuth::new(config.secret.clone());
28        let http_client = HttpClient::builder()
29            .timeout(Duration::from_secs(30))
30            .build()?;
31
32        let retry_strategy = RetryStrategy::new(config.max_retries, 1000);
33
34        Ok(Self {
35            config,
36            auth,
37            http_client,
38            retry_strategy,
39        })
40    }
41
42    /// Sync a batch of events to the server
43    ///
44    /// This method handles:
45    /// - HMAC signature generation
46    /// - DNT (Do Not Track) checking
47    /// - Retry logic with exponential backoff
48    /// - Rate limit handling
49    pub async fn sync(&self, batch: EventBatch) -> Result<SyncResponse> {
50        // Check DNT header if enabled
51        if self.config.respect_dnt && is_dnt_enabled() {
52            return Err(TelemetryError::Other(
53                "DNT (Do Not Track) is enabled, skipping sync".to_string(),
54            ));
55        }
56
57        let mut retry_count = 0;
58
59        loop {
60            match self.try_sync(&batch, retry_count).await {
61                Ok(response) => return Ok(response),
62                Err(e) if e.is_retryable() && self.retry_strategy.should_retry(retry_count) => {
63                    let delay = self.retry_strategy.delay_for(retry_count);
64                    tokio::time::sleep(delay).await;
65                    retry_count += 1;
66                }
67                Err(e) => return Err(e),
68            }
69        }
70    }
71
72    /// Attempt to sync events (single attempt, no retry)
73    async fn try_sync(&self, batch: &EventBatch, _retry_count: u32) -> Result<SyncResponse> {
74        if batch.is_empty() {
75            return Ok(SyncResponse::Success {
76                accepted: 0,
77                rejected: 0,
78                message: "No events to sync".to_string(),
79            });
80        }
81
82        // Generate timestamp and nonce
83        let timestamp = Utc::now().timestamp().to_string();
84        let nonce = Uuid::new_v4().to_string();
85
86        // Serialize body
87        let body = serde_json::to_string(batch)?;
88
89        // Calculate HMAC signature
90        let signature = self.auth.sign(&timestamp, &nonce, &body);
91
92        // Build headers
93        let mut headers = HeaderMap::new();
94        headers.insert("Content-Type", "application/json".parse().unwrap());
95        headers.insert("X-Signature", signature.parse().unwrap());
96        headers.insert("X-Timestamp", timestamp.parse().unwrap());
97        headers.insert("X-Nonce", nonce.parse().unwrap());
98        headers.insert("X-Batch-Size", batch.size().to_string().parse().unwrap());
99        headers.insert(
100            "X-SDK-Version",
101            format!("telemetry-kit-rust/{}", SDK_VERSION)
102                .parse()
103                .unwrap(),
104        );
105        headers.insert("X-Schema-Version", SCHEMA_VERSION.parse().unwrap());
106
107        // GNU Terry Pratchett - keeping his memory alive in the overhead
108        // See: http://www.gnuterrypratchett.com/
109        headers.insert("X-Clacks-Overhead", "GNU Terry Pratchett".parse().unwrap());
110
111        // Send request
112        let url = self.config.ingestion_url();
113        let response = self
114            .http_client
115            .post(&url)
116            .headers(headers)
117            .body(body)
118            .send()
119            .await?;
120
121        let status = response.status();
122
123        // Handle response based on status code
124        match status {
125            StatusCode::OK => {
126                let sync_response: SyncResponse = response.json().await?;
127                Ok(sync_response)
128            }
129
130            StatusCode::MULTI_STATUS => {
131                let sync_response: SyncResponse = response.json().await?;
132                Ok(sync_response)
133            }
134
135            StatusCode::TOO_MANY_REQUESTS => {
136                let error_response: ErrorResponse = response.json().await?;
137                let retry_after = error_response.retry_after.unwrap_or(60);
138                Err(TelemetryError::RateLimitExceeded { retry_after })
139            }
140
141            StatusCode::BAD_REQUEST
142            | StatusCode::UNAUTHORIZED
143            | StatusCode::FORBIDDEN
144            | StatusCode::CONFLICT
145            | StatusCode::PAYLOAD_TOO_LARGE
146            | StatusCode::UNPROCESSABLE_ENTITY => {
147                let error_response: ErrorResponse = response.json().await?;
148                Err(TelemetryError::ServerError {
149                    status: status.as_u16(),
150                    message: format!("{}: {}", error_response.error, error_response.message),
151                })
152            }
153
154            _ if status.is_server_error() => {
155                let error_text = response.text().await.unwrap_or_default();
156                Err(TelemetryError::ServerError {
157                    status: status.as_u16(),
158                    message: error_text,
159                })
160            }
161
162            _ => {
163                let error_text = response.text().await.unwrap_or_default();
164                Err(TelemetryError::Other(format!(
165                    "Unexpected status code {}: {}",
166                    status, error_text
167                )))
168            }
169        }
170    }
171
172    /// Get the sync configuration
173    pub fn config(&self) -> &SyncConfig {
174        &self.config
175    }
176}
177
178/// Check if DNT (Do Not Track) is enabled
179///
180/// Checks the DNT environment variable
181fn is_dnt_enabled() -> bool {
182    std::env::var("DNT")
183        .ok()
184        .and_then(|v| v.parse::<u8>().ok())
185        .map(|v| v == 1)
186        .unwrap_or(false)
187}
188
189#[cfg(test)]
190mod tests {
191    use super::*;
192
193    fn create_test_config() -> SyncConfig {
194        SyncConfig::builder()
195            .org_id("550e8400-e29b-41d4-a716-446655440000")
196            .unwrap()
197            .app_id("7c9e6679-7425-40de-944b-e07fc1f90ae7")
198            .unwrap()
199            .token("tk_test_token")
200            .secret("test_secret")
201            .build()
202            .unwrap()
203    }
204
205    #[test]
206    fn test_client_creation() {
207        let config = create_test_config();
208        let client = SyncClient::new(config);
209        assert!(client.is_ok());
210    }
211
212    #[test]
213    fn test_empty_batch() {
214        let rt = tokio::runtime::Runtime::new().unwrap();
215        rt.block_on(async {
216            let config = create_test_config();
217            let client = SyncClient::new(config).unwrap();
218
219            let batch = EventBatch::new(vec![]);
220            let result = client.try_sync(&batch, 0).await;
221
222            assert!(result.is_ok());
223            let response = result.unwrap();
224            assert_eq!(response.accepted(), 0);
225        });
226    }
227
228    #[test]
229    fn test_dnt_detection() {
230        // DNT not set
231        std::env::remove_var("DNT");
232        assert!(!is_dnt_enabled());
233
234        // DNT = 0
235        std::env::set_var("DNT", "0");
236        assert!(!is_dnt_enabled());
237
238        // DNT = 1
239        std::env::set_var("DNT", "1");
240        assert!(is_dnt_enabled());
241
242        // Cleanup
243        std::env::remove_var("DNT");
244    }
245}