aegis_timeseries/
persistence.rs1use crate::compression::CompressedBlock;
10use crate::types::{DataPoint, Metric, Tags};
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13
14#[derive(Serialize, Deserialize)]
20pub struct PersistedState {
21 pub version: u32,
22 pub metrics: Vec<Metric>,
23 pub series: Vec<PersistedSeries>,
24}
25
26#[derive(Serialize, Deserialize)]
28pub struct PersistedSeries {
29 pub series_id: String,
30 pub metric: Metric,
31 pub tags: Tags,
32 pub points: Vec<DataPoint>,
33 pub compressed_blocks: Vec<PersistedBlock>,
34}
35
36#[derive(Serialize, Deserialize)]
38pub struct PersistedBlock {
39 pub data: Vec<u8>,
40 pub first_timestamp: i64,
41 pub last_timestamp: i64,
42 pub count: usize,
43 pub checksum: u32,
44}
45
46impl From<&CompressedBlock> for PersistedBlock {
47 fn from(block: &CompressedBlock) -> Self {
48 Self {
49 data: block.data.clone(),
50 first_timestamp: block.first_timestamp,
51 last_timestamp: block.last_timestamp,
52 count: block.count,
53 checksum: block.checksum,
54 }
55 }
56}
57
58impl From<PersistedBlock> for CompressedBlock {
59 fn from(pb: PersistedBlock) -> Self {
60 Self {
61 data: pb.data,
62 first_timestamp: pb.first_timestamp,
63 last_timestamp: pb.last_timestamp,
64 count: pb.count,
65 checksum: pb.checksum,
66 }
67 }
68}
69
/// Reads and writes the time-series snapshot file inside one directory.
pub struct PersistenceManager {
    /// Directory that holds the snapshot file and its temporary sibling.
    data_path: PathBuf,
}
78
79impl PersistenceManager {
80 pub fn new(data_path: impl Into<PathBuf>) -> std::io::Result<Self> {
82 let data_path = data_path.into();
83 std::fs::create_dir_all(&data_path)?;
84 Ok(Self { data_path })
85 }
86
87 fn data_file(&self) -> PathBuf {
89 self.data_path.join("timeseries.bin")
90 }
91
92 fn temp_file(&self) -> PathBuf {
94 self.data_path.join("timeseries.bin.tmp")
95 }
96
97 pub fn save(&self, state: &PersistedState) -> Result<(), PersistenceError> {
99 let encoded = bincode::serialize(state)
100 .map_err(|e| PersistenceError::SerializationError(e.to_string()))?;
101
102 let temp_path = self.temp_file();
103 let data_path = self.data_file();
104
105 std::fs::write(&temp_path, &encoded)
107 .map_err(|e| PersistenceError::IoError(e.to_string()))?;
108
109 std::fs::rename(&temp_path, &data_path)
111 .map_err(|e| PersistenceError::IoError(e.to_string()))?;
112
113 Ok(())
114 }
115
116 pub fn load(&self) -> Result<Option<PersistedState>, PersistenceError> {
118 let data_path = self.data_file();
119
120 if !data_path.exists() {
121 return Ok(None);
122 }
123
124 let data =
125 std::fs::read(&data_path).map_err(|e| PersistenceError::IoError(e.to_string()))?;
126
127 if data.is_empty() {
128 return Ok(None);
129 }
130
131 let state: PersistedState = bincode::deserialize(&data)
132 .map_err(|e| PersistenceError::DeserializationError(e.to_string()))?;
133
134 Ok(Some(state))
135 }
136
137 pub fn exists(&self) -> bool {
139 self.data_file().exists()
140 }
141
142 pub fn data_path(&self) -> &Path {
144 &self.data_path
145 }
146}
147
/// Failures reported by [`PersistenceManager`].
///
/// Every variant carries the underlying error already rendered as text.
#[derive(Debug, Clone)]
pub enum PersistenceError {
    /// A filesystem operation failed.
    IoError(String),
    /// The state could not be encoded.
    SerializationError(String),
    /// The stored bytes could not be decoded.
    DeserializationError(String),
}

/// Human-readable rendering: a fixed prefix per variant followed by the
/// carried message.
impl std::fmt::Display for PersistenceError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PersistenceError::IoError(detail) => {
                write!(f, "I/O error: {}", detail)
            }
            PersistenceError::SerializationError(detail) => {
                write!(f, "Serialization error: {}", detail)
            }
            PersistenceError::DeserializationError(detail) => {
                write!(f, "Deserialization error: {}", detail)
            }
        }
    }
}

/// Marker impl so the type works with `?` and `Box<dyn Error>`; no source
/// error is exposed because only the message text is retained.
impl std::error::Error for PersistenceError {}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataPoint, Metric, Tags};
    use chrono::Utc;

    /// A state written with `save` is read back by `load` with the same
    /// version, element counts, and point value.
    #[test]
    fn test_save_and_load() {
        let scratch = tempfile::tempdir().unwrap();
        let manager = PersistenceManager::new(scratch.path().join("ts")).unwrap();

        let mut host_tags = Tags::new();
        host_tags.insert("host", "server1");

        let cpu_series = PersistedSeries {
            series_id: String::from("cpu_usage:host=server1"),
            metric: Metric::gauge("cpu_usage"),
            tags: host_tags,
            points: vec![DataPoint {
                timestamp: Utc::now(),
                value: 42.5,
            }],
            compressed_blocks: Vec::new(),
        };
        let snapshot = PersistedState {
            version: 1,
            metrics: vec![Metric::gauge("cpu_usage")],
            series: vec![cpu_series],
        };

        manager.save(&snapshot).unwrap();
        assert!(manager.exists());

        let restored = manager.load().unwrap().unwrap();
        assert_eq!(restored.version, 1);
        assert_eq!(restored.metrics.len(), 1);
        assert_eq!(restored.series.len(), 1);
        assert_eq!(restored.series[0].points[0].value, 42.5);
    }

    /// Loading from a fresh directory reports "no snapshot" rather than an
    /// error.
    #[test]
    fn test_load_nonexistent() {
        let scratch = tempfile::tempdir().unwrap();
        let manager = PersistenceManager::new(scratch.path().join("ts")).unwrap();

        let outcome = manager.load().unwrap();
        assert!(outcome.is_none());
    }

    /// Converting a block to its persisted form and back keeps every field.
    #[test]
    fn test_persisted_block_roundtrip() {
        let source = CompressedBlock {
            data: vec![1, 2, 3, 4],
            first_timestamp: 1000,
            last_timestamp: 2000,
            count: 10,
            checksum: 12345,
        };

        let on_disk = PersistedBlock::from(&source);
        let round_tripped = CompressedBlock::from(on_disk);

        assert_eq!(round_tripped.data, source.data);
        assert_eq!(round_tripped.first_timestamp, source.first_timestamp);
        assert_eq!(round_tripped.last_timestamp, source.last_timestamp);
        assert_eq!(round_tripped.count, source.count);
        assert_eq!(round_tripped.checksum, source.checksum);
    }
}