// torsh_distributed/store/file.rs
use super::{
4 store_trait::Store,
5 types::{StoreValue, DEFAULT_TIMEOUT},
6};
7use crate::{TorshDistributedError, TorshResult};
8use async_trait::async_trait;
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13
14#[derive(Debug)]
16pub struct FileStore {
17 file_path: String,
18 data: Arc<RwLock<HashMap<String, StoreValue>>>,
19}
20
21impl FileStore {
22 pub fn new(file_path: String) -> TorshResult<Self> {
23 let store = Self {
24 file_path,
25 data: Arc::new(RwLock::new(HashMap::new())),
26 };
27
28 if store.load_from_file().is_err() {
30 }
32
33 Ok(store)
34 }
35
36 fn load_from_file(&self) -> TorshResult<()> {
37 if std::path::Path::new(&self.file_path).exists() {
38 let contents = std::fs::read_to_string(&self.file_path).map_err(|e| {
39 TorshDistributedError::backend_error(
40 "FileStore",
41 format!("Failed to read store file: {}", e),
42 )
43 })?;
44
45 let data: HashMap<String, StoreValue> =
46 serde_json::from_str(&contents).map_err(|e| {
47 TorshDistributedError::backend_error(
48 "FileStore",
49 format!("Failed to parse store file: {}", e),
50 )
51 })?;
52
53 *self.data.write() = data;
54 }
55 Ok(())
56 }
57
58 fn save_to_file(&self) -> TorshResult<()> {
59 let data = self.data.read();
60 let contents = serde_json::to_string_pretty(&*data).map_err(|e| {
61 TorshDistributedError::backend_error(
62 "FileStore",
63 format!("Failed to serialize store: {}", e),
64 )
65 })?;
66
67 std::fs::write(&self.file_path, contents).map_err(|e| {
68 TorshDistributedError::backend_error(
69 "FileStore",
70 format!("Failed to write store file: {}", e),
71 )
72 })?;
73
74 Ok(())
75 }
76}
77
78#[async_trait]
79impl Store for FileStore {
80 async fn set(&self, key: &str, value: &[u8]) -> TorshResult<()> {
81 let store_value = StoreValue::new(value.to_vec());
82 self.data.write().insert(key.to_string(), store_value);
83 self.save_to_file()?;
84 Ok(())
85 }
86
87 async fn get(&self, key: &str) -> TorshResult<Option<Vec<u8>>> {
88 self.load_from_file()?;
89 Ok(self.data.read().get(key).map(|v| v.data().to_vec()))
90 }
91
92 async fn wait(&self, keys: &[String]) -> TorshResult<()> {
93 let start = Instant::now();
94
95 loop {
96 self.load_from_file()?;
97 let all_present = {
98 let data = self.data.read();
99 keys.iter().all(|key| data.contains_key(key))
100 }; if all_present {
103 return Ok(());
104 }
105
106 if start.elapsed() > DEFAULT_TIMEOUT {
107 return Err(TorshDistributedError::communication_error(
108 "Store wait",
109 "Timeout waiting for keys",
110 ));
111 }
112
113 tokio::time::sleep(Duration::from_millis(100)).await;
114 }
115 }
116
117 async fn delete(&self, key: &str) -> TorshResult<()> {
118 self.data.write().remove(key);
119 self.save_to_file()?;
120 Ok(())
121 }
122
123 async fn num_keys(&self) -> TorshResult<usize> {
124 self.load_from_file()?;
125 Ok(self.data.read().len())
126 }
127
128 async fn contains(&self, key: &str) -> TorshResult<bool> {
129 self.load_from_file()?;
130 Ok(self.data.read().contains_key(key))
131 }
132
133 async fn set_with_expiry(&self, key: &str, value: &[u8], _ttl: Duration) -> TorshResult<()> {
134 self.set(key, value).await
136 }
137
138 async fn compare_and_swap(
139 &self,
140 key: &str,
141 expected: Option<&[u8]>,
142 value: &[u8],
143 ) -> TorshResult<bool> {
144 self.load_from_file()?;
145 let mut data = self.data.write();
146
147 match expected {
148 Some(expected_val) => {
149 if let Some(current) = data.get(key) {
150 if current.data() == expected_val {
151 let store_value = StoreValue::new(value.to_vec());
152 data.insert(key.to_string(), store_value);
153 drop(data);
154 self.save_to_file()?;
155 Ok(true)
156 } else {
157 Ok(false)
158 }
159 } else {
160 Ok(false)
161 }
162 }
163 None => {
164 if data.contains_key(key) {
165 Ok(false)
166 } else {
167 let store_value = StoreValue::new(value.to_vec());
168 data.insert(key.to_string(), store_value);
169 drop(data);
170 self.save_to_file()?;
171 Ok(true)
172 }
173 }
174 }
175 }
176
177 async fn add(&self, key: &str, value: i64) -> TorshResult<i64> {
178 self.load_from_file()?;
179 let mut data = self.data.write();
180
181 let new_value = if let Some(existing) = data.get(key) {
182 let current = i64::from_le_bytes(existing.data()[..8].try_into().map_err(|_| {
183 TorshDistributedError::invalid_argument(
184 "value",
185 "Failed to convert stored bytes to i64",
186 "8 bytes representing a valid i64 value",
187 )
188 })?);
189 current + value
190 } else {
191 value
192 };
193
194 let store_value = StoreValue::new(new_value.to_le_bytes().to_vec());
195 data.insert(key.to_string(), store_value);
196 drop(data);
197 self.save_to_file()?;
198 Ok(new_value)
199 }
200}