1use crate::builder::TelemetryBuilder;
4use crate::error::{Result, TelemetryError};
5use crate::event::{
6 CommandEventBuilder, Environment, Event, EventBatch, EventData, FeatureEventBuilder, Metadata,
7 ServiceInfo, SCHEMA_VERSION,
8};
9use crate::storage::EventStorage;
10use crate::user::{generate_session_id, generate_user_id};
11use chrono::Utc;
12use std::path::PathBuf;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use uuid::Uuid;
16
17#[cfg(feature = "sync")]
18use crate::sync::{SyncClient, SyncConfig};
19
20#[cfg(feature = "sync")]
21use crate::auto_sync::{AutoSyncConfig, AutoSyncTask};
22
23#[cfg(feature = "sync")]
24use tokio::sync::Mutex;
25
26#[cfg(feature = "privacy")]
27use crate::privacy::{PrivacyConfig, PrivacyManager};
28
29const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");
30
31pub struct TelemetryKit {
33 inner: Arc<TelemetryKitInner>,
34}
35
36struct TelemetryKitInner {
37 service_name: String,
38 service_version: String,
39 user_id: String,
40 session_id: String,
41 environment: Environment,
42 storage: Arc<RwLock<EventStorage>>,
43
44 #[cfg(feature = "sync")]
45 sync_client: Option<SyncClient>,
46
47 #[cfg(feature = "sync")]
48 auto_sync_task: Option<Arc<Mutex<AutoSyncTask>>>,
49
50 #[cfg(feature = "privacy")]
51 privacy_manager: Option<PrivacyManager>,
52}
53
54impl TelemetryKit {
55 pub fn builder() -> TelemetryBuilder {
57 TelemetryBuilder::new()
58 }
59
60 pub(crate) fn new(
62 service_name: String,
63 service_version: String,
64 db_path: PathBuf,
65 #[cfg(feature = "sync")] sync_config: Option<SyncConfig>,
66 #[cfg(feature = "sync")] auto_sync_enabled: bool,
67 #[cfg(feature = "sync")] auto_sync_config: AutoSyncConfig,
68 #[cfg(feature = "privacy")] privacy_config: Option<PrivacyConfig>,
69 ) -> Result<Self> {
70 let user_id = generate_user_id()?;
71 let session_id = generate_session_id();
72 let environment = detect_environment();
73 let storage = EventStorage::new(db_path)?;
74 let storage_arc = Arc::new(RwLock::new(storage));
75
76 #[cfg(feature = "sync")]
77 let sync_client = if let Some(config) = sync_config {
78 Some(SyncClient::new(config)?)
79 } else {
80 None
81 };
82
83 #[cfg(feature = "sync")]
85 let auto_sync_task = if auto_sync_enabled {
86 if let Some(client) = sync_client.as_ref() {
87 let task = AutoSyncTask::start(
88 Arc::new(client.clone()),
89 storage_arc.clone(),
90 auto_sync_config,
91 );
92 Some(Arc::new(Mutex::new(task)))
93 } else {
94 None
95 }
96 } else {
97 None
98 };
99
100 #[cfg(feature = "privacy")]
102 let privacy_manager = if let Some(config) = privacy_config {
103 Some(PrivacyManager::new(config, &service_name)?)
104 } else {
105 None
106 };
107
108 let inner = Arc::new(TelemetryKitInner {
109 service_name,
110 service_version,
111 user_id,
112 session_id,
113 environment,
114 storage: storage_arc,
115 #[cfg(feature = "sync")]
116 sync_client,
117 #[cfg(feature = "sync")]
118 auto_sync_task,
119 #[cfg(feature = "privacy")]
120 privacy_manager,
121 });
122
123 Ok(Self { inner })
124 }
125
126 pub async fn track_command<F>(&self, command: impl Into<String>, builder_fn: F) -> Result<()>
143 where
144 F: FnOnce(CommandEventBuilder) -> CommandEventBuilder,
145 {
146 let builder = CommandEventBuilder::new(command);
147 let event_data = builder_fn(builder).build();
148
149 self.track_event("command_execution", Some("usage"), event_data)
150 .await
151 }
152
153 pub async fn track_feature<F>(&self, feature: impl Into<String>, builder_fn: F) -> Result<()>
169 where
170 F: FnOnce(FeatureEventBuilder) -> FeatureEventBuilder,
171 {
172 let builder = FeatureEventBuilder::new(feature);
173 let event_data = builder_fn(builder).build();
174
175 self.track_event("feature_used", Some("library"), event_data)
176 .await
177 }
178
179 pub async fn track_custom(
181 &self,
182 event_type: impl Into<String>,
183 data: serde_json::Value,
184 ) -> Result<()> {
185 self.track_event(event_type, None, data).await
186 }
187
188 async fn track_event(
190 &self,
191 event_type: impl Into<String>,
192 category: Option<&str>,
193 data: serde_json::Value,
194 ) -> Result<()> {
195 #[cfg(feature = "privacy")]
197 if let Some(privacy_manager) = &self.inner.privacy_manager {
198 if !privacy_manager.should_track()? {
199 return Ok(());
201 }
202 }
203
204 let mut sanitized_data = data;
206 #[cfg(feature = "privacy")]
207 if let Some(privacy_manager) = &self.inner.privacy_manager {
208 privacy_manager.sanitize_data(&mut sanitized_data);
209 }
210
211 let event = Event {
212 schema_version: SCHEMA_VERSION.to_string(),
213 event_id: Uuid::new_v4(),
214 timestamp: Utc::now(),
215 service: ServiceInfo {
216 name: self.inner.service_name.clone(),
217 version: self.inner.service_version.clone(),
218 language: "rust".to_string(),
219 language_version: Some(rustc_version()),
220 },
221 user_id: self.inner.user_id.clone(),
222 session_id: Some(self.inner.session_id.clone()),
223 environment: self.inner.environment.clone(),
224 event: EventData {
225 event_type: event_type.into(),
226 category: category.map(|s| s.to_string()),
227 data: sanitized_data,
228 },
229 metadata: Metadata {
230 sdk_version: format!("telemetry-kit-rust/{}", SDK_VERSION),
231 transmission_timestamp: Utc::now(),
232 batch_size: 1,
233 retry_count: 0,
234 },
235 };
236
237 let storage = self.inner.storage.write().await;
239 storage.insert(&event)?;
240 drop(storage);
241
242 Ok(())
245 }
246
247 #[cfg(feature = "sync")]
249 pub async fn sync(&self) -> Result<()> {
250 if let Some(client) = &self.inner.sync_client {
251 Self::sync_events(Arc::new(client.clone()), self.inner.storage.clone()).await
252 } else {
253 Err(TelemetryError::invalid_config(
254 "sync",
255 "Sync is not configured. Use .with_sync_credentials() when building TelemetryKit",
256 ))
257 }
258 }
259
260 #[cfg(feature = "sync")]
262 async fn sync_events(
263 client: Arc<SyncClient>,
264 storage: Arc<RwLock<EventStorage>>,
265 ) -> Result<()> {
266 let storage_read = storage.read().await;
267 let events = storage_read.get_unsynced(client.config().batch_size)?;
268 drop(storage_read);
269
270 if events.is_empty() {
271 return Ok(());
272 }
273
274 let event_ids: Vec<Uuid> = events.iter().map(|e| e.event_id).collect();
275 let batch = EventBatch::new(events);
276
277 match client.sync(batch).await {
278 Ok(response) => {
279 if response.accepted() > 0 {
280 let storage_write = storage.write().await;
281 storage_write.mark_synced(&event_ids)?;
282 }
283 Ok(())
284 }
285 Err(e) => {
286 let storage_write = storage.write().await;
287 storage_write.increment_retry(&event_ids)?;
288 Err(e)
289 }
290 }
291 }
292
293 pub async fn stats(&self) -> Result<EventStats> {
295 let storage = self.inner.storage.read().await;
296 let total = storage.total_count()?;
297 let unsynced = storage.unsynced_count()?;
298
299 Ok(EventStats {
300 total_events: total,
301 unsynced_events: unsynced,
302 synced_events: total - unsynced,
303 })
304 }
305
306 pub async fn cleanup(&self) -> Result<usize> {
308 let storage = self.inner.storage.write().await;
309 storage.cleanup_old_events()
310 }
311
312 #[cfg(feature = "privacy")]
314 pub fn grant_consent(&self) -> Result<()> {
315 if let Some(privacy_manager) = &self.inner.privacy_manager {
316 privacy_manager.grant_consent(&self.inner.service_name)
317 } else {
318 Err(TelemetryError::invalid_config(
319 "privacy",
320 "Privacy features are not enabled. Use .privacy() or .strict_privacy() when building TelemetryKit"
321 ))
322 }
323 }
324
325 #[cfg(feature = "privacy")]
327 pub fn deny_consent(&self) -> Result<()> {
328 if let Some(privacy_manager) = &self.inner.privacy_manager {
329 privacy_manager.deny_consent(&self.inner.service_name)
330 } else {
331 Err(TelemetryError::invalid_config(
332 "privacy",
333 "Privacy features are not enabled. Use .privacy() or .strict_privacy() when building TelemetryKit"
334 ))
335 }
336 }
337
338 #[cfg(feature = "privacy")]
340 pub fn opt_out(&self) -> Result<()> {
341 if let Some(privacy_manager) = &self.inner.privacy_manager {
342 privacy_manager.opt_out(&self.inner.service_name)
343 } else {
344 Err(TelemetryError::InvalidConfig(
345 "Privacy features are not enabled".to_string(),
346 ))
347 }
348 }
349
350 #[cfg(feature = "privacy")]
352 pub fn is_do_not_track_enabled() -> bool {
353 PrivacyManager::is_do_not_track_enabled()
354 }
355
356 #[cfg(feature = "sync")]
358 pub async fn shutdown(&self) -> Result<()> {
359 if let Some(task_mutex) = &self.inner.auto_sync_task {
360 let mut task = task_mutex.lock().await;
361
362 if task.should_sync_on_shutdown() {
364 if let Some(client) = &self.inner.sync_client {
365 let _ = Self::sync_events(Arc::new(client.clone()), self.inner.storage.clone())
366 .await;
367 }
368 }
369
370 task.shutdown();
372 task.join().await?;
373 }
374 Ok(())
375 }
376}
377
378#[cfg(feature = "sync")]
379impl Drop for TelemetryKit {
380 fn drop(&mut self) {
381 }
385}
386
387impl Clone for SyncClient {
388 fn clone(&self) -> Self {
389 Self::new(self.config().clone()).expect("Failed to clone SyncClient")
392 }
393}
394
395#[derive(Debug, Clone)]
397pub struct EventStats {
398 pub total_events: usize,
400 pub unsynced_events: usize,
402 pub synced_events: usize,
404}
405
406fn detect_environment() -> Environment {
408 Environment {
409 os: std::env::consts::OS.to_string(),
410 os_version: None, arch: Some(std::env::consts::ARCH.to_string()),
412 ci: Some(is_ci()),
413 shell: detect_shell(),
414 }
415}
416
417fn is_ci() -> bool {
419 std::env::var("CI").is_ok()
420 || std::env::var("GITHUB_ACTIONS").is_ok()
421 || std::env::var("JENKINS_HOME").is_ok()
422 || std::env::var("TRAVIS").is_ok()
423 || std::env::var("CIRCLECI").is_ok()
424}
425
426fn detect_shell() -> Option<String> {
428 std::env::var("SHELL").ok().and_then(|s| {
429 std::path::Path::new(&s)
430 .file_name()
431 .map(|f| f.to_string_lossy().to_string())
432 })
433}
434
435fn rustc_version() -> String {
437 env!("CARGO_PKG_RUST_VERSION").to_string()
439}
440
441#[cfg(test)]
442mod tests {
443 use super::*;
444
445 #[tokio::test]
446 async fn test_telemetry_creation() {
447 use uuid::Uuid;
448 let unique_name = format!("test-creation-{}", Uuid::new_v4());
449 let telemetry = TelemetryKit::builder()
450 .service_name(&unique_name)
451 .unwrap()
452 .service_version("1.0.0")
453 .build();
454
455 assert!(telemetry.is_ok());
456 }
457
458 #[tokio::test]
459 async fn test_track_command() {
460 use uuid::Uuid;
461 let unique_name = format!("test-track-{}", Uuid::new_v4());
462 let telemetry = TelemetryKit::builder()
463 .service_name(&unique_name)
464 .unwrap()
465 .service_version("1.0.0")
466 .build()
467 .unwrap();
468
469 let result = telemetry
470 .track_command("test", |event| event.success(true).duration_ms(100))
471 .await;
472
473 assert!(result.is_ok());
474 }
475
476 #[tokio::test]
477 async fn test_event_stats() {
478 use uuid::Uuid;
479 let unique_name = format!("test-stats-{}", Uuid::new_v4());
480 let telemetry = TelemetryKit::builder()
481 .service_name(&unique_name)
482 .unwrap()
483 .service_version("1.0.0")
484 .build()
485 .unwrap();
486
487 telemetry
488 .track_command("test", |event| event.success(true))
489 .await
490 .unwrap();
491
492 let stats = telemetry.stats().await.unwrap();
493 assert_eq!(stats.total_events, 1);
494 assert_eq!(stats.unsynced_events, 1);
495 }
496
497 #[test]
498 fn test_ci_detection() {
499 std::env::remove_var("CI");
500 assert!(!is_ci());
501
502 std::env::set_var("CI", "true");
503 assert!(is_ci());
504
505 std::env::remove_var("CI");
506 }
507}