term_guard/analyzers/incremental/
state_store.rs1use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5
6use async_trait::async_trait;
7use tokio::fs;
8use tracing::{debug, instrument};
9
10use crate::analyzers::{AnalyzerError, AnalyzerResult};
11
/// State held for a single partition: a map from a string key to an opaque byte payload.
///
/// [`FileSystemStateStore`] uses the analyzer name as the key and the raw contents of the
/// matching state file as the value.
pub type StateMap = HashMap<String, Vec<u8>>;
14
15#[async_trait]
20pub trait StateStore: Send + Sync {
21 async fn load_state(&self, partition: &str) -> AnalyzerResult<StateMap>;
29
30 async fn save_state(&self, partition: &str, state: StateMap) -> AnalyzerResult<()>;
36
37 async fn list_partitions(&self) -> AnalyzerResult<Vec<String>>;
42
43 async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()>;
48
49 async fn load_states_batch(
57 &self,
58 partitions: &[String],
59 ) -> AnalyzerResult<HashMap<String, StateMap>> {
60 let mut results = HashMap::new();
61 for partition in partitions {
62 let state = self.load_state(partition).await?;
63 results.insert(partition.clone(), state);
64 }
65 Ok(results)
66 }
67}
68
/// [`StateStore`] implementation that keeps state in a directory tree on the local filesystem.
///
/// Each partition is a subdirectory of `base_path`, and each entry of a partition's
/// [`StateMap`] is stored in that subdirectory as `<analyzer_name>.json`.
pub struct FileSystemStateStore {
    /// Root directory under which the partition directories live.
    base_path: PathBuf,
}
84
85impl FileSystemStateStore {
86 pub fn new<P: AsRef<Path>>(base_path: P) -> AnalyzerResult<Self> {
91 let base_path = base_path.as_ref().to_path_buf();
92
93 std::fs::create_dir_all(&base_path)
95 .map_err(|e| AnalyzerError::Custom(format!("Failed to create state directory: {e}")))?;
96
97 Ok(Self { base_path })
98 }
99
100 fn partition_path(&self, partition: &str) -> PathBuf {
102 self.base_path.join(partition)
103 }
104
105 fn state_file_path(&self, partition: &str, analyzer_name: &str) -> PathBuf {
107 self.partition_path(partition)
108 .join(format!("{analyzer_name}.json"))
109 }
110}
111
112#[async_trait]
113impl StateStore for FileSystemStateStore {
114 #[instrument(skip(self))]
115 async fn load_state(&self, partition: &str) -> AnalyzerResult<StateMap> {
116 let partition_dir = self.partition_path(partition);
117 let mut state_map = StateMap::new();
118
119 if !partition_dir.exists() {
120 debug!(partition = %partition, "No state directory found");
121 return Ok(state_map);
122 }
123
124 let mut entries = fs::read_dir(&partition_dir).await.map_err(|e| {
125 AnalyzerError::Custom(format!("Failed to read partition directory: {e}"))
126 })?;
127
128 while let Some(entry) = entries
129 .next_entry()
130 .await
131 .map_err(|e| AnalyzerError::Custom(format!("Failed to read directory entry: {e}")))?
132 {
133 let path = entry.path();
134 if path.extension().and_then(|s| s.to_str()) == Some("json") {
135 let analyzer_name = path
136 .file_stem()
137 .and_then(|s| s.to_str())
138 .ok_or_else(|| AnalyzerError::Custom("Invalid state file name".to_string()))?;
139
140 let state_data = fs::read(&path).await.map_err(|e| {
141 AnalyzerError::Custom(format!("Failed to read state file: {e}"))
142 })?;
143
144 state_map.insert(analyzer_name.to_string(), state_data);
145 }
146 }
147
148 debug!(partition = %partition, states = state_map.len(), "Loaded partition state");
149 Ok(state_map)
150 }
151
152 #[instrument(skip(self, state))]
153 async fn save_state(&self, partition: &str, state: StateMap) -> AnalyzerResult<()> {
154 let partition_dir = self.partition_path(partition);
155
156 fs::create_dir_all(&partition_dir).await.map_err(|e| {
158 AnalyzerError::Custom(format!("Failed to create partition directory: {e}"))
159 })?;
160
161 let state_count = state.len();
163
164 for (analyzer_name, state_data) in state {
166 let file_path = self.state_file_path(partition, &analyzer_name);
167
168 fs::write(&file_path, state_data)
169 .await
170 .map_err(|e| AnalyzerError::Custom(format!("Failed to write state file: {e}")))?;
171 }
172
173 debug!(partition = %partition, states = state_count, "Saved partition state");
174 Ok(())
175 }
176
177 #[instrument(skip(self))]
178 async fn list_partitions(&self) -> AnalyzerResult<Vec<String>> {
179 let mut partitions = Vec::new();
180
181 if !self.base_path.exists() {
182 return Ok(partitions);
183 }
184
185 let mut entries = fs::read_dir(&self.base_path)
186 .await
187 .map_err(|e| AnalyzerError::Custom(format!("Failed to read base directory: {e}")))?;
188
189 while let Some(entry) = entries
190 .next_entry()
191 .await
192 .map_err(|e| AnalyzerError::Custom(format!("Failed to read directory entry: {e}")))?
193 {
194 if entry
195 .file_type()
196 .await
197 .map_err(|e| AnalyzerError::Custom(format!("Failed to get file type: {e}")))?
198 .is_dir()
199 {
200 if let Some(name) = entry.file_name().to_str() {
201 partitions.push(name.to_string());
202 }
203 }
204 }
205
206 partitions.sort();
207 debug!(count = partitions.len(), "Listed partitions");
208 Ok(partitions)
209 }
210
211 #[instrument(skip(self))]
212 async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()> {
213 let partition_dir = self.partition_path(partition);
214
215 if partition_dir.exists() {
216 fs::remove_dir_all(&partition_dir)
217 .await
218 .map_err(|e| AnalyzerError::Custom(format!("Failed to delete partition: {e}")))?;
219 debug!(partition = %partition, "Deleted partition");
220 }
221
222 Ok(())
223 }
224}