Skip to main content

rustrade/
json_store.rs

1//! [`JsonFileStore`] — a durable [`StateStore`] backed by a single JSON file.
2//!
3//! This is the simplest real-durability backend: the framework's risk
4//! snapshots (per-symbol [`SessionPnl`](rustrade_risk::SessionPnl) /
5//! [`CircuitBreaker`](rustrade_risk::CircuitBreaker) and the account
6//! [`PortfolioRisk`](rustrade_risk::PortfolioRisk) latch) survive a restart, so
7//! a halted daily session or a tripped breaker isn't forgotten when the bot
8//! reboots. Wire it with `Bot::with_state_store`.
9//!
10//! Writes are **write-through and atomic**: every
11//! [`save`](StateStore::save) (and [`flush`](StateStore::flush)) serialises the
12//! full map to a sibling temp file and `rename`s it over the target, so a crash
13//! mid-write can't corrupt the store — you either keep the previous good file
14//! or get the new one. Saves are infrequent (one per realised trade) and the
15//! payload is tiny (a handful of symbols), so rewriting the whole file each
16//! time is cheap.
17//!
18//! For higher write volumes or multi-process access, a sqlite/Postgres backend
19//! (another `StateStore` impl) is the next step — the framework only depends on
20//! the trait.
21
22use std::collections::HashMap;
23use std::path::{Path, PathBuf};
24
25use async_trait::async_trait;
26use serde_json::Value;
27use tokio::sync::Mutex;
28
29use rustrade_core::{Error, Result, StateStore};
30
31/// A [`StateStore`] that persists snapshots to a JSON file on disk.
32///
33/// Open one with [`JsonFileStore::open`] (which loads any existing file), then
34/// hand it to `Bot::with_state_store`:
35///
36/// ```no_run
37/// use std::sync::Arc;
38/// use rustrade::JsonFileStore;
39/// # async fn demo() -> rustrade::Result<()> {
40/// let store = Arc::new(JsonFileStore::open("/var/lib/fks/bot-risk.json").await?);
41/// // Bot::builder()...build()?.with_state_store(store)
42/// # let _ = store;
43/// # Ok(())
44/// # }
45/// ```
46pub struct JsonFileStore {
47    path: PathBuf,
48    // Async mutex: held across the file write in `save`/`flush`, which both
49    // serialises writers (no concurrent file writes ⇒ no torn temp file) and
50    // keeps the in-memory map the single source of truth.
51    data: Mutex<HashMap<String, Value>>,
52}
53
54impl JsonFileStore {
55    /// Open (or create) a store at `path`, loading any existing snapshot map.
56    ///
57    /// A missing file starts empty (first boot). The parent directory is
58    /// created if needed.
59    ///
60    /// # Errors
61    /// [`Error::Storage`] if the parent dir can't be created, the file can't be
62    /// read, or an existing file is corrupt (not valid JSON object) — corruption
63    /// is surfaced rather than silently dropping persisted state.
64    pub async fn open(path: impl Into<PathBuf>) -> Result<Self> {
65        let path = path.into();
66
67        if let Some(parent) = path.parent()
68            && !parent.as_os_str().is_empty()
69        {
70            tokio::fs::create_dir_all(parent).await.map_err(|e| {
71                Error::Storage(format!(
72                    "JsonFileStore: create dir {}: {e}",
73                    parent.display()
74                ))
75            })?;
76        }
77
78        let data = match tokio::fs::read(&path).await {
79            Ok(bytes) => serde_json::from_slice::<HashMap<String, Value>>(&bytes).map_err(|e| {
80                Error::Storage(format!(
81                    "JsonFileStore: corrupt store {}: {e}",
82                    path.display()
83                ))
84            })?,
85            Err(e) if e.kind() == std::io::ErrorKind::NotFound => HashMap::new(),
86            Err(e) => {
87                return Err(Error::Storage(format!(
88                    "JsonFileStore: read {}: {e}",
89                    path.display()
90                )));
91            }
92        };
93
94        Ok(Self {
95            path,
96            data: Mutex::new(data),
97        })
98    }
99
100    /// The file path this store persists to.
101    #[must_use]
102    pub fn path(&self) -> &Path {
103        &self.path
104    }
105}
106
107/// Serialise `map` and write it atomically to `path` (temp file + rename).
108async fn persist(map: &HashMap<String, Value>, path: &Path) -> Result<()> {
109    let bytes = serde_json::to_vec_pretty(map)
110        .map_err(|e| Error::Storage(format!("JsonFileStore: serialize: {e}")))?;
111
112    let mut tmp = path.to_path_buf();
113    tmp.set_extension("json.tmp");
114
115    tokio::fs::write(&tmp, &bytes)
116        .await
117        .map_err(|e| Error::Storage(format!("JsonFileStore: write {}: {e}", tmp.display())))?;
118    tokio::fs::rename(&tmp, path).await.map_err(|e| {
119        Error::Storage(format!(
120            "JsonFileStore: rename {} -> {}: {e}",
121            tmp.display(),
122            path.display()
123        ))
124    })?;
125    Ok(())
126}
127
128#[async_trait]
129impl StateStore for JsonFileStore {
130    async fn load(&self, key: &str) -> Result<Option<Value>> {
131        Ok(self.data.lock().await.get(key).cloned())
132    }
133
134    async fn save(&self, key: &str, value: Value) -> Result<()> {
135        // Write-through under the lock so a crash right after a realised trade
136        // keeps that trade's effect on the risk gates.
137        let mut map = self.data.lock().await;
138        map.insert(key.to_string(), value);
139        persist(&map, &self.path).await
140    }
141
142    async fn flush(&self) -> Result<()> {
143        let map = self.data.lock().await;
144        persist(&map, &self.path).await
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Hand out a fresh `risk.json` path inside its own directory under the
    /// system temp dir. The directory name combines the process id with a
    /// per-process counter, so no `tempfile` dependency is needed.
    fn temp_path() -> PathBuf {
        static N: AtomicU64 = AtomicU64::new(0);
        let n = N.fetch_add(1, Ordering::Relaxed);
        let relative = format!(
            "rustrade-jsonstore-{}-{n}/risk.json",
            std::process::id()
        );
        std::env::temp_dir().join(relative)
    }

    /// Best-effort removal of the directory that holds a `temp_path()` file.
    fn cleanup(path: &Path) {
        let Some(dir) = path.parent() else {
            return;
        };
        let _ = std::fs::remove_dir_all(dir);
    }

    #[tokio::test]
    async fn open_missing_file_starts_empty() {
        let file = temp_path();
        let store = JsonFileStore::open(&file).await.unwrap();
        let missing = store.load("nope").await.unwrap();
        assert!(missing.is_none());
        cleanup(&file);
    }

    #[tokio::test]
    async fn save_then_load_roundtrips() {
        let file = temp_path();
        let store = JsonFileStore::open(&file).await.unwrap();
        let snapshot = serde_json::json!({ "realised": 12.5, "halted": true });
        store.save("bot/BTCUSDT", snapshot.clone()).await.unwrap();
        let loaded = store.load("bot/BTCUSDT").await.unwrap();
        assert_eq!(loaded, Some(snapshot));
        cleanup(&file);
    }

    #[tokio::test]
    async fn state_survives_reopen() {
        let file = temp_path();

        // First lifetime: write one snapshot, then let the store go away.
        // Nothing else is needed — `save` is write-through.
        let first = JsonFileStore::open(&file).await.unwrap();
        first
            .save("bot/ETHUSDT", serde_json::json!({ "trades": 3 }))
            .await
            .unwrap();
        drop(first);

        // Second lifetime on the same path must see the snapshot.
        let second = JsonFileStore::open(&file).await.unwrap();
        let restored = second.load("bot/ETHUSDT").await.unwrap();
        assert_eq!(restored, Some(serde_json::json!({ "trades": 3 })));
        cleanup(&file);
    }

    #[tokio::test]
    async fn save_overwrites_and_persists_latest() {
        let file = temp_path();

        let first = JsonFileStore::open(&file).await.unwrap();
        for version in [1, 2] {
            first.save("k", serde_json::json!(version)).await.unwrap();
        }
        drop(first);

        let second = JsonFileStore::open(&file).await.unwrap();
        let latest = second.load("k").await.unwrap();
        assert_eq!(latest, Some(serde_json::json!(2)));
        cleanup(&file);
    }

    #[tokio::test]
    async fn corrupt_file_is_an_error_not_silent_loss() {
        let file = temp_path();
        let dir = file.parent().unwrap();
        std::fs::create_dir_all(dir).unwrap();
        std::fs::write(&file, b"{ this is not valid json").unwrap();

        let outcome = JsonFileStore::open(&file).await;
        assert!(outcome.is_err(), "a corrupt store must surface an error");
        cleanup(&file);
    }

    #[tokio::test]
    async fn creates_missing_parent_dirs() {
        let root = temp_path().parent().unwrap().to_path_buf();
        let mut nested = root.clone();
        for part in ["a", "b", "c", "risk.json"] {
            nested.push(part);
        }

        let store = JsonFileStore::open(&nested).await.unwrap();
        store.save("k", serde_json::json!("v")).await.unwrap();
        assert!(
            nested.exists(),
            "nested dirs + file should have been created"
        );
        let _ = std::fs::remove_dir_all(&root);
    }
}