Skip to main content

rustrade_core/
store.rs

//! Persistence contract for framework state that must survive restarts.
//!
//! In 0.1 the risk primitives ([`SessionPnl`], [`CircuitBreaker`] in
//! `rustrade-risk`) live entirely in memory, so a crash mid-session
//! resets the daily drawdown cap and the loss-streak breaker. A
//! [`StateStore`] lets the framework snapshot that state and restore it
//! on the next boot.
//!
//! The trait is deliberately payload-agnostic: it stores opaque,
//! versioned [`serde_json::Value`] blobs keyed by a string. The framework
//! (the `rustrade` facade) owns the snapshot schema and the key layout;
//! a backend only has to persist bytes durably. This keeps `rustrade-core`
//! free of any real I/O — concrete durable backends (a JSON file, sqlite,
//! Postgres, Redis, …) live in downstream crates and only need to depend
//! on this trait.
//!
//! [`SessionPnl`]: https://docs.rs/rustrade-risk/latest/rustrade_risk/session_pnl/struct.SessionPnl.html
//! [`CircuitBreaker`]: https://docs.rs/rustrade-risk/latest/rustrade_risk/circuit_breaker/struct.CircuitBreaker.html
19
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22
23use async_trait::async_trait;
24
25use crate::error::{Error, Result};
26
27/// A durable key/value store for framework state snapshots.
28///
29/// Implementors persist opaque [`serde_json::Value`] payloads keyed by a
30/// string. The framework calls [`save`](StateStore::save) after every
31/// realised trade and on graceful shutdown, and [`load`](StateStore::load)
32/// once per symbol at startup. Keys are stable for the lifetime of a
33/// `(bot, symbol)` pair.
34///
35/// # Object-safe + async
36///
37/// `async_trait` is used so `Arc<dyn StateStore>` works — the facade holds
38/// the store behind a trait object and never names the concrete backend.
39///
40/// # Example
41///
42/// A trivial echo store backed by a `Mutex<HashMap>` — this is exactly
43/// what [`InMemoryStore`] does. Real backends write to disk or a database.
44///
45/// ```
46/// use std::collections::HashMap;
47/// use std::sync::{Arc, Mutex};
48/// use async_trait::async_trait;
49/// use rustrade_core::{Result, StateStore};
50///
51/// #[derive(Default)]
52/// struct MapStore(Arc<Mutex<HashMap<String, serde_json::Value>>>);
53///
54/// #[async_trait]
55/// impl StateStore for MapStore {
56///     async fn load(&self, key: &str) -> Result<Option<serde_json::Value>> {
57///         Ok(self.0.lock().unwrap().get(key).cloned())
58///     }
59///     async fn save(&self, key: &str, value: serde_json::Value) -> Result<()> {
60///         self.0.lock().unwrap().insert(key.to_string(), value);
61///         Ok(())
62///     }
63/// }
64/// ```
65#[async_trait]
66pub trait StateStore: Send + Sync + 'static {
67    /// Load the snapshot stored under `key`, or `None` if there is none.
68    ///
69    /// A missing key is **not** an error — first-ever boots return `None`.
70    /// Reserve [`Error::Storage`] for genuine backend failures (I/O,
71    /// deserialization of a corrupt blob).
72    async fn load(&self, key: &str) -> Result<Option<serde_json::Value>>;
73
74    /// Persist `value` under `key`, overwriting any previous snapshot.
75    async fn save(&self, key: &str, value: serde_json::Value) -> Result<()>;
76
77    /// Flush any buffered writes to durable storage.
78    ///
79    /// The default is a no-op for stores that write through on every
80    /// [`save`](StateStore::save). Buffered backends override it; the
81    /// framework calls it once on graceful shutdown.
82    async fn flush(&self) -> Result<()> {
83        Ok(())
84    }
85}
86
87/// Non-durable [`StateStore`] backed by an in-memory map.
88///
89/// This is the default the framework uses when no store is configured: it
90/// keeps snapshots only for the lifetime of the process, so it does **not**
91/// survive a restart. It is useful as an explicit default, in tests, and
92/// as a reference implementation. For real restart durability, wire a
93/// disk- or database-backed store via `Bot::with_state_store`.
94///
95/// Cheaply cloneable — clones share the same underlying map (an `Arc`),
96/// so two handles to the same `InMemoryStore` see each other's writes.
97#[derive(Debug, Clone, Default)]
98pub struct InMemoryStore {
99    inner: Arc<Mutex<HashMap<String, serde_json::Value>>>,
100}
101
102impl InMemoryStore {
103    /// Create an empty in-memory store.
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    /// Number of keys currently held.
109    pub fn len(&self) -> usize {
110        self.inner.lock().expect("InMemoryStore poisoned").len()
111    }
112
113    /// `true` when no keys are held.
114    pub fn is_empty(&self) -> bool {
115        self.len() == 0
116    }
117}
118
119#[async_trait]
120impl StateStore for InMemoryStore {
121    async fn load(&self, key: &str) -> Result<Option<serde_json::Value>> {
122        Ok(self
123            .inner
124            .lock()
125            .map_err(|_| Error::Storage("InMemoryStore mutex poisoned".into()))?
126            .get(key)
127            .cloned())
128    }
129
130    async fn save(&self, key: &str, value: serde_json::Value) -> Result<()> {
131        self.inner
132            .lock()
133            .map_err(|_| Error::Storage("InMemoryStore mutex poisoned".into()))?
134            .insert(key.to_string(), value);
135        Ok(())
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// Loading a key that was never saved yields `None` and leaves the
    /// store empty.
    #[tokio::test]
    async fn load_missing_key_returns_none() {
        let store = InMemoryStore::new();
        let loaded = store.load("nope").await.unwrap();
        assert!(loaded.is_none());
        assert!(store.is_empty());
    }

    /// A saved value comes back unchanged and counts as one key.
    #[tokio::test]
    async fn save_then_load_roundtrips() {
        let store = InMemoryStore::new();
        let snapshot = serde_json::json!({"realised": 12.5, "halted": true});
        store.save("bot/BTCUSDT", snapshot.clone()).await.unwrap();

        let loaded = store.load("bot/BTCUSDT").await.unwrap();
        assert_eq!(loaded, Some(snapshot));
        assert_eq!(store.len(), 1);
    }

    /// Saving twice under one key keeps only the latest value.
    #[tokio::test]
    async fn save_overwrites_previous() {
        let store = InMemoryStore::new();
        for version in [serde_json::json!(1), serde_json::json!(2)] {
            store.save("k", version).await.unwrap();
        }

        let loaded = store.load("k").await.unwrap();
        assert_eq!(loaded, Some(serde_json::json!(2)));
        assert_eq!(store.len(), 1);
    }

    /// A clone shares the same backing map as the original handle.
    #[tokio::test]
    async fn clones_share_state() {
        let writer = InMemoryStore::new();
        let reader = writer.clone();
        writer.save("k", serde_json::json!("v")).await.unwrap();

        // The reader sees the writer's entry — same backing Arc.
        let loaded = reader.load("k").await.unwrap();
        assert_eq!(loaded, Some(serde_json::json!("v")));
    }

    /// The trait's default `flush` succeeds for the in-memory store.
    #[tokio::test]
    async fn flush_default_is_ok() {
        let store = InMemoryStore::new();
        let flushed = store.flush().await;
        assert!(flushed.is_ok());
    }
}