1use std::collections::HashMap;
23use std::path::{Path, PathBuf};
24
25use async_trait::async_trait;
26use serde_json::Value;
27use tokio::sync::Mutex;
28
29use rustrade_core::{Error, Result, StateStore};
30
31pub struct JsonFileStore {
47 path: PathBuf,
48 data: Mutex<HashMap<String, Value>>,
52}
53
54impl JsonFileStore {
55 pub async fn open(path: impl Into<PathBuf>) -> Result<Self> {
65 let path = path.into();
66
67 if let Some(parent) = path.parent()
68 && !parent.as_os_str().is_empty()
69 {
70 tokio::fs::create_dir_all(parent).await.map_err(|e| {
71 Error::Storage(format!(
72 "JsonFileStore: create dir {}: {e}",
73 parent.display()
74 ))
75 })?;
76 }
77
78 let data = match tokio::fs::read(&path).await {
79 Ok(bytes) => serde_json::from_slice::<HashMap<String, Value>>(&bytes).map_err(|e| {
80 Error::Storage(format!(
81 "JsonFileStore: corrupt store {}: {e}",
82 path.display()
83 ))
84 })?,
85 Err(e) if e.kind() == std::io::ErrorKind::NotFound => HashMap::new(),
86 Err(e) => {
87 return Err(Error::Storage(format!(
88 "JsonFileStore: read {}: {e}",
89 path.display()
90 )));
91 }
92 };
93
94 Ok(Self {
95 path,
96 data: Mutex::new(data),
97 })
98 }
99
100 #[must_use]
102 pub fn path(&self) -> &Path {
103 &self.path
104 }
105}
106
107async fn persist(map: &HashMap<String, Value>, path: &Path) -> Result<()> {
109 let bytes = serde_json::to_vec_pretty(map)
110 .map_err(|e| Error::Storage(format!("JsonFileStore: serialize: {e}")))?;
111
112 let mut tmp = path.to_path_buf();
113 tmp.set_extension("json.tmp");
114
115 tokio::fs::write(&tmp, &bytes)
116 .await
117 .map_err(|e| Error::Storage(format!("JsonFileStore: write {}: {e}", tmp.display())))?;
118 tokio::fs::rename(&tmp, path).await.map_err(|e| {
119 Error::Storage(format!(
120 "JsonFileStore: rename {} -> {}: {e}",
121 tmp.display(),
122 path.display()
123 ))
124 })?;
125 Ok(())
126}
127
128#[async_trait]
129impl StateStore for JsonFileStore {
130 async fn load(&self, key: &str) -> Result<Option<Value>> {
131 Ok(self.data.lock().await.get(key).cloned())
132 }
133
134 async fn save(&self, key: &str, value: Value) -> Result<()> {
135 let mut map = self.data.lock().await;
138 map.insert(key.to_string(), value);
139 persist(&map, &self.path).await
140 }
141
142 async fn flush(&self) -> Result<()> {
143 let map = self.data.lock().await;
144 persist(&map, &self.path).await
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use super::*;
151 use std::sync::atomic::{AtomicU64, Ordering};
152
153 fn temp_path() -> PathBuf {
155 static N: AtomicU64 = AtomicU64::new(0);
156 let n = N.fetch_add(1, Ordering::Relaxed);
157 std::env::temp_dir().join(format!(
158 "rustrade-jsonstore-{}-{n}/risk.json",
159 std::process::id()
160 ))
161 }
162
163 fn cleanup(path: &Path) {
165 if let Some(dir) = path.parent() {
166 let _ = std::fs::remove_dir_all(dir);
167 }
168 }
169
170 #[tokio::test]
171 async fn open_missing_file_starts_empty() {
172 let path = temp_path();
173 let store = JsonFileStore::open(&path).await.unwrap();
174 assert!(store.load("nope").await.unwrap().is_none());
175 cleanup(&path);
176 }
177
178 #[tokio::test]
179 async fn save_then_load_roundtrips() {
180 let path = temp_path();
181 let store = JsonFileStore::open(&path).await.unwrap();
182 let v = serde_json::json!({ "realised": 12.5, "halted": true });
183 store.save("bot/BTCUSDT", v.clone()).await.unwrap();
184 assert_eq!(store.load("bot/BTCUSDT").await.unwrap(), Some(v));
185 cleanup(&path);
186 }
187
188 #[tokio::test]
189 async fn state_survives_reopen() {
190 let path = temp_path();
191 {
192 let store = JsonFileStore::open(&path).await.unwrap();
193 store
194 .save("bot/ETHUSDT", serde_json::json!({ "trades": 3 }))
195 .await
196 .unwrap();
197 }
199 let reopened = JsonFileStore::open(&path).await.unwrap();
201 assert_eq!(
202 reopened.load("bot/ETHUSDT").await.unwrap(),
203 Some(serde_json::json!({ "trades": 3 }))
204 );
205 cleanup(&path);
206 }
207
208 #[tokio::test]
209 async fn save_overwrites_and_persists_latest() {
210 let path = temp_path();
211 let store = JsonFileStore::open(&path).await.unwrap();
212 store.save("k", serde_json::json!(1)).await.unwrap();
213 store.save("k", serde_json::json!(2)).await.unwrap();
214 drop(store);
215 let reopened = JsonFileStore::open(&path).await.unwrap();
216 assert_eq!(
217 reopened.load("k").await.unwrap(),
218 Some(serde_json::json!(2))
219 );
220 cleanup(&path);
221 }
222
223 #[tokio::test]
224 async fn corrupt_file_is_an_error_not_silent_loss() {
225 let path = temp_path();
226 std::fs::create_dir_all(path.parent().unwrap()).unwrap();
227 std::fs::write(&path, b"{ this is not valid json").unwrap();
228 let err = JsonFileStore::open(&path).await;
229 assert!(err.is_err(), "a corrupt store must surface an error");
230 cleanup(&path);
231 }
232
233 #[tokio::test]
234 async fn creates_missing_parent_dirs() {
235 let base = temp_path();
236 let dir = base.parent().unwrap().to_path_buf();
237 let nested = dir.join("a").join("b").join("c").join("risk.json");
238 let store = JsonFileStore::open(&nested).await.unwrap();
239 store.save("k", serde_json::json!("v")).await.unwrap();
240 assert!(
241 nested.exists(),
242 "nested dirs + file should have been created"
243 );
244 let _ = std::fs::remove_dir_all(&dir);
245 }
246}