pulseengine_mcp_logging/
persistence.rs

1//! Metrics persistence for historical data
2
3use crate::metrics::MetricsSnapshot;
4use chrono::{DateTime, Duration, NaiveDate, Utc};
5use serde::{Deserialize, Serialize};
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tracing::{error, info, warn};
12
13/// Metrics persistence configuration
14#[derive(Debug, Clone)]
15pub struct PersistenceConfig {
16    /// Directory to store metrics files
17    pub data_dir: PathBuf,
18    /// Rotation interval (e.g., hourly, daily)
19    pub rotation_interval: RotationInterval,
20    /// Maximum number of files to keep
21    pub max_files: usize,
22    /// Enable compression for old files
23    pub compress: bool,
24}
25
/// How frequently [`MetricsPersistence`] starts writing to a new metrics file.
#[derive(Clone, Copy, Debug)]
pub enum RotationInterval {
    /// One file per UTC hour, named `metrics_YYYYMMDD_HH.jsonl`.
    Hourly,
    /// One file per UTC day, named `metrics_YYYYMMDD.jsonl`.
    Daily,
    /// A single `metrics.jsonl` file that is never rotated.
    Never,
}
33
34impl Default for PersistenceConfig {
35    fn default() -> Self {
36        Self {
37            data_dir: PathBuf::from("./data/metrics"),
38            rotation_interval: RotationInterval::Hourly,
39            max_files: 168, // 7 days of hourly files
40            compress: false,
41        }
42    }
43}
44
45/// Persisted metrics entry
46#[derive(Debug, Serialize, Deserialize)]
47pub struct PersistedMetrics {
48    pub timestamp: DateTime<Utc>,
49    pub snapshot: MetricsSnapshot,
50}
51
52/// Metrics persistence manager
53pub struct MetricsPersistence {
54    config: PersistenceConfig,
55    current_file: Arc<RwLock<Option<File>>>,
56    current_file_path: Arc<RwLock<PathBuf>>,
57}
58
59impl MetricsPersistence {
60    /// Create a new metrics persistence manager
61    pub fn new(config: PersistenceConfig) -> Result<Self, std::io::Error> {
62        // Ensure data directory exists
63        fs::create_dir_all(&config.data_dir)?;
64
65        Ok(Self {
66            config,
67            current_file: Arc::new(RwLock::new(None)),
68            current_file_path: Arc::new(RwLock::new(PathBuf::new())),
69        })
70    }
71
72    /// Save a metrics snapshot
73    pub async fn save_snapshot(&self, snapshot: MetricsSnapshot) -> Result<(), std::io::Error> {
74        let persisted = PersistedMetrics {
75            timestamp: Utc::now(),
76            snapshot,
77        };
78
79        let json = serde_json::to_string(&persisted)?;
80
81        // Get or create current file
82        let file_path = self.get_current_file_path().await;
83        let mut current_path = self.current_file_path.write().await;
84
85        // Check if we need to rotate
86        if *current_path != file_path {
87            self.rotate_file(&file_path).await?;
88            *current_path = file_path.clone();
89        }
90
91        // Write to file
92        let mut file_guard = self.current_file.write().await;
93        if let Some(file) = file_guard.as_mut() {
94            writeln!(file, "{json}")?;
95            file.flush()?;
96        }
97
98        Ok(())
99    }
100
101    /// Load metrics from a time range
102    pub async fn load_range(
103        &self,
104        start: DateTime<Utc>,
105        end: DateTime<Utc>,
106    ) -> Result<Vec<PersistedMetrics>, std::io::Error> {
107        let mut all_metrics = Vec::new();
108
109        // Find relevant files
110        let files = self.find_files_in_range(start, end).await?;
111
112        // Read each file
113        for file_path in files {
114            let file = File::open(&file_path)?;
115            let reader = BufReader::new(file);
116
117            for line in reader.lines() {
118                let line = line?;
119                if line.trim().is_empty() {
120                    continue;
121                }
122
123                match serde_json::from_str::<PersistedMetrics>(&line) {
124                    Ok(metrics) => {
125                        if metrics.timestamp >= start && metrics.timestamp <= end {
126                            all_metrics.push(metrics);
127                        }
128                    }
129                    Err(e) => {
130                        warn!("Failed to parse metrics line: {}", e);
131                    }
132                }
133            }
134        }
135
136        // Sort by timestamp
137        all_metrics.sort_by_key(|m| m.timestamp);
138
139        Ok(all_metrics)
140    }
141
142    /// Load the most recent metrics snapshot
143    pub async fn load_latest(&self) -> Result<Option<MetricsSnapshot>, std::io::Error> {
144        let files = self.list_metrics_files().await?;
145
146        // Try files in reverse order (newest first)
147        for file_path in files.iter().rev() {
148            let file = File::open(file_path)?;
149            let reader = BufReader::new(file);
150
151            // Read last non-empty line
152            let mut last_metrics = None;
153            for line in reader.lines() {
154                let line = line?;
155                if line.trim().is_empty() {
156                    continue;
157                }
158
159                if let Ok(metrics) = serde_json::from_str::<PersistedMetrics>(&line) {
160                    last_metrics = Some(metrics);
161                }
162            }
163
164            if let Some(metrics) = last_metrics {
165                return Ok(Some(metrics.snapshot));
166            }
167        }
168
169        Ok(None)
170    }
171
172    /// Clean up old metrics files
173    pub async fn cleanup(&self) -> Result<(), std::io::Error> {
174        let files = self.list_metrics_files().await?;
175
176        if files.len() > self.config.max_files {
177            let files_to_remove = files.len() - self.config.max_files;
178
179            for file_path in files.iter().take(files_to_remove) {
180                info!("Removing old metrics file: {:?}", file_path);
181                fs::remove_file(file_path)?;
182            }
183        }
184
185        Ok(())
186    }
187
188    /// Get the current file path based on rotation interval
189    async fn get_current_file_path(&self) -> PathBuf {
190        let now = Utc::now();
191        let filename = match self.config.rotation_interval {
192            RotationInterval::Hourly => {
193                format!("metrics_{}.jsonl", now.format("%Y%m%d_%H"))
194            }
195            RotationInterval::Daily => {
196                format!("metrics_{}.jsonl", now.format("%Y%m%d"))
197            }
198            RotationInterval::Never => "metrics.jsonl".to_string(),
199        };
200
201        self.config.data_dir.join(filename)
202    }
203
204    /// Rotate to a new file
205    async fn rotate_file(&self, new_path: &Path) -> Result<(), std::io::Error> {
206        let mut file_guard = self.current_file.write().await;
207
208        // Close current file
209        if let Some(mut file) = file_guard.take() {
210            file.flush()?;
211        }
212
213        // Open new file
214        let new_file = OpenOptions::new()
215            .create(true)
216            .append(true)
217            .open(new_path)?;
218
219        *file_guard = Some(new_file);
220
221        info!("Rotated to new metrics file: {:?}", new_path);
222
223        // Trigger cleanup in background
224        let config = self.config.clone();
225        let data_dir = self.config.data_dir.clone();
226        tokio::spawn(async move {
227            if let Err(e) = cleanup_old_files(&data_dir, config.max_files).await {
228                error!("Failed to cleanup old metrics files: {}", e);
229            }
230        });
231
232        Ok(())
233    }
234
235    /// List all metrics files
236    async fn list_metrics_files(&self) -> Result<Vec<PathBuf>, std::io::Error> {
237        let mut files = Vec::new();
238
239        for entry in fs::read_dir(&self.config.data_dir)? {
240            let entry = entry?;
241            let path = entry.path();
242
243            if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
244                files.push(path);
245            }
246        }
247
248        // Sort by filename (which includes timestamp)
249        files.sort();
250
251        Ok(files)
252    }
253
254    /// Find files that might contain metrics in the given time range
255    async fn find_files_in_range(
256        &self,
257        start: DateTime<Utc>,
258        end: DateTime<Utc>,
259    ) -> Result<Vec<PathBuf>, std::io::Error> {
260        let all_files = self.list_metrics_files().await?;
261        let mut relevant_files = Vec::new();
262
263        for file_path in all_files {
264            // Parse timestamp from filename
265            if let Some(file_time) =
266                parse_file_timestamp(&file_path, &self.config.rotation_interval)
267            {
268                // Check if file might contain data in range
269                let file_end = match self.config.rotation_interval {
270                    RotationInterval::Hourly => file_time + Duration::hours(1),
271                    RotationInterval::Daily => file_time + Duration::days(1),
272                    RotationInterval::Never => end, // Always include
273                };
274
275                if file_time <= end && file_end >= start {
276                    relevant_files.push(file_path);
277                }
278            }
279        }
280
281        Ok(relevant_files)
282    }
283}
284
285/// Parse timestamp from metrics filename
286fn parse_file_timestamp(path: &Path, interval: &RotationInterval) -> Option<DateTime<Utc>> {
287    let filename = path.file_stem()?.to_str()?;
288
289    match interval {
290        RotationInterval::Hourly => {
291            // Format: metrics_YYYYMMDD_HH
292            if filename.starts_with("metrics_") && filename.len() >= 19 {
293                let timestamp_str = &filename[8..19]; // Skip "metrics_", extract "YYYYMMDD_HH"
294                // Parse as "20240107_14" -> parse date and hour separately
295                if let Some((date_str, hour_str)) = timestamp_str.split_once('_') {
296                    if let (Ok(date), Ok(hour)) = (
297                        NaiveDate::parse_from_str(date_str, "%Y%m%d"),
298                        hour_str.parse::<u32>(),
299                    ) {
300                        date.and_hms_opt(hour, 0, 0).map(|dt| dt.and_utc())
301                    } else {
302                        None
303                    }
304                } else {
305                    None
306                }
307            } else {
308                None
309            }
310        }
311        RotationInterval::Daily => {
312            // Format: metrics_YYYYMMDD
313            if filename.starts_with("metrics_") && filename.len() >= 16 {
314                let timestamp_str = &filename[8..16]; // Skip "metrics_"
315                chrono::NaiveDate::parse_from_str(timestamp_str, "%Y%m%d")
316                    .ok()
317                    .map(|date| date.and_hms_opt(0, 0, 0).unwrap().and_utc())
318            } else {
319                None
320            }
321        }
322        RotationInterval::Never => Some(Utc::now()), // Always current
323    }
324}
325
326/// Clean up old files in a directory
327async fn cleanup_old_files(data_dir: &Path, max_files: usize) -> Result<(), std::io::Error> {
328    let mut files = Vec::new();
329
330    for entry in fs::read_dir(data_dir)? {
331        let entry = entry?;
332        let path = entry.path();
333
334        if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
335            if let Ok(metadata) = entry.metadata() {
336                files.push((path, metadata.modified()?));
337            }
338        }
339    }
340
341    // Sort by modification time (oldest first)
342    files.sort_by_key(|(_, time)| *time);
343
344    // Remove oldest files if over limit
345    if files.len() > max_files {
346        let files_to_remove = files.len() - max_files;
347
348        for (path, _) in files.iter().take(files_to_remove) {
349            info!("Removing old metrics file: {:?}", path);
350            fs::remove_file(path)?;
351        }
352    }
353
354    Ok(())
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{BusinessMetrics, ErrorMetrics, HealthMetrics, RequestMetrics};

    /// Snapshot with default sub-metrics and a fixed, recognizable timestamp.
    fn sample_snapshot() -> MetricsSnapshot {
        MetricsSnapshot {
            request_metrics: RequestMetrics::default(),
            health_metrics: HealthMetrics::default(),
            business_metrics: BusinessMetrics::default(),
            error_metrics: ErrorMetrics::default(),
            snapshot_timestamp: 1234567890,
        }
    }

    /// Per-process scratch directory so concurrent test runs do not share files.
    fn scratch_dir(label: &str) -> PathBuf {
        std::env::temp_dir()
            .join("mcp-logging-persistence-test")
            .join(format!("{label}-{}", std::process::id()))
    }

    #[test]
    fn test_snapshot_serde_roundtrip() {
        let snapshot = sample_snapshot();
        let serialized = serde_json::to_string(&snapshot).unwrap();
        let deserialized: MetricsSnapshot = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized.snapshot_timestamp, snapshot.snapshot_timestamp);
    }

    #[tokio::test]
    async fn test_metrics_persistence() {
        let data_dir = scratch_dir("metrics");
        // Start from a clean directory; the error is irrelevant if it does not exist yet.
        let _ = fs::remove_dir_all(&data_dir);

        let config = PersistenceConfig {
            data_dir: data_dir.clone(),
            rotation_interval: RotationInterval::Never,
            max_files: 10,
            compress: false,
        };
        let persistence = MetricsPersistence::new(config).expect("failed to create persistence");

        let before = Utc::now();
        persistence
            .save_snapshot(sample_snapshot())
            .await
            .expect("failed to save snapshot");
        let after = Utc::now();

        let latest = persistence
            .load_latest()
            .await
            .expect("failed to load latest snapshot")
            .expect("a snapshot was just saved");
        assert_eq!(latest.snapshot_timestamp, 1234567890);

        let loaded = persistence
            .load_range(before - Duration::minutes(1), after + Duration::minutes(1))
            .await
            .expect("failed to load range");
        assert_eq!(loaded.len(), 1);
        assert!(loaded[0].timestamp >= before && loaded[0].timestamp <= after);
        assert_eq!(loaded[0].snapshot.snapshot_timestamp, 1234567890);

        // Release the open file handle before deleting the directory (needed on Windows).
        drop(persistence);
        let _ = fs::remove_dir_all(&data_dir);
    }

    #[test]
    fn test_parse_file_timestamp() {
        let expected_hour = NaiveDate::from_ymd_opt(2024, 1, 7)
            .and_then(|date| date.and_hms_opt(14, 0, 0))
            .map(|dt| dt.and_utc());
        assert!(expected_hour.is_some());
        assert_eq!(
            parse_file_timestamp(Path::new("metrics_20240107_14.jsonl"), &RotationInterval::Hourly),
            expected_hour
        );

        let expected_day = NaiveDate::from_ymd_opt(2024, 1, 7)
            .and_then(|date| date.and_hms_opt(0, 0, 0))
            .map(|dt| dt.and_utc());
        assert!(expected_day.is_some());
        assert_eq!(
            parse_file_timestamp(Path::new("metrics_20240107.jsonl"), &RotationInterval::Daily),
            expected_day
        );

        // Names that do not follow the pattern, or encode an impossible hour, are rejected.
        assert_eq!(
            parse_file_timestamp(Path::new("other.jsonl"), &RotationInterval::Hourly),
            None
        );
        assert_eq!(
            parse_file_timestamp(Path::new("metrics_2024.jsonl"), &RotationInterval::Daily),
            None
        );
        assert_eq!(
            parse_file_timestamp(Path::new("metrics_20240107_25.jsonl"), &RotationInterval::Hourly),
            None
        );
    }
}