use std::collections::HashMap;
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use tokio::fs;
use tracing::{debug, instrument};
use crate::analyzers::{AnalyzerError, AnalyzerResult};
/// State belonging to a single partition: a map from a string key to an opaque byte buffer.
///
/// `FileSystemStateStore` uses the analyzer name (the state file's stem) as the key and the
/// raw contents of that file as the value.
pub type StateMap = HashMap<String, Vec<u8>>;
/// Asynchronous persistence backend for per-partition state.
///
/// Implementors must be `Send + Sync`. Every operation reports failure through
/// [`AnalyzerResult`].
#[async_trait]
pub trait StateStore: Send + Sync {
    /// Loads all state entries stored for `partition`.
    async fn load_state(&self, partition: &str) -> AnalyzerResult<StateMap>;

    /// Persists the entries of `state` under `partition`.
    async fn save_state(&self, partition: &str, state: StateMap) -> AnalyzerResult<()>;

    /// Returns the names of the partitions held by the store.
    async fn list_partitions(&self) -> AnalyzerResult<Vec<String>>;

    /// Removes `partition` from the store.
    async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()>;

    /// Loads several partitions and returns their states keyed by partition name.
    ///
    /// The default implementation calls [`StateStore::load_state`] once per entry of
    /// `partitions`, in order, awaiting each call before starting the next.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `load_state`; partitions after the failing one
    /// are not loaded.
    async fn load_states_batch(
        &self,
        partitions: &[String],
    ) -> AnalyzerResult<HashMap<String, StateMap>> {
        let mut loaded = HashMap::new();
        for name in partitions.iter() {
            let partition_state = self.load_state(name).await?;
            loaded.insert(name.to_owned(), partition_state);
        }
        Ok(loaded)
    }
}
/// A state store that keeps each partition in its own directory on the local file system.
///
/// Partition directories live directly under `base_path`.
#[derive(Debug, Clone)]
pub struct FileSystemStateStore {
    /// Root directory under which one sub-directory per partition is created.
    base_path: PathBuf,
}
impl FileSystemStateStore {
    /// Creates a store rooted at `base_path`, creating the directory (and any missing
    /// parents) if needed.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzerError::Custom`] when the directory cannot be created.
    pub fn new<P>(base_path: P) -> AnalyzerResult<Self>
    where
        P: AsRef<Path>,
    {
        let root = base_path.as_ref().to_path_buf();
        let created = std::fs::create_dir_all(&root);
        created.map_err(|e| {
            AnalyzerError::Custom(format!("Failed to create state directory: {e}"))
        })?;
        Ok(Self { base_path: root })
    }

    /// Directory holding the state files of `partition`: `<base_path>/<partition>`.
    fn partition_path(&self, partition: &str) -> PathBuf {
        let mut dir = self.base_path.clone();
        dir.push(partition);
        dir
    }

    /// File holding the state of `analyzer_name` within `partition`:
    /// `<base_path>/<partition>/<analyzer_name>.json`.
    fn state_file_path(&self, partition: &str, analyzer_name: &str) -> PathBuf {
        let file_name = format!("{analyzer_name}.json");
        self.partition_path(partition).join(file_name)
    }
}
#[async_trait]
impl StateStore for FileSystemStateStore {
    /// Loads every `*.json` file in the partition directory.
    ///
    /// Each file becomes one map entry: the key is the file stem, the value is the file's
    /// raw bytes. Entries without a `json` extension are ignored. A partition whose
    /// directory does not exist yields an empty map.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzerError::Custom`] when the partition name is rejected, when the
    /// directory or one of its state files cannot be read, or when a state file name is
    /// not valid UTF-8.
    #[instrument(skip(self))]
    async fn load_state(&self, partition: &str) -> AnalyzerResult<StateMap> {
        ensure_contained_partition(partition)?;
        let partition_dir = self.partition_path(partition);
        let mut state_map = StateMap::new();

        // Open the directory directly and treat "not found" as "no state yet". This avoids
        // a blocking `exists()` probe inside async code, removes the check-then-use race,
        // and lets other failures (e.g. permission denied) surface as errors.
        let mut entries = match fs::read_dir(&partition_dir).await {
            Ok(entries) => entries,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                debug!(partition = %partition, "No state directory found");
                return Ok(state_map);
            }
            Err(e) => {
                return Err(AnalyzerError::Custom(format!(
                    "Failed to read partition directory: {e}"
                )));
            }
        };

        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(|e| AnalyzerError::Custom(format!("Failed to read directory entry: {e}")))?
        {
            let path = entry.path();
            // Only `*.json` files are state files; this also skips `*.json.tmp` staging
            // files left behind by an interrupted `save_state`.
            if path.extension().and_then(|s| s.to_str()) != Some("json") {
                continue;
            }

            let analyzer_name = path
                .file_stem()
                .and_then(|s| s.to_str())
                .ok_or_else(|| AnalyzerError::Custom("Invalid state file name".to_string()))?;
            let state_data = fs::read(&path)
                .await
                .map_err(|e| AnalyzerError::Custom(format!("Failed to read state file: {e}")))?;
            state_map.insert(analyzer_name.to_string(), state_data);
        }

        debug!(partition = %partition, states = state_map.len(), "Loaded partition state");
        Ok(state_map)
    }

    /// Writes each entry of `state` to `<partition dir>/<key>.json`, creating the partition
    /// directory if necessary.
    ///
    /// Existing state files whose key is not present in `state` are left untouched.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzerError::Custom`] when the partition name is rejected, the directory
    /// cannot be created, or a state file cannot be written. Entries written before the
    /// failure remain on disk.
    #[instrument(skip(self, state))]
    async fn save_state(&self, partition: &str, state: StateMap) -> AnalyzerResult<()> {
        ensure_contained_partition(partition)?;
        let partition_dir = self.partition_path(partition);
        fs::create_dir_all(&partition_dir).await.map_err(|e| {
            AnalyzerError::Custom(format!("Failed to create partition directory: {e}"))
        })?;

        let state_count = state.len();
        for (analyzer_name, state_data) in state {
            let file_path = self.state_file_path(partition, &analyzer_name);
            // Stage the bytes next to the target and rename into place, so an interrupted
            // write cannot leave a truncated `*.json` file for `load_state` to pick up.
            let staging_path = partition_dir.join(format!("{analyzer_name}.json.tmp"));

            let written = match fs::write(&staging_path, state_data).await {
                Ok(()) => fs::rename(&staging_path, &file_path).await,
                Err(e) => Err(e),
            };
            if let Err(e) = written {
                // Best-effort cleanup; the original failure is what gets reported.
                let _ = fs::remove_file(&staging_path).await;
                return Err(AnalyzerError::Custom(format!(
                    "Failed to write state file: {e}"
                )));
            }
        }

        debug!(partition = %partition, states = state_count, "Saved partition state");
        Ok(())
    }

    /// Lists the sub-directories of the base directory, sorted by name.
    ///
    /// Directory names that are not valid UTF-8 are skipped. A missing base directory
    /// yields an empty list.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzerError::Custom`] when the base directory or one of its entries
    /// cannot be read.
    #[instrument(skip(self))]
    async fn list_partitions(&self) -> AnalyzerResult<Vec<String>> {
        let mut partitions = Vec::new();

        // Same approach as `load_state`: open first, map "not found" to the empty result.
        let mut entries = match fs::read_dir(&self.base_path).await {
            Ok(entries) => entries,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(partitions),
            Err(e) => {
                return Err(AnalyzerError::Custom(format!(
                    "Failed to read base directory: {e}"
                )));
            }
        };

        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(|e| AnalyzerError::Custom(format!("Failed to read directory entry: {e}")))?
        {
            let is_dir = entry
                .file_type()
                .await
                .map_err(|e| AnalyzerError::Custom(format!("Failed to get file type: {e}")))?
                .is_dir();
            if !is_dir {
                continue;
            }
            if let Some(name) = entry.file_name().to_str() {
                partitions.push(name.to_string());
            }
        }

        partitions.sort_unstable();
        debug!(count = partitions.len(), "Listed partitions");
        Ok(partitions)
    }

    /// Recursively deletes the partition directory. Deleting a partition that does not
    /// exist is a no-op.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzerError::Custom`] when the partition name is rejected or the
    /// directory cannot be removed.
    #[instrument(skip(self))]
    async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()> {
        // Validation matters most here: an empty name would resolve to the base directory
        // itself and the recursive delete would wipe every partition.
        ensure_contained_partition(partition)?;
        let partition_dir = self.partition_path(partition);

        match fs::remove_dir_all(&partition_dir).await {
            Ok(()) => {
                debug!(partition = %partition, "Deleted partition");
                Ok(())
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(AnalyzerError::Custom(format!(
                "Failed to delete partition: {e}"
            ))),
        }
    }
}

/// Checks that `partition` names a location strictly inside the store's base directory.
///
/// The name must be non-empty and consist only of normal path components; `.`/`..`
/// segments, root directories and drive prefixes are rejected. Nested names such as
/// `a/b` are accepted.
fn ensure_contained_partition(partition: &str) -> AnalyzerResult<()> {
    let mut components = Path::new(partition).components().peekable();
    let contained = components.peek().is_some()
        && components.all(|c| matches!(c, std::path::Component::Normal(_)));

    if contained {
        Ok(())
    } else {
        Err(AnalyzerError::Custom(format!(
            "Invalid partition name: {partition:?}"
        )))
    }
}