1use crate::metrics::MetricsSnapshot;
4use chrono::{DateTime, Duration, NaiveDate, Utc};
5use serde::{Deserialize, Serialize};
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tracing::{error, info, warn};
12
13#[derive(Debug, Clone)]
15pub struct PersistenceConfig {
16 pub data_dir: PathBuf,
18 pub rotation_interval: RotationInterval,
20 pub max_files: usize,
22 pub compress: bool,
24}
25
26#[derive(Debug, Clone, Copy)]
28pub enum RotationInterval {
29 Hourly,
30 Daily,
31 Never,
32}
33
34impl Default for PersistenceConfig {
35 fn default() -> Self {
36 Self {
37 data_dir: PathBuf::from("./data/metrics"),
38 rotation_interval: RotationInterval::Hourly,
39 max_files: 168, compress: false,
41 }
42 }
43}
44
45#[derive(Debug, Serialize, Deserialize)]
47pub struct PersistedMetrics {
48 pub timestamp: DateTime<Utc>,
49 pub snapshot: MetricsSnapshot,
50}
51
52pub struct MetricsPersistence {
54 config: PersistenceConfig,
55 current_file: Arc<RwLock<Option<File>>>,
56 current_file_path: Arc<RwLock<PathBuf>>,
57}
58
59impl MetricsPersistence {
60 pub fn new(config: PersistenceConfig) -> Result<Self, std::io::Error> {
62 fs::create_dir_all(&config.data_dir)?;
64
65 Ok(Self {
66 config,
67 current_file: Arc::new(RwLock::new(None)),
68 current_file_path: Arc::new(RwLock::new(PathBuf::new())),
69 })
70 }
71
72 pub async fn save_snapshot(&self, snapshot: MetricsSnapshot) -> Result<(), std::io::Error> {
74 let persisted = PersistedMetrics {
75 timestamp: Utc::now(),
76 snapshot,
77 };
78
79 let json = serde_json::to_string(&persisted)?;
80
81 let file_path = self.get_current_file_path().await;
83 let mut current_path = self.current_file_path.write().await;
84
85 if *current_path != file_path {
87 self.rotate_file(&file_path).await?;
88 *current_path = file_path.clone();
89 }
90
91 let mut file_guard = self.current_file.write().await;
93 if let Some(file) = file_guard.as_mut() {
94 writeln!(file, "{json}")?;
95 file.flush()?;
96 }
97
98 Ok(())
99 }
100
101 pub async fn load_range(
103 &self,
104 start: DateTime<Utc>,
105 end: DateTime<Utc>,
106 ) -> Result<Vec<PersistedMetrics>, std::io::Error> {
107 let mut all_metrics = Vec::new();
108
109 let files = self.find_files_in_range(start, end).await?;
111
112 for file_path in files {
114 let file = File::open(&file_path)?;
115 let reader = BufReader::new(file);
116
117 for line in reader.lines() {
118 let line = line?;
119 if line.trim().is_empty() {
120 continue;
121 }
122
123 match serde_json::from_str::<PersistedMetrics>(&line) {
124 Ok(metrics) => {
125 if metrics.timestamp >= start && metrics.timestamp <= end {
126 all_metrics.push(metrics);
127 }
128 }
129 Err(e) => {
130 warn!("Failed to parse metrics line: {}", e);
131 }
132 }
133 }
134 }
135
136 all_metrics.sort_by_key(|m| m.timestamp);
138
139 Ok(all_metrics)
140 }
141
142 pub async fn load_latest(&self) -> Result<Option<MetricsSnapshot>, std::io::Error> {
144 let files = self.list_metrics_files().await?;
145
146 for file_path in files.iter().rev() {
148 let file = File::open(file_path)?;
149 let reader = BufReader::new(file);
150
151 let mut last_metrics = None;
153 for line in reader.lines() {
154 let line = line?;
155 if line.trim().is_empty() {
156 continue;
157 }
158
159 if let Ok(metrics) = serde_json::from_str::<PersistedMetrics>(&line) {
160 last_metrics = Some(metrics);
161 }
162 }
163
164 if let Some(metrics) = last_metrics {
165 return Ok(Some(metrics.snapshot));
166 }
167 }
168
169 Ok(None)
170 }
171
172 pub async fn cleanup(&self) -> Result<(), std::io::Error> {
174 let files = self.list_metrics_files().await?;
175
176 if files.len() > self.config.max_files {
177 let files_to_remove = files.len() - self.config.max_files;
178
179 for file_path in files.iter().take(files_to_remove) {
180 info!("Removing old metrics file: {:?}", file_path);
181 fs::remove_file(file_path)?;
182 }
183 }
184
185 Ok(())
186 }
187
188 async fn get_current_file_path(&self) -> PathBuf {
190 let now = Utc::now();
191 let filename = match self.config.rotation_interval {
192 RotationInterval::Hourly => {
193 format!("metrics_{}.jsonl", now.format("%Y%m%d_%H"))
194 }
195 RotationInterval::Daily => {
196 format!("metrics_{}.jsonl", now.format("%Y%m%d"))
197 }
198 RotationInterval::Never => "metrics.jsonl".to_string(),
199 };
200
201 self.config.data_dir.join(filename)
202 }
203
204 async fn rotate_file(&self, new_path: &Path) -> Result<(), std::io::Error> {
206 let mut file_guard = self.current_file.write().await;
207
208 if let Some(mut file) = file_guard.take() {
210 file.flush()?;
211 }
212
213 let new_file = OpenOptions::new()
215 .create(true)
216 .append(true)
217 .open(new_path)?;
218
219 *file_guard = Some(new_file);
220
221 info!("Rotated to new metrics file: {:?}", new_path);
222
223 let config = self.config.clone();
225 let data_dir = self.config.data_dir.clone();
226 tokio::spawn(async move {
227 if let Err(e) = cleanup_old_files(&data_dir, config.max_files).await {
228 error!("Failed to cleanup old metrics files: {}", e);
229 }
230 });
231
232 Ok(())
233 }
234
235 async fn list_metrics_files(&self) -> Result<Vec<PathBuf>, std::io::Error> {
237 let mut files = Vec::new();
238
239 for entry in fs::read_dir(&self.config.data_dir)? {
240 let entry = entry?;
241 let path = entry.path();
242
243 if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
244 files.push(path);
245 }
246 }
247
248 files.sort();
250
251 Ok(files)
252 }
253
254 async fn find_files_in_range(
256 &self,
257 start: DateTime<Utc>,
258 end: DateTime<Utc>,
259 ) -> Result<Vec<PathBuf>, std::io::Error> {
260 let all_files = self.list_metrics_files().await?;
261 let mut relevant_files = Vec::new();
262
263 for file_path in all_files {
264 if let Some(file_time) =
266 parse_file_timestamp(&file_path, &self.config.rotation_interval)
267 {
268 let file_end = match self.config.rotation_interval {
270 RotationInterval::Hourly => file_time + Duration::hours(1),
271 RotationInterval::Daily => file_time + Duration::days(1),
272 RotationInterval::Never => end, };
274
275 if file_time <= end && file_end >= start {
276 relevant_files.push(file_path);
277 }
278 }
279 }
280
281 Ok(relevant_files)
282 }
283}
284
285fn parse_file_timestamp(path: &Path, interval: &RotationInterval) -> Option<DateTime<Utc>> {
287 let filename = path.file_stem()?.to_str()?;
288
289 match interval {
290 RotationInterval::Hourly => {
291 if filename.starts_with("metrics_") && filename.len() >= 19 {
293 let timestamp_str = &filename[8..19]; if let Some((date_str, hour_str)) = timestamp_str.split_once('_') {
296 if let (Ok(date), Ok(hour)) = (
297 NaiveDate::parse_from_str(date_str, "%Y%m%d"),
298 hour_str.parse::<u32>(),
299 ) {
300 date.and_hms_opt(hour, 0, 0).map(|dt| dt.and_utc())
301 } else {
302 None
303 }
304 } else {
305 None
306 }
307 } else {
308 None
309 }
310 }
311 RotationInterval::Daily => {
312 if filename.starts_with("metrics_") && filename.len() >= 16 {
314 let timestamp_str = &filename[8..16]; chrono::NaiveDate::parse_from_str(timestamp_str, "%Y%m%d")
316 .ok()
317 .map(|date| date.and_hms_opt(0, 0, 0).unwrap().and_utc())
318 } else {
319 None
320 }
321 }
322 RotationInterval::Never => Some(Utc::now()), }
324}
325
326async fn cleanup_old_files(data_dir: &Path, max_files: usize) -> Result<(), std::io::Error> {
328 let mut files = Vec::new();
329
330 for entry in fs::read_dir(data_dir)? {
331 let entry = entry?;
332 let path = entry.path();
333
334 if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
335 if let Ok(metadata) = entry.metadata() {
336 files.push((path, metadata.modified()?));
337 }
338 }
339 }
340
341 files.sort_by_key(|(_, time)| *time);
343
344 if files.len() > max_files {
346 let files_to_remove = files.len() - max_files;
347
348 for (path, _) in files.iter().take(files_to_remove) {
349 info!("Removing old metrics file: {:?}", path);
350 fs::remove_file(path)?;
351 }
352 }
353
354 Ok(())
355}
356
357#[cfg(test)]
358mod tests {
359 use super::*;
360 use crate::{BusinessMetrics, ErrorMetrics, HealthMetrics, RequestMetrics};
361
362 #[tokio::test]
363 async fn test_metrics_persistence() {
364 let _config = PersistenceConfig {
365 data_dir: std::env::temp_dir()
366 .join("mcp-logging-persistence-test")
367 .join("metrics"),
368 rotation_interval: RotationInterval::Never,
369 max_files: 10,
370 compress: false,
371 };
372
373 let snapshot = MetricsSnapshot {
375 request_metrics: RequestMetrics::default(),
376 health_metrics: HealthMetrics::default(),
377 business_metrics: BusinessMetrics::default(),
378 error_metrics: ErrorMetrics::default(),
379 snapshot_timestamp: 1234567890,
380 };
381
382 let serialized = serde_json::to_string(&snapshot).unwrap();
384 let deserialized: MetricsSnapshot = serde_json::from_str(&serialized).unwrap();
385 assert_eq!(deserialized.snapshot_timestamp, snapshot.snapshot_timestamp);
386 }
387
388 #[test]
389 fn test_parse_file_timestamp() {
390 let path = Path::new("metrics_20240107_14.jsonl");
391 let timestamp = parse_file_timestamp(path, &RotationInterval::Hourly);
392 assert!(timestamp.is_some());
393
394 let path = Path::new("metrics_20240107.jsonl");
395 let timestamp = parse_file_timestamp(path, &RotationInterval::Daily);
396 assert!(timestamp.is_some());
397 }
398}