telemetry_kit/sync/
client.rs1use super::{
4 auth::HmacAuth, config::SyncConfig, retry::RetryStrategy, ErrorResponse, SyncResponse,
5};
6use crate::error::{Result, TelemetryError};
7use crate::event::EventBatch;
8use chrono::Utc;
9use reqwest::{header::HeaderMap, Client as HttpClient, StatusCode};
10use std::time::Duration;
11use uuid::Uuid;
12
13const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
14const SCHEMA_VERSION: &str = "1.0.0";
15
16pub struct SyncClient {
18 config: SyncConfig,
19 auth: HmacAuth,
20 http_client: HttpClient,
21 retry_strategy: RetryStrategy,
22}
23
24impl SyncClient {
25 pub fn new(config: SyncConfig) -> Result<Self> {
27 let auth = HmacAuth::new(config.secret.clone());
28 let http_client = HttpClient::builder()
29 .timeout(Duration::from_secs(30))
30 .build()?;
31
32 let retry_strategy = RetryStrategy::new(config.max_retries, 1000);
33
34 Ok(Self {
35 config,
36 auth,
37 http_client,
38 retry_strategy,
39 })
40 }
41
42 pub async fn sync(&self, batch: EventBatch) -> Result<SyncResponse> {
50 if self.config.respect_dnt && is_dnt_enabled() {
52 return Err(TelemetryError::Other(
53 "DNT (Do Not Track) is enabled, skipping sync".to_string(),
54 ));
55 }
56
57 let mut retry_count = 0;
58
59 loop {
60 match self.try_sync(&batch, retry_count).await {
61 Ok(response) => return Ok(response),
62 Err(e) if e.is_retryable() && self.retry_strategy.should_retry(retry_count) => {
63 let delay = self.retry_strategy.delay_for(retry_count);
64 tokio::time::sleep(delay).await;
65 retry_count += 1;
66 }
67 Err(e) => return Err(e),
68 }
69 }
70 }
71
72 async fn try_sync(&self, batch: &EventBatch, _retry_count: u32) -> Result<SyncResponse> {
74 if batch.is_empty() {
75 return Ok(SyncResponse::Success {
76 accepted: 0,
77 rejected: 0,
78 message: "No events to sync".to_string(),
79 });
80 }
81
82 let timestamp = Utc::now().timestamp().to_string();
84 let nonce = Uuid::new_v4().to_string();
85
86 let body = serde_json::to_string(batch)?;
88
89 let signature = self.auth.sign(×tamp, &nonce, &body);
91
92 let mut headers = HeaderMap::new();
94 headers.insert("Content-Type", "application/json".parse().unwrap());
95 headers.insert("X-Signature", signature.parse().unwrap());
96 headers.insert("X-Timestamp", timestamp.parse().unwrap());
97 headers.insert("X-Nonce", nonce.parse().unwrap());
98 headers.insert("X-Batch-Size", batch.size().to_string().parse().unwrap());
99 headers.insert(
100 "X-SDK-Version",
101 format!("telemetry-kit-rust/{}", SDK_VERSION)
102 .parse()
103 .unwrap(),
104 );
105 headers.insert("X-Schema-Version", SCHEMA_VERSION.parse().unwrap());
106
107 headers.insert("X-Clacks-Overhead", "GNU Terry Pratchett".parse().unwrap());
110
111 let url = self.config.ingestion_url();
113 let response = self
114 .http_client
115 .post(&url)
116 .headers(headers)
117 .body(body)
118 .send()
119 .await?;
120
121 let status = response.status();
122
123 match status {
125 StatusCode::OK => {
126 let sync_response: SyncResponse = response.json().await?;
127 Ok(sync_response)
128 }
129
130 StatusCode::MULTI_STATUS => {
131 let sync_response: SyncResponse = response.json().await?;
132 Ok(sync_response)
133 }
134
135 StatusCode::TOO_MANY_REQUESTS => {
136 let error_response: ErrorResponse = response.json().await?;
137 let retry_after = error_response.retry_after.unwrap_or(60);
138 Err(TelemetryError::RateLimitExceeded { retry_after })
139 }
140
141 StatusCode::BAD_REQUEST
142 | StatusCode::UNAUTHORIZED
143 | StatusCode::FORBIDDEN
144 | StatusCode::CONFLICT
145 | StatusCode::PAYLOAD_TOO_LARGE
146 | StatusCode::UNPROCESSABLE_ENTITY => {
147 let error_response: ErrorResponse = response.json().await?;
148 Err(TelemetryError::ServerError {
149 status: status.as_u16(),
150 message: format!("{}: {}", error_response.error, error_response.message),
151 })
152 }
153
154 _ if status.is_server_error() => {
155 let error_text = response.text().await.unwrap_or_default();
156 Err(TelemetryError::ServerError {
157 status: status.as_u16(),
158 message: error_text,
159 })
160 }
161
162 _ => {
163 let error_text = response.text().await.unwrap_or_default();
164 Err(TelemetryError::Other(format!(
165 "Unexpected status code {}: {}",
166 status, error_text
167 )))
168 }
169 }
170 }
171
172 pub fn config(&self) -> &SyncConfig {
174 &self.config
175 }
176}
177
178fn is_dnt_enabled() -> bool {
182 std::env::var("DNT")
183 .ok()
184 .and_then(|v| v.parse::<u8>().ok())
185 .map(|v| v == 1)
186 .unwrap_or(false)
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192
193 fn create_test_config() -> SyncConfig {
194 SyncConfig::builder()
195 .org_id("550e8400-e29b-41d4-a716-446655440000")
196 .unwrap()
197 .app_id("7c9e6679-7425-40de-944b-e07fc1f90ae7")
198 .unwrap()
199 .token("tk_test_token")
200 .secret("test_secret")
201 .build()
202 .unwrap()
203 }
204
205 #[test]
206 fn test_client_creation() {
207 let config = create_test_config();
208 let client = SyncClient::new(config);
209 assert!(client.is_ok());
210 }
211
212 #[test]
213 fn test_empty_batch() {
214 let rt = tokio::runtime::Runtime::new().unwrap();
215 rt.block_on(async {
216 let config = create_test_config();
217 let client = SyncClient::new(config).unwrap();
218
219 let batch = EventBatch::new(vec![]);
220 let result = client.try_sync(&batch, 0).await;
221
222 assert!(result.is_ok());
223 let response = result.unwrap();
224 assert_eq!(response.accepted(), 0);
225 });
226 }
227
228 #[test]
229 fn test_dnt_detection() {
230 std::env::remove_var("DNT");
232 assert!(!is_dnt_enabled());
233
234 std::env::set_var("DNT", "0");
236 assert!(!is_dnt_enabled());
237
238 std::env::set_var("DNT", "1");
240 assert!(is_dnt_enabled());
241
242 std::env::remove_var("DNT");
244 }
245}