1use crate::metrics::MetricsSnapshot;
4use chrono::{DateTime, Duration, NaiveDate, Utc};
5use serde::{Deserialize, Serialize};
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tracing::{error, info, warn};
12
13#[derive(Debug, Clone)]
15pub struct PersistenceConfig {
16    pub data_dir: PathBuf,
18    pub rotation_interval: RotationInterval,
20    pub max_files: usize,
22    pub compress: bool,
24}
25
26#[derive(Debug, Clone, Copy)]
28pub enum RotationInterval {
29    Hourly,
30    Daily,
31    Never,
32}
33
34impl Default for PersistenceConfig {
35    fn default() -> Self {
36        Self {
37            data_dir: PathBuf::from("./data/metrics"),
38            rotation_interval: RotationInterval::Hourly,
39            max_files: 168, compress: false,
41        }
42    }
43}
44
45#[derive(Debug, Serialize, Deserialize)]
47pub struct PersistedMetrics {
48    pub timestamp: DateTime<Utc>,
49    pub snapshot: MetricsSnapshot,
50}
51
52pub struct MetricsPersistence {
54    config: PersistenceConfig,
55    current_file: Arc<RwLock<Option<File>>>,
56    current_file_path: Arc<RwLock<PathBuf>>,
57}
58
59impl MetricsPersistence {
60    pub fn new(config: PersistenceConfig) -> Result<Self, std::io::Error> {
62        fs::create_dir_all(&config.data_dir)?;
64
65        Ok(Self {
66            config,
67            current_file: Arc::new(RwLock::new(None)),
68            current_file_path: Arc::new(RwLock::new(PathBuf::new())),
69        })
70    }
71
72    pub async fn save_snapshot(&self, snapshot: MetricsSnapshot) -> Result<(), std::io::Error> {
74        let persisted = PersistedMetrics {
75            timestamp: Utc::now(),
76            snapshot,
77        };
78
79        let json = serde_json::to_string(&persisted)?;
80
81        let file_path = self.get_current_file_path().await;
83        let mut current_path = self.current_file_path.write().await;
84
85        if *current_path != file_path {
87            self.rotate_file(&file_path).await?;
88            *current_path = file_path.clone();
89        }
90
91        let mut file_guard = self.current_file.write().await;
93        if let Some(file) = file_guard.as_mut() {
94            writeln!(file, "{json}")?;
95            file.flush()?;
96        }
97
98        Ok(())
99    }
100
101    pub async fn load_range(
103        &self,
104        start: DateTime<Utc>,
105        end: DateTime<Utc>,
106    ) -> Result<Vec<PersistedMetrics>, std::io::Error> {
107        let mut all_metrics = Vec::new();
108
109        let files = self.find_files_in_range(start, end).await?;
111
112        for file_path in files {
114            let file = File::open(&file_path)?;
115            let reader = BufReader::new(file);
116
117            for line in reader.lines() {
118                let line = line?;
119                if line.trim().is_empty() {
120                    continue;
121                }
122
123                match serde_json::from_str::<PersistedMetrics>(&line) {
124                    Ok(metrics) => {
125                        if metrics.timestamp >= start && metrics.timestamp <= end {
126                            all_metrics.push(metrics);
127                        }
128                    }
129                    Err(e) => {
130                        warn!("Failed to parse metrics line: {}", e);
131                    }
132                }
133            }
134        }
135
136        all_metrics.sort_by_key(|m| m.timestamp);
138
139        Ok(all_metrics)
140    }
141
142    pub async fn load_latest(&self) -> Result<Option<MetricsSnapshot>, std::io::Error> {
144        let files = self.list_metrics_files().await?;
145
146        for file_path in files.iter().rev() {
148            let file = File::open(file_path)?;
149            let reader = BufReader::new(file);
150
151            let mut last_metrics = None;
153            for line in reader.lines() {
154                let line = line?;
155                if line.trim().is_empty() {
156                    continue;
157                }
158
159                if let Ok(metrics) = serde_json::from_str::<PersistedMetrics>(&line) {
160                    last_metrics = Some(metrics);
161                }
162            }
163
164            if let Some(metrics) = last_metrics {
165                return Ok(Some(metrics.snapshot));
166            }
167        }
168
169        Ok(None)
170    }
171
172    pub async fn cleanup(&self) -> Result<(), std::io::Error> {
174        let files = self.list_metrics_files().await?;
175
176        if files.len() > self.config.max_files {
177            let files_to_remove = files.len() - self.config.max_files;
178
179            for file_path in files.iter().take(files_to_remove) {
180                info!("Removing old metrics file: {:?}", file_path);
181                fs::remove_file(file_path)?;
182            }
183        }
184
185        Ok(())
186    }
187
188    async fn get_current_file_path(&self) -> PathBuf {
190        let now = Utc::now();
191        let filename = match self.config.rotation_interval {
192            RotationInterval::Hourly => {
193                format!("metrics_{}.jsonl", now.format("%Y%m%d_%H"))
194            }
195            RotationInterval::Daily => {
196                format!("metrics_{}.jsonl", now.format("%Y%m%d"))
197            }
198            RotationInterval::Never => "metrics.jsonl".to_string(),
199        };
200
201        self.config.data_dir.join(filename)
202    }
203
204    async fn rotate_file(&self, new_path: &Path) -> Result<(), std::io::Error> {
206        let mut file_guard = self.current_file.write().await;
207
208        if let Some(mut file) = file_guard.take() {
210            file.flush()?;
211        }
212
213        let new_file = OpenOptions::new()
215            .create(true)
216            .append(true)
217            .open(new_path)?;
218
219        *file_guard = Some(new_file);
220
221        info!("Rotated to new metrics file: {:?}", new_path);
222
223        let config = self.config.clone();
225        let data_dir = self.config.data_dir.clone();
226        tokio::spawn(async move {
227            if let Err(e) = cleanup_old_files(&data_dir, config.max_files).await {
228                error!("Failed to cleanup old metrics files: {}", e);
229            }
230        });
231
232        Ok(())
233    }
234
235    async fn list_metrics_files(&self) -> Result<Vec<PathBuf>, std::io::Error> {
237        let mut files = Vec::new();
238
239        for entry in fs::read_dir(&self.config.data_dir)? {
240            let entry = entry?;
241            let path = entry.path();
242
243            if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
244                files.push(path);
245            }
246        }
247
248        files.sort();
250
251        Ok(files)
252    }
253
254    async fn find_files_in_range(
256        &self,
257        start: DateTime<Utc>,
258        end: DateTime<Utc>,
259    ) -> Result<Vec<PathBuf>, std::io::Error> {
260        let all_files = self.list_metrics_files().await?;
261        let mut relevant_files = Vec::new();
262
263        for file_path in all_files {
264            if let Some(file_time) =
266                parse_file_timestamp(&file_path, &self.config.rotation_interval)
267            {
268                let file_end = match self.config.rotation_interval {
270                    RotationInterval::Hourly => file_time + Duration::hours(1),
271                    RotationInterval::Daily => file_time + Duration::days(1),
272                    RotationInterval::Never => end, };
274
275                if file_time <= end && file_end >= start {
276                    relevant_files.push(file_path);
277                }
278            }
279        }
280
281        Ok(relevant_files)
282    }
283}
284
285fn parse_file_timestamp(path: &Path, interval: &RotationInterval) -> Option<DateTime<Utc>> {
287    let filename = path.file_stem()?.to_str()?;
288
289    match interval {
290        RotationInterval::Hourly => {
291            if filename.starts_with("metrics_") && filename.len() >= 19 {
293                let timestamp_str = &filename[8..19]; if let Some((date_str, hour_str)) = timestamp_str.split_once('_') {
296                    if let (Ok(date), Ok(hour)) = (
297                        NaiveDate::parse_from_str(date_str, "%Y%m%d"),
298                        hour_str.parse::<u32>(),
299                    ) {
300                        date.and_hms_opt(hour, 0, 0).map(|dt| dt.and_utc())
301                    } else {
302                        None
303                    }
304                } else {
305                    None
306                }
307            } else {
308                None
309            }
310        }
311        RotationInterval::Daily => {
312            if filename.starts_with("metrics_") && filename.len() >= 16 {
314                let timestamp_str = &filename[8..16]; chrono::NaiveDate::parse_from_str(timestamp_str, "%Y%m%d")
316                    .ok()
317                    .map(|date| date.and_hms_opt(0, 0, 0).unwrap().and_utc())
318            } else {
319                None
320            }
321        }
322        RotationInterval::Never => Some(Utc::now()), }
324}
325
326async fn cleanup_old_files(data_dir: &Path, max_files: usize) -> Result<(), std::io::Error> {
328    let mut files = Vec::new();
329
330    for entry in fs::read_dir(data_dir)? {
331        let entry = entry?;
332        let path = entry.path();
333
334        if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
335            if let Ok(metadata) = entry.metadata() {
336                files.push((path, metadata.modified()?));
337            }
338        }
339    }
340
341    files.sort_by_key(|(_, time)| *time);
343
344    if files.len() > max_files {
346        let files_to_remove = files.len() - max_files;
347
348        for (path, _) in files.iter().take(files_to_remove) {
349            info!("Removing old metrics file: {:?}", path);
350            fs::remove_file(path)?;
351        }
352    }
353
354    Ok(())
355}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360    use crate::{BusinessMetrics, ErrorMetrics, HealthMetrics, RequestMetrics};
361
362    #[tokio::test]
363    async fn test_metrics_persistence() {
364        let _config = PersistenceConfig {
365            data_dir: std::env::temp_dir()
366                .join("mcp-logging-persistence-test")
367                .join("metrics"),
368            rotation_interval: RotationInterval::Never,
369            max_files: 10,
370            compress: false,
371        };
372
373        let snapshot = MetricsSnapshot {
375            request_metrics: RequestMetrics::default(),
376            health_metrics: HealthMetrics::default(),
377            business_metrics: BusinessMetrics::default(),
378            error_metrics: ErrorMetrics::default(),
379            snapshot_timestamp: 1234567890,
380        };
381
382        let serialized = serde_json::to_string(&snapshot).unwrap();
384        let deserialized: MetricsSnapshot = serde_json::from_str(&serialized).unwrap();
385        assert_eq!(deserialized.snapshot_timestamp, snapshot.snapshot_timestamp);
386    }
387
388    #[test]
389    fn test_parse_file_timestamp() {
390        let path = Path::new("metrics_20240107_14.jsonl");
391        let timestamp = parse_file_timestamp(path, &RotationInterval::Hourly);
392        assert!(timestamp.is_some());
393
394        let path = Path::new("metrics_20240107.jsonl");
395        let timestamp = parse_file_timestamp(path, &RotationInterval::Daily);
396        assert!(timestamp.is_some());
397    }
398}