Skip to main content

torsh_distributed/store/
file.rs

1//! File-based store implementation for single-node multi-process coordination
2
3use super::{
4    store_trait::Store,
5    types::{StoreValue, DEFAULT_TIMEOUT},
6};
7use crate::{TorshDistributedError, TorshResult};
8use async_trait::async_trait;
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14/// File-based store implementation
15#[derive(Debug)]
16pub struct FileStore {
17    file_path: String,
18    data: Arc<RwLock<HashMap<String, StoreValue>>>,
19}
20
21impl FileStore {
22    pub fn new(file_path: String) -> TorshResult<Self> {
23        let store = Self {
24            file_path,
25            data: Arc::new(RwLock::new(HashMap::new())),
26        };
27
28        // Try to load existing data
29        if store.load_from_file().is_err() {
30            // If loading fails, start with empty store
31        }
32
33        Ok(store)
34    }
35
36    fn load_from_file(&self) -> TorshResult<()> {
37        if std::path::Path::new(&self.file_path).exists() {
38            let contents = std::fs::read_to_string(&self.file_path).map_err(|e| {
39                TorshDistributedError::backend_error(
40                    "FileStore",
41                    format!("Failed to read store file: {}", e),
42                )
43            })?;
44
45            let data: HashMap<String, StoreValue> =
46                serde_json::from_str(&contents).map_err(|e| {
47                    TorshDistributedError::backend_error(
48                        "FileStore",
49                        format!("Failed to parse store file: {}", e),
50                    )
51                })?;
52
53            *self.data.write() = data;
54        }
55        Ok(())
56    }
57
58    fn save_to_file(&self) -> TorshResult<()> {
59        let data = self.data.read();
60        let contents = serde_json::to_string_pretty(&*data).map_err(|e| {
61            TorshDistributedError::backend_error(
62                "FileStore",
63                format!("Failed to serialize store: {}", e),
64            )
65        })?;
66
67        std::fs::write(&self.file_path, contents).map_err(|e| {
68            TorshDistributedError::backend_error(
69                "FileStore",
70                format!("Failed to write store file: {}", e),
71            )
72        })?;
73
74        Ok(())
75    }
76}
77
78#[async_trait]
79impl Store for FileStore {
80    async fn set(&self, key: &str, value: &[u8]) -> TorshResult<()> {
81        let store_value = StoreValue::new(value.to_vec());
82        self.data.write().insert(key.to_string(), store_value);
83        self.save_to_file()?;
84        Ok(())
85    }
86
87    async fn get(&self, key: &str) -> TorshResult<Option<Vec<u8>>> {
88        self.load_from_file()?;
89        Ok(self.data.read().get(key).map(|v| v.data().to_vec()))
90    }
91
92    async fn wait(&self, keys: &[String]) -> TorshResult<()> {
93        let start = Instant::now();
94
95        loop {
96            self.load_from_file()?;
97            let all_present = {
98                let data = self.data.read();
99                keys.iter().all(|key| data.contains_key(key))
100            }; // RwLockReadGuard is dropped here
101
102            if all_present {
103                return Ok(());
104            }
105
106            if start.elapsed() > DEFAULT_TIMEOUT {
107                return Err(TorshDistributedError::communication_error(
108                    "Store wait",
109                    "Timeout waiting for keys",
110                ));
111            }
112
113            tokio::time::sleep(Duration::from_millis(100)).await;
114        }
115    }
116
117    async fn delete(&self, key: &str) -> TorshResult<()> {
118        self.data.write().remove(key);
119        self.save_to_file()?;
120        Ok(())
121    }
122
123    async fn num_keys(&self) -> TorshResult<usize> {
124        self.load_from_file()?;
125        Ok(self.data.read().len())
126    }
127
128    async fn contains(&self, key: &str) -> TorshResult<bool> {
129        self.load_from_file()?;
130        Ok(self.data.read().contains_key(key))
131    }
132
133    async fn set_with_expiry(&self, key: &str, value: &[u8], _ttl: Duration) -> TorshResult<()> {
134        // File store doesn't support TTL, just set normally
135        self.set(key, value).await
136    }
137
138    async fn compare_and_swap(
139        &self,
140        key: &str,
141        expected: Option<&[u8]>,
142        value: &[u8],
143    ) -> TorshResult<bool> {
144        self.load_from_file()?;
145        let mut data = self.data.write();
146
147        match expected {
148            Some(expected_val) => {
149                if let Some(current) = data.get(key) {
150                    if current.data() == expected_val {
151                        let store_value = StoreValue::new(value.to_vec());
152                        data.insert(key.to_string(), store_value);
153                        drop(data);
154                        self.save_to_file()?;
155                        Ok(true)
156                    } else {
157                        Ok(false)
158                    }
159                } else {
160                    Ok(false)
161                }
162            }
163            None => {
164                if data.contains_key(key) {
165                    Ok(false)
166                } else {
167                    let store_value = StoreValue::new(value.to_vec());
168                    data.insert(key.to_string(), store_value);
169                    drop(data);
170                    self.save_to_file()?;
171                    Ok(true)
172                }
173            }
174        }
175    }
176
177    async fn add(&self, key: &str, value: i64) -> TorshResult<i64> {
178        self.load_from_file()?;
179        let mut data = self.data.write();
180
181        let new_value = if let Some(existing) = data.get(key) {
182            let current = i64::from_le_bytes(existing.data()[..8].try_into().map_err(|_| {
183                TorshDistributedError::invalid_argument(
184                    "value",
185                    "Failed to convert stored bytes to i64",
186                    "8 bytes representing a valid i64 value",
187                )
188            })?);
189            current + value
190        } else {
191            value
192        };
193
194        let store_value = StoreValue::new(new_value.to_le_bytes().to_vec());
195        data.insert(key.to_string(), store_value);
196        drop(data);
197        self.save_to_file()?;
198        Ok(new_value)
199    }
200}