Skip to main content

aegis_timeseries/
persistence.rs

1//! Aegis Time Series Persistence
2//!
3//! Serialization and deserialization of time series data for crash recovery.
4//! Uses bincode for compact binary encoding of series buffers and compressed blocks.
5//!
6//! @version 0.1.0
7//! @author AutomataNexus Development Team
8
9use crate::compression::CompressedBlock;
10use crate::types::{DataPoint, Metric, Tags};
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13
14// =============================================================================
15// Persisted Data Structures
16// =============================================================================
17
18/// Serializable representation of all time series data.
19#[derive(Serialize, Deserialize)]
20pub struct PersistedState {
21    pub version: u32,
22    pub metrics: Vec<Metric>,
23    pub series: Vec<PersistedSeries>,
24}
25
26/// Serializable representation of a single series buffer.
27#[derive(Serialize, Deserialize)]
28pub struct PersistedSeries {
29    pub series_id: String,
30    pub metric: Metric,
31    pub tags: Tags,
32    pub points: Vec<DataPoint>,
33    pub compressed_blocks: Vec<PersistedBlock>,
34}
35
36/// Serializable representation of a compressed block.
37#[derive(Serialize, Deserialize)]
38pub struct PersistedBlock {
39    pub data: Vec<u8>,
40    pub first_timestamp: i64,
41    pub last_timestamp: i64,
42    pub count: usize,
43    pub checksum: u32,
44}
45
46impl From<&CompressedBlock> for PersistedBlock {
47    fn from(block: &CompressedBlock) -> Self {
48        Self {
49            data: block.data.clone(),
50            first_timestamp: block.first_timestamp,
51            last_timestamp: block.last_timestamp,
52            count: block.count,
53            checksum: block.checksum,
54        }
55    }
56}
57
58impl From<PersistedBlock> for CompressedBlock {
59    fn from(pb: PersistedBlock) -> Self {
60        Self {
61            data: pb.data,
62            first_timestamp: pb.first_timestamp,
63            last_timestamp: pb.last_timestamp,
64            count: pb.count,
65            checksum: pb.checksum,
66        }
67    }
68}
69
70// =============================================================================
71// Persistence Manager
72// =============================================================================
73
/// Manages reading and writing time series data to disk.
///
/// Holds only the directory that contains the data file, so the type is cheap
/// to clone and safe to print in diagnostics.
#[derive(Debug, Clone)]
pub struct PersistenceManager {
    /// Directory that contains the data file and its temporary sibling.
    data_path: PathBuf,
}
78
79impl PersistenceManager {
80    /// Create a new persistence manager for the given directory.
81    pub fn new(data_path: impl Into<PathBuf>) -> std::io::Result<Self> {
82        let data_path = data_path.into();
83        std::fs::create_dir_all(&data_path)?;
84        Ok(Self { data_path })
85    }
86
87    /// Path to the main data file.
88    fn data_file(&self) -> PathBuf {
89        self.data_path.join("timeseries.bin")
90    }
91
92    /// Path to the temporary write file (for atomic writes).
93    fn temp_file(&self) -> PathBuf {
94        self.data_path.join("timeseries.bin.tmp")
95    }
96
97    /// Save state to disk using atomic write (write to temp, then rename).
98    pub fn save(&self, state: &PersistedState) -> Result<(), PersistenceError> {
99        let encoded = bincode::serialize(state)
100            .map_err(|e| PersistenceError::SerializationError(e.to_string()))?;
101
102        let temp_path = self.temp_file();
103        let data_path = self.data_file();
104
105        // Write to temp file first
106        std::fs::write(&temp_path, &encoded)
107            .map_err(|e| PersistenceError::IoError(e.to_string()))?;
108
109        // Atomic rename
110        std::fs::rename(&temp_path, &data_path)
111            .map_err(|e| PersistenceError::IoError(e.to_string()))?;
112
113        Ok(())
114    }
115
116    /// Load state from disk.
117    pub fn load(&self) -> Result<Option<PersistedState>, PersistenceError> {
118        let data_path = self.data_file();
119
120        if !data_path.exists() {
121            return Ok(None);
122        }
123
124        let data =
125            std::fs::read(&data_path).map_err(|e| PersistenceError::IoError(e.to_string()))?;
126
127        if data.is_empty() {
128            return Ok(None);
129        }
130
131        let state: PersistedState = bincode::deserialize(&data)
132            .map_err(|e| PersistenceError::DeserializationError(e.to_string()))?;
133
134        Ok(Some(state))
135    }
136
137    /// Check if persisted data exists.
138    pub fn exists(&self) -> bool {
139        self.data_file().exists()
140    }
141
142    /// Get the data directory path.
143    pub fn data_path(&self) -> &Path {
144        &self.data_path
145    }
146}
147
148// =============================================================================
149// Persistence Error
150// =============================================================================
151
/// Failure modes of the persistence layer.
///
/// Each variant carries the message of the underlying error as text.
#[derive(Clone, Debug)]
pub enum PersistenceError {
    /// A filesystem operation failed.
    IoError(String),
    /// The state could not be encoded.
    SerializationError(String),
    /// The stored bytes could not be decoded.
    DeserializationError(String),
}

impl std::fmt::Display for PersistenceError {
    /// Render the error as `<category>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PersistenceError::IoError(detail) => {
                f.write_fmt(format_args!("I/O error: {}", detail))
            }
            PersistenceError::SerializationError(detail) => {
                f.write_fmt(format_args!("Serialization error: {}", detail))
            }
            PersistenceError::DeserializationError(detail) => {
                f.write_fmt(format_args!("Deserialization error: {}", detail))
            }
        }
    }
}

// No source error is retained (only its message), so the default trait methods suffice.
impl std::error::Error for PersistenceError {}
171
172// =============================================================================
173// Tests
174// =============================================================================
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataPoint, Metric, Tags};
    use chrono::Utc;

    /// Build a manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned as well so the directory stays alive
    /// for the duration of the test.
    fn temp_manager() -> (tempfile::TempDir, PersistenceManager) {
        let dir = tempfile::tempdir().unwrap();
        let pm = PersistenceManager::new(dir.path().join("ts")).unwrap();
        (dir, pm)
    }

    /// Build a small state with one metric, one series and one data point.
    fn sample_state(version: u32) -> PersistedState {
        let mut tags = Tags::new();
        tags.insert("host", "server1");

        PersistedState {
            version,
            metrics: vec![Metric::gauge("cpu_usage")],
            series: vec![PersistedSeries {
                series_id: "cpu_usage:host=server1".to_string(),
                metric: Metric::gauge("cpu_usage"),
                tags,
                points: vec![DataPoint {
                    timestamp: Utc::now(),
                    value: 42.5,
                }],
                compressed_blocks: vec![],
            }],
        }
    }

    #[test]
    fn test_save_and_load() {
        let (_dir, pm) = temp_manager();

        pm.save(&sample_state(1)).unwrap();
        assert!(pm.exists());

        let loaded = pm.load().unwrap().unwrap();
        assert_eq!(loaded.version, 1);
        assert_eq!(loaded.metrics.len(), 1);
        assert_eq!(loaded.series.len(), 1);
        assert_eq!(loaded.series[0].points[0].value, 42.5);
    }

    #[test]
    fn test_load_nonexistent() {
        let (_dir, pm) = temp_manager();
        assert!(!pm.exists());
        let loaded = pm.load().unwrap();
        assert!(loaded.is_none());
    }

    #[test]
    fn test_load_empty_file_is_none() {
        let (_dir, pm) = temp_manager();
        std::fs::write(pm.data_file(), b"").unwrap();

        // An empty file is present on disk but carries no state.
        assert!(pm.exists());
        assert!(pm.load().unwrap().is_none());
    }

    #[test]
    fn test_load_corrupt_data_is_error() {
        let (_dir, pm) = temp_manager();
        // A single byte is too short to hold even the leading `version` field.
        std::fs::write(pm.data_file(), [0xFF_u8]).unwrap();

        assert!(matches!(
            pm.load(),
            Err(PersistenceError::DeserializationError(_))
        ));
    }

    #[test]
    fn test_save_overwrites_previous() {
        let (_dir, pm) = temp_manager();

        pm.save(&sample_state(1)).unwrap();
        pm.save(&sample_state(2)).unwrap();

        let loaded = pm.load().unwrap().unwrap();
        assert_eq!(loaded.version, 2);
        // The temporary file is renamed away by a successful save.
        assert!(!pm.temp_file().exists());
    }

    #[test]
    fn test_persisted_block_roundtrip() {
        let block = CompressedBlock {
            data: vec![1, 2, 3, 4],
            first_timestamp: 1000,
            last_timestamp: 2000,
            count: 10,
            checksum: 12345,
        };

        let persisted: PersistedBlock = (&block).into();
        let restored: CompressedBlock = persisted.into();

        assert_eq!(restored.data, block.data);
        assert_eq!(restored.first_timestamp, block.first_timestamp);
        assert_eq!(restored.last_timestamp, block.last_timestamp);
        assert_eq!(restored.count, block.count);
        assert_eq!(restored.checksum, block.checksum);
    }
}