term_guard/analyzers/incremental/
state_store.rs

1//! State storage abstraction for incremental computation.
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5
6use async_trait::async_trait;
7use tokio::fs;
8use tracing::{debug, instrument};
9
10use crate::analyzers::{AnalyzerError, AnalyzerResult};
11
/// Serialized analyzer states for one partition, keyed by analyzer name.
pub type StateMap = std::collections::HashMap<String, Vec<u8>>;
14
15/// Trait for storing and retrieving analyzer states.
16///
17/// Implementations handle persistence of intermediate computation states,
18/// enabling incremental analysis across data partitions.
19#[async_trait]
20pub trait StateStore: Send + Sync {
21    /// Loads the state for a specific partition.
22    ///
23    /// # Arguments
24    /// * `partition` - The partition identifier (e.g., "2024-01-15")
25    ///
26    /// # Returns
27    /// A map of analyzer names to their serialized states
28    async fn load_state(&self, partition: &str) -> AnalyzerResult<StateMap>;
29
30    /// Saves the state for a specific partition.
31    ///
32    /// # Arguments
33    /// * `partition` - The partition identifier
34    /// * `state` - Map of analyzer names to serialized states
35    async fn save_state(&self, partition: &str, state: StateMap) -> AnalyzerResult<()>;
36
37    /// Lists all known partitions.
38    ///
39    /// # Returns
40    /// Vector of partition identifiers ordered by name
41    async fn list_partitions(&self) -> AnalyzerResult<Vec<String>>;
42
43    /// Deletes the state for a specific partition.
44    ///
45    /// # Arguments
46    /// * `partition` - The partition identifier to delete
47    async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()>;
48
49    /// Loads states for multiple partitions.
50    ///
51    /// # Arguments
52    /// * `partitions` - List of partition identifiers
53    ///
54    /// # Returns
55    /// Map of partition names to their state maps
56    async fn load_states_batch(
57        &self,
58        partitions: &[String],
59    ) -> AnalyzerResult<HashMap<String, StateMap>> {
60        let mut results = HashMap::new();
61        for partition in partitions {
62            let state = self.load_state(partition).await?;
63            results.insert(partition.clone(), state);
64        }
65        Ok(results)
66    }
67}
68
/// File system implementation of [`StateStore`].
///
/// Each partition gets its own directory under `base_path`. Each analyzer's
/// serialized state is written verbatim to `<analyzer_name>.json` inside it:
/// ```text
/// base_path/
/// ├── 2024-01-14/
/// │   ├── size.json
/// │   ├── completeness_col1.json
/// │   └── mean_col2.json
/// └── 2024-01-15/
///     └── ...
/// ```
///
/// The bytes are stored exactly as provided; this type does not check that they are JSON.
#[derive(Debug, Clone)]
pub struct FileSystemStateStore {
    /// Root directory containing one sub-directory per partition.
    base_path: PathBuf,
}
84
85impl FileSystemStateStore {
86    /// Creates a new file system state store.
87    ///
88    /// # Arguments
89    /// * `base_path` - Directory path for storing state files
90    pub fn new<P: AsRef<Path>>(base_path: P) -> AnalyzerResult<Self> {
91        let base_path = base_path.as_ref().to_path_buf();
92
93        // Ensure base directory exists
94        std::fs::create_dir_all(&base_path)
95            .map_err(|e| AnalyzerError::Custom(format!("Failed to create state directory: {e}")))?;
96
97        Ok(Self { base_path })
98    }
99
100    /// Gets the directory path for a partition
101    fn partition_path(&self, partition: &str) -> PathBuf {
102        self.base_path.join(partition)
103    }
104
105    /// Gets the file path for a specific analyzer state
106    fn state_file_path(&self, partition: &str, analyzer_name: &str) -> PathBuf {
107        self.partition_path(partition)
108            .join(format!("{analyzer_name}.json"))
109    }
110}
111
112#[async_trait]
113impl StateStore for FileSystemStateStore {
114    #[instrument(skip(self))]
115    async fn load_state(&self, partition: &str) -> AnalyzerResult<StateMap> {
116        let partition_dir = self.partition_path(partition);
117        let mut state_map = StateMap::new();
118
119        if !partition_dir.exists() {
120            debug!(partition = %partition, "No state directory found");
121            return Ok(state_map);
122        }
123
124        let mut entries = fs::read_dir(&partition_dir).await.map_err(|e| {
125            AnalyzerError::Custom(format!("Failed to read partition directory: {e}"))
126        })?;
127
128        while let Some(entry) = entries
129            .next_entry()
130            .await
131            .map_err(|e| AnalyzerError::Custom(format!("Failed to read directory entry: {e}")))?
132        {
133            let path = entry.path();
134            if path.extension().and_then(|s| s.to_str()) == Some("json") {
135                let analyzer_name = path
136                    .file_stem()
137                    .and_then(|s| s.to_str())
138                    .ok_or_else(|| AnalyzerError::Custom("Invalid state file name".to_string()))?;
139
140                let state_data = fs::read(&path).await.map_err(|e| {
141                    AnalyzerError::Custom(format!("Failed to read state file: {e}"))
142                })?;
143
144                state_map.insert(analyzer_name.to_string(), state_data);
145            }
146        }
147
148        debug!(partition = %partition, states = state_map.len(), "Loaded partition state");
149        Ok(state_map)
150    }
151
152    #[instrument(skip(self, state))]
153    async fn save_state(&self, partition: &str, state: StateMap) -> AnalyzerResult<()> {
154        let partition_dir = self.partition_path(partition);
155
156        // Create partition directory
157        fs::create_dir_all(&partition_dir).await.map_err(|e| {
158            AnalyzerError::Custom(format!("Failed to create partition directory: {e}"))
159        })?;
160
161        // Count states for logging
162        let state_count = state.len();
163
164        // Save each analyzer state
165        for (analyzer_name, state_data) in state {
166            let file_path = self.state_file_path(partition, &analyzer_name);
167
168            fs::write(&file_path, state_data)
169                .await
170                .map_err(|e| AnalyzerError::Custom(format!("Failed to write state file: {e}")))?;
171        }
172
173        debug!(partition = %partition, states = state_count, "Saved partition state");
174        Ok(())
175    }
176
177    #[instrument(skip(self))]
178    async fn list_partitions(&self) -> AnalyzerResult<Vec<String>> {
179        let mut partitions = Vec::new();
180
181        if !self.base_path.exists() {
182            return Ok(partitions);
183        }
184
185        let mut entries = fs::read_dir(&self.base_path)
186            .await
187            .map_err(|e| AnalyzerError::Custom(format!("Failed to read base directory: {e}")))?;
188
189        while let Some(entry) = entries
190            .next_entry()
191            .await
192            .map_err(|e| AnalyzerError::Custom(format!("Failed to read directory entry: {e}")))?
193        {
194            if entry
195                .file_type()
196                .await
197                .map_err(|e| AnalyzerError::Custom(format!("Failed to get file type: {e}")))?
198                .is_dir()
199            {
200                if let Some(name) = entry.file_name().to_str() {
201                    partitions.push(name.to_string());
202                }
203            }
204        }
205
206        partitions.sort();
207        debug!(count = partitions.len(), "Listed partitions");
208        Ok(partitions)
209    }
210
211    #[instrument(skip(self))]
212    async fn delete_partition(&self, partition: &str) -> AnalyzerResult<()> {
213        let partition_dir = self.partition_path(partition);
214
215        if partition_dir.exists() {
216            fs::remove_dir_all(&partition_dir)
217                .await
218                .map_err(|e| AnalyzerError::Custom(format!("Failed to delete partition: {e}")))?;
219            debug!(partition = %partition, "Deleted partition");
220        }
221
222        Ok(())
223    }
224}