Skip to main content

voirs_cli/telemetry/
storage.rs

1//! Telemetry storage system
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use tokio::fs;
6use tokio::io::AsyncWriteExt;
7
8use super::{
9    events::{EventType, TelemetryEvent},
10    TelemetryError, TelemetryStatistics,
11};
12
/// File-backed store for telemetry events.
///
/// Events are appended to per-day JSON Lines files (`events-<YYYY-MM-DD>.jsonl`)
/// inside `storage_path`.
#[derive(Debug, Clone)]
pub struct TelemetryStorage {
    /// Directory that holds the per-day event files.
    storage_path: PathBuf,
    /// `index.json` inside `storage_path`; computed at construction time but not
    /// read or written by any method visible in this file.
    index_path: PathBuf,
}
18
19impl TelemetryStorage {
20    /// Create a new telemetry storage
21    pub fn new(storage_path: &Path) -> Result<Self, TelemetryError> {
22        let index_path = storage_path.join("index.json");
23
24        Ok(Self {
25            storage_path: storage_path.to_path_buf(),
26            index_path,
27        })
28    }
29
30    /// Initialize storage directory
31    pub async fn initialize(&self) -> Result<(), TelemetryError> {
32        fs::create_dir_all(&self.storage_path).await?;
33        Ok(())
34    }
35
36    /// Store a telemetry event
37    pub async fn store_event(&mut self, event: &TelemetryEvent) -> Result<(), TelemetryError> {
38        self.initialize().await?;
39
40        // Create daily event file
41        let date = event.timestamp.format("%Y-%m-%d").to_string();
42        let event_file = self.storage_path.join(format!("events-{}.jsonl", date));
43
44        // Append event to file
45        let mut file = fs::OpenOptions::new()
46            .create(true)
47            .append(true)
48            .open(&event_file)
49            .await?;
50
51        let json = serde_json::to_string(event)?;
52        file.write_all(json.as_bytes()).await?;
53        file.write_all(b"\n").await?;
54        file.flush().await?;
55
56        Ok(())
57    }
58
59    /// Get all events
60    pub async fn get_all_events(&self) -> Result<Vec<TelemetryEvent>, TelemetryError> {
61        let mut events = Vec::new();
62
63        if !self.storage_path.exists() {
64            return Ok(events);
65        }
66
67        let mut entries = fs::read_dir(&self.storage_path).await?;
68
69        while let Some(entry) = entries.next_entry().await? {
70            let path = entry.path();
71            if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
72                let file_events = self.read_events_from_file(&path).await?;
73                events.extend(file_events);
74            }
75        }
76
77        Ok(events)
78    }
79
80    /// Read events from a JSONL file
81    async fn read_events_from_file(
82        &self,
83        path: &Path,
84    ) -> Result<Vec<TelemetryEvent>, TelemetryError> {
85        let content = fs::read_to_string(path).await?;
86        let mut events = Vec::new();
87
88        for line in content.lines() {
89            if line.trim().is_empty() {
90                continue;
91            }
92
93            match serde_json::from_str::<TelemetryEvent>(line) {
94                Ok(event) => events.push(event),
95                Err(e) => {
96                    eprintln!("Failed to parse event: {}", e);
97                    continue;
98                }
99            }
100        }
101
102        Ok(events)
103    }
104
105    /// Get telemetry statistics
106    pub async fn get_statistics(&self) -> Result<TelemetryStatistics, TelemetryError> {
107        let events = self.get_all_events().await?;
108
109        if events.is_empty() {
110            return Ok(TelemetryStatistics::default());
111        }
112
113        let total_events = events.len() as u64;
114        let mut events_by_type: HashMap<String, u64> = HashMap::new();
115        let mut synthesis_count = 0u64;
116        let mut synthesis_durations = Vec::new();
117        let mut error_count = 0u64;
118        let mut command_counts: HashMap<String, u64> = HashMap::new();
119        let mut voice_counts: HashMap<String, u64> = HashMap::new();
120
121        let mut start_time = events[0].timestamp;
122        let mut end_time = events[0].timestamp;
123
124        for event in &events {
125            // Count events by type
126            *events_by_type
127                .entry(event.event_type.to_string())
128                .or_insert(0) += 1;
129
130            // Track time range
131            if event.timestamp < start_time {
132                start_time = event.timestamp;
133            }
134            if event.timestamp > end_time {
135                end_time = event.timestamp;
136            }
137
138            // Synthesis statistics
139            if event.event_type == EventType::SynthesisRequest {
140                synthesis_count += 1;
141                if let Some(duration) = event.metadata.get("duration_ms") {
142                    if let Ok(dur) = duration.parse::<u64>() {
143                        synthesis_durations.push(dur);
144                    }
145                }
146                if let Some(voice) = event.metadata.get("voice") {
147                    *voice_counts.entry(voice.clone()).or_insert(0) += 1;
148                }
149            }
150
151            // Error statistics
152            if event.event_type == EventType::Error {
153                error_count += 1;
154            }
155
156            // Command statistics
157            if event.event_type == EventType::CommandExecuted {
158                if let Some(command) = event.metadata.get("command") {
159                    *command_counts.entry(command.clone()).or_insert(0) += 1;
160                }
161            }
162        }
163
164        // Calculate average synthesis duration
165        let avg_synthesis_duration = if !synthesis_durations.is_empty() {
166            synthesis_durations.iter().sum::<u64>() as f64 / synthesis_durations.len() as f64
167        } else {
168            0.0
169        };
170
171        // Get most used commands
172        let mut most_used_commands: Vec<(String, u64)> = command_counts.into_iter().collect();
173        most_used_commands.sort_by(|a, b| b.1.cmp(&a.1));
174        most_used_commands.truncate(10);
175
176        // Get most used voices
177        let mut most_used_voices: Vec<(String, u64)> = voice_counts.into_iter().collect();
178        most_used_voices.sort_by(|a, b| b.1.cmp(&a.1));
179        most_used_voices.truncate(10);
180
181        // Calculate storage size
182        let storage_size_bytes = self.calculate_storage_size().await?;
183
184        Ok(TelemetryStatistics {
185            total_events,
186            events_by_type,
187            synthesis_requests: synthesis_count,
188            avg_synthesis_duration,
189            total_errors: error_count,
190            most_used_commands,
191            most_used_voices,
192            start_time: Some(start_time),
193            end_time: Some(end_time),
194            storage_size_bytes,
195        })
196    }
197
198    /// Calculate total storage size
199    async fn calculate_storage_size(&self) -> Result<u64, TelemetryError> {
200        let mut total_size = 0u64;
201
202        if !self.storage_path.exists() {
203            return Ok(0);
204        }
205
206        let mut entries = fs::read_dir(&self.storage_path).await?;
207
208        while let Some(entry) = entries.next_entry().await? {
209            let metadata = entry.metadata().await?;
210            total_size += metadata.len();
211        }
212
213        Ok(total_size)
214    }
215
216    /// Clear all telemetry data
217    pub async fn clear(&mut self) -> Result<(), TelemetryError> {
218        if self.storage_path.exists() {
219            fs::remove_dir_all(&self.storage_path).await?;
220        }
221        Ok(())
222    }
223
224    /// Get events within a time range
225    pub async fn get_events_in_range(
226        &self,
227        start: chrono::DateTime<chrono::Utc>,
228        end: chrono::DateTime<chrono::Utc>,
229    ) -> Result<Vec<TelemetryEvent>, TelemetryError> {
230        let all_events = self.get_all_events().await?;
231
232        Ok(all_events
233            .into_iter()
234            .filter(|e| e.timestamp >= start && e.timestamp <= end)
235            .collect())
236    }
237
238    /// Get events by type
239    pub async fn get_events_by_type(
240        &self,
241        event_type: EventType,
242    ) -> Result<Vec<TelemetryEvent>, TelemetryError> {
243        let all_events = self.get_all_events().await?;
244
245        Ok(all_events
246            .into_iter()
247            .filter(|e| e.event_type == event_type)
248            .collect())
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::events::TelemetryEvent;

    /// Scratch directory for one test.
    ///
    /// The path includes the process id, and any leftover directory at that
    /// path is removed up front, so data from an earlier run cannot leak into
    /// the assertions. The directory is removed again on drop, which also
    /// covers the case where an assertion panics before the end of the test.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "voirs_storage_test_{}_{}",
                name,
                std::process::id()
            ));
            let _ = std::fs::remove_dir_all(&dir);
            Self(dir)
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created, or the
            // test may already have cleared it.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[tokio::test]
    async fn test_storage_creation() {
        let scratch = ScratchDir::new("creation");
        let storage = TelemetryStorage::new(scratch.path());
        assert!(storage.is_ok());
    }

    #[tokio::test]
    async fn test_initialize() {
        let scratch = ScratchDir::new("init");
        let storage = TelemetryStorage::new(scratch.path())
            .expect("Failed to create telemetry storage for test");

        let result = storage.initialize().await;
        assert!(result.is_ok());
        assert!(scratch.path().exists());
    }

    #[tokio::test]
    async fn test_store_and_retrieve_event() {
        let scratch = ScratchDir::new("store");
        let mut storage = TelemetryStorage::new(scratch.path())
            .expect("Failed to create telemetry storage for test");

        let event = TelemetryEvent::command_executed("test".to_string(), 100);
        assert!(storage.store_event(&event).await.is_ok());

        let events = storage
            .get_all_events()
            .await
            .expect("Failed to retrieve telemetry events");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, EventType::CommandExecuted);
    }

    #[tokio::test]
    async fn test_get_statistics() {
        let scratch = ScratchDir::new("stats");
        let mut storage = TelemetryStorage::new(scratch.path())
            .expect("Failed to create telemetry storage for test");

        // Store multiple events
        let event1 = TelemetryEvent::command_executed("synthesize".to_string(), 100);
        let event2 = TelemetryEvent::synthesis_request("voice1".to_string(), 50, 200, true);
        let event3 = TelemetryEvent::synthesis_request("voice2".to_string(), 60, 250, true);

        storage
            .store_event(&event1)
            .await
            .expect("Failed to store event1");
        storage
            .store_event(&event2)
            .await
            .expect("Failed to store event2");
        storage
            .store_event(&event3)
            .await
            .expect("Failed to store event3");

        let stats = storage
            .get_statistics()
            .await
            .expect("Failed to get statistics");
        assert_eq!(stats.total_events, 3);
        assert_eq!(stats.synthesis_requests, 2);
        assert!(stats.avg_synthesis_duration > 0.0);
    }

    #[tokio::test]
    async fn test_clear_data() {
        let scratch = ScratchDir::new("clear");
        let mut storage = TelemetryStorage::new(scratch.path())
            .expect("Failed to create telemetry storage for test");

        let event = TelemetryEvent::command_executed("test".to_string(), 100);
        storage
            .store_event(&event)
            .await
            .expect("Failed to store event");

        assert!(storage.clear().await.is_ok());
        assert!(!scratch.path().exists());
    }

    #[tokio::test]
    async fn test_get_events_by_type() {
        let scratch = ScratchDir::new("by_type");
        let mut storage = TelemetryStorage::new(scratch.path())
            .expect("Failed to create telemetry storage for test");

        let event1 = TelemetryEvent::command_executed("test1".to_string(), 100);
        let event2 = TelemetryEvent::synthesis_request("voice".to_string(), 50, 200, true);
        let event3 = TelemetryEvent::command_executed("test2".to_string(), 150);

        storage
            .store_event(&event1)
            .await
            .expect("Failed to store event1");
        storage
            .store_event(&event2)
            .await
            .expect("Failed to store event2");
        storage
            .store_event(&event3)
            .await
            .expect("Failed to store event3");

        let command_events = storage
            .get_events_by_type(EventType::CommandExecuted)
            .await
            .expect("Failed to get events by type CommandExecuted");
        assert_eq!(command_events.len(), 2);

        let synthesis_events = storage
            .get_events_by_type(EventType::SynthesisRequest)
            .await
            .expect("Failed to get events by type SynthesisRequest");
        assert_eq!(synthesis_events.len(), 1);
    }
}