use crate::compression::CompressedBlock;
use crate::types::{DataPoint, Metric, Tags};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
#[derive(Serialize, Deserialize)]
pub struct PersistedState {
pub version: u32,
pub metrics: Vec<Metric>,
pub series: Vec<PersistedSeries>,
}
#[derive(Serialize, Deserialize)]
pub struct PersistedSeries {
pub series_id: String,
pub metric: Metric,
pub tags: Tags,
pub points: Vec<DataPoint>,
pub compressed_blocks: Vec<PersistedBlock>,
}
#[derive(Serialize, Deserialize)]
pub struct PersistedBlock {
pub data: Vec<u8>,
pub first_timestamp: i64,
pub last_timestamp: i64,
pub count: usize,
pub checksum: u32,
}
impl From<&CompressedBlock> for PersistedBlock {
fn from(block: &CompressedBlock) -> Self {
Self {
data: block.data.clone(),
first_timestamp: block.first_timestamp,
last_timestamp: block.last_timestamp,
count: block.count,
checksum: block.checksum,
}
}
}
impl From<PersistedBlock> for CompressedBlock {
fn from(pb: PersistedBlock) -> Self {
Self {
data: pb.data,
first_timestamp: pb.first_timestamp,
last_timestamp: pb.last_timestamp,
count: pb.count,
checksum: pb.checksum,
}
}
}
pub struct PersistenceManager {
data_path: PathBuf,
}
impl PersistenceManager {
pub fn new(data_path: impl Into<PathBuf>) -> std::io::Result<Self> {
let data_path = data_path.into();
std::fs::create_dir_all(&data_path)?;
Ok(Self { data_path })
}
fn data_file(&self) -> PathBuf {
self.data_path.join("timeseries.bin")
}
fn temp_file(&self) -> PathBuf {
self.data_path.join("timeseries.bin.tmp")
}
pub fn save(&self, state: &PersistedState) -> Result<(), PersistenceError> {
let encoded = bincode::serialize(state)
.map_err(|e| PersistenceError::SerializationError(e.to_string()))?;
let temp_path = self.temp_file();
let data_path = self.data_file();
std::fs::write(&temp_path, &encoded)
.map_err(|e| PersistenceError::IoError(e.to_string()))?;
std::fs::rename(&temp_path, &data_path)
.map_err(|e| PersistenceError::IoError(e.to_string()))?;
Ok(())
}
pub fn load(&self) -> Result<Option<PersistedState>, PersistenceError> {
let data_path = self.data_file();
if !data_path.exists() {
return Ok(None);
}
let data =
std::fs::read(&data_path).map_err(|e| PersistenceError::IoError(e.to_string()))?;
if data.is_empty() {
return Ok(None);
}
let state: PersistedState = bincode::deserialize(&data)
.map_err(|e| PersistenceError::DeserializationError(e.to_string()))?;
Ok(Some(state))
}
pub fn exists(&self) -> bool {
self.data_file().exists()
}
pub fn data_path(&self) -> &Path {
&self.data_path
}
}
#[derive(Debug, Clone)]
pub enum PersistenceError {
IoError(String),
SerializationError(String),
DeserializationError(String),
}
impl std::fmt::Display for PersistenceError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::IoError(msg) => write!(f, "I/O error: {}", msg),
Self::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
Self::DeserializationError(msg) => write!(f, "Deserialization error: {}", msg),
}
}
}
impl std::error::Error for PersistenceError {}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{DataPoint, Metric, Tags};
use chrono::Utc;
#[test]
fn test_save_and_load() {
let dir = tempfile::tempdir().unwrap();
let pm = PersistenceManager::new(dir.path().join("ts")).unwrap();
let mut tags = Tags::new();
tags.insert("host", "server1");
let state = PersistedState {
version: 1,
metrics: vec![Metric::gauge("cpu_usage")],
series: vec![PersistedSeries {
series_id: "cpu_usage:host=server1".to_string(),
metric: Metric::gauge("cpu_usage"),
tags,
points: vec![DataPoint {
timestamp: Utc::now(),
value: 42.5,
}],
compressed_blocks: vec![],
}],
};
pm.save(&state).unwrap();
assert!(pm.exists());
let loaded = pm.load().unwrap().unwrap();
assert_eq!(loaded.version, 1);
assert_eq!(loaded.metrics.len(), 1);
assert_eq!(loaded.series.len(), 1);
assert_eq!(loaded.series[0].points[0].value, 42.5);
}
#[test]
fn test_load_nonexistent() {
let dir = tempfile::tempdir().unwrap();
let pm = PersistenceManager::new(dir.path().join("ts")).unwrap();
let loaded = pm.load().unwrap();
assert!(loaded.is_none());
}
#[test]
fn test_persisted_block_roundtrip() {
let block = CompressedBlock {
data: vec![1, 2, 3, 4],
first_timestamp: 1000,
last_timestamp: 2000,
count: 10,
checksum: 12345,
};
let persisted: PersistedBlock = (&block).into();
let restored: CompressedBlock = persisted.into();
assert_eq!(restored.data, block.data);
assert_eq!(restored.first_timestamp, block.first_timestamp);
assert_eq!(restored.last_timestamp, block.last_timestamp);
assert_eq!(restored.count, block.count);
assert_eq!(restored.checksum, block.checksum);
}
}