rustrade_core/store.rs
1//! Persistence contract for framework state that must survive restarts.
2//!
3//! In 0.1 the risk primitives ([`SessionPnl`], [`CircuitBreaker`] in
4//! `rustrade-risk`) live entirely in memory, so a crash mid-session
5//! resets the daily drawdown cap and the loss-streak breaker. A
6//! [`StateStore`] lets the framework snapshot that state and restore it
7//! on the next boot.
8//!
9//! The trait is deliberately payload-agnostic: it stores opaque,
10//! versioned [`serde_json::Value`] blobs keyed by a string. The framework
11//! (the `rustrade` facade) owns the snapshot schema and the key layout;
12//! a backend only has to persist bytes durably. This keeps `rustrade-core`
13//! free of any real I/O — concrete durable backends (a JSON file, sqlite,
14//! Postgres, Redis, …) live in downstream crates and only need to depend
15//! on this trait.
16//!
17//! [`SessionPnl`]: https://docs.rs/rustrade-risk/latest/rustrade_risk/session_pnl/struct.SessionPnl.html
18//! [`CircuitBreaker`]: https://docs.rs/rustrade-risk/latest/rustrade_risk/circuit_breaker/struct.CircuitBreaker.html
19
20use std::collections::HashMap;
21use std::sync::{Arc, Mutex};
22
23use async_trait::async_trait;
24
25use crate::error::{Error, Result};
26
27/// A durable key/value store for framework state snapshots.
28///
29/// Implementors persist opaque [`serde_json::Value`] payloads keyed by a
30/// string. The framework calls [`save`](StateStore::save) after every
31/// realised trade and on graceful shutdown, and [`load`](StateStore::load)
32/// once per symbol at startup. Keys are stable for the lifetime of a
33/// `(bot, symbol)` pair.
34///
35/// # Object-safe + async
36///
37/// `async_trait` is used so `Arc<dyn StateStore>` works — the facade holds
38/// the store behind a trait object and never names the concrete backend.
39///
40/// # Example
41///
42/// A trivial echo store backed by a `Mutex<HashMap>` — this is exactly
43/// what [`InMemoryStore`] does. Real backends write to disk or a database.
44///
45/// ```
46/// use std::collections::HashMap;
47/// use std::sync::{Arc, Mutex};
48/// use async_trait::async_trait;
49/// use rustrade_core::{Result, StateStore};
50///
51/// #[derive(Default)]
52/// struct MapStore(Arc<Mutex<HashMap<String, serde_json::Value>>>);
53///
54/// #[async_trait]
55/// impl StateStore for MapStore {
56/// async fn load(&self, key: &str) -> Result<Option<serde_json::Value>> {
57/// Ok(self.0.lock().unwrap().get(key).cloned())
58/// }
59/// async fn save(&self, key: &str, value: serde_json::Value) -> Result<()> {
60/// self.0.lock().unwrap().insert(key.to_string(), value);
61/// Ok(())
62/// }
63/// }
64/// ```
65#[async_trait]
66pub trait StateStore: Send + Sync + 'static {
67 /// Load the snapshot stored under `key`, or `None` if there is none.
68 ///
69 /// A missing key is **not** an error — first-ever boots return `None`.
70 /// Reserve [`Error::Storage`] for genuine backend failures (I/O,
71 /// deserialization of a corrupt blob).
72 async fn load(&self, key: &str) -> Result<Option<serde_json::Value>>;
73
74 /// Persist `value` under `key`, overwriting any previous snapshot.
75 async fn save(&self, key: &str, value: serde_json::Value) -> Result<()>;
76
77 /// Flush any buffered writes to durable storage.
78 ///
79 /// The default is a no-op for stores that write through on every
80 /// [`save`](StateStore::save). Buffered backends override it; the
81 /// framework calls it once on graceful shutdown.
82 async fn flush(&self) -> Result<()> {
83 Ok(())
84 }
85}
86
87/// Non-durable [`StateStore`] backed by an in-memory map.
88///
89/// This is the default the framework uses when no store is configured: it
90/// keeps snapshots only for the lifetime of the process, so it does **not**
91/// survive a restart. It is useful as an explicit default, in tests, and
92/// as a reference implementation. For real restart durability, wire a
93/// disk- or database-backed store via `Bot::with_state_store`.
94///
95/// Cheaply cloneable — clones share the same underlying map (an `Arc`),
96/// so two handles to the same `InMemoryStore` see each other's writes.
97#[derive(Debug, Clone, Default)]
98pub struct InMemoryStore {
99 inner: Arc<Mutex<HashMap<String, serde_json::Value>>>,
100}
101
102impl InMemoryStore {
103 /// Create an empty in-memory store.
104 pub fn new() -> Self {
105 Self::default()
106 }
107
108 /// Number of keys currently held.
109 pub fn len(&self) -> usize {
110 self.inner.lock().expect("InMemoryStore poisoned").len()
111 }
112
113 /// `true` when no keys are held.
114 pub fn is_empty(&self) -> bool {
115 self.len() == 0
116 }
117}
118
119#[async_trait]
120impl StateStore for InMemoryStore {
121 async fn load(&self, key: &str) -> Result<Option<serde_json::Value>> {
122 Ok(self
123 .inner
124 .lock()
125 .map_err(|_| Error::Storage("InMemoryStore mutex poisoned".into()))?
126 .get(key)
127 .cloned())
128 }
129
130 async fn save(&self, key: &str, value: serde_json::Value) -> Result<()> {
131 self.inner
132 .lock()
133 .map_err(|_| Error::Storage("InMemoryStore mutex poisoned".into()))?
134 .insert(key.to_string(), value);
135 Ok(())
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// An unknown key loads as `None` and leaves the store empty.
    #[tokio::test]
    async fn load_missing_key_returns_none() {
        let empty = InMemoryStore::new();
        let found = empty.load("nope").await.unwrap();
        assert!(found.is_none());
        assert!(empty.is_empty());
    }

    /// A saved snapshot comes back unchanged and counts as one key.
    #[tokio::test]
    async fn save_then_load_roundtrips() {
        let store = InMemoryStore::new();
        let snapshot = serde_json::json!({"realised": 12.5, "halted": true});
        store.save("bot/BTCUSDT", snapshot.clone()).await.unwrap();
        let restored = store.load("bot/BTCUSDT").await.unwrap();
        assert_eq!(restored, Some(snapshot));
        assert_eq!(store.len(), 1);
    }

    /// A second save under the same key replaces the first.
    #[tokio::test]
    async fn save_overwrites_previous() {
        let store = InMemoryStore::new();
        store.save("k", serde_json::json!(1)).await.unwrap();
        store.save("k", serde_json::json!(2)).await.unwrap();
        let latest = store.load("k").await.unwrap();
        assert_eq!(latest, Some(serde_json::json!(2)));
        assert_eq!(store.len(), 1);
    }

    /// Clones share one backing map, so a write through one handle is
    /// visible through the other.
    #[tokio::test]
    async fn clones_share_state() {
        let writer = InMemoryStore::new();
        let reader = writer.clone();
        writer.save("k", serde_json::json!("v")).await.unwrap();
        let seen = reader.load("k").await.unwrap();
        assert_eq!(seen, Some(serde_json::json!("v")));
    }

    /// The trait's default `flush` succeeds.
    #[tokio::test]
    async fn flush_default_is_ok() {
        let store = InMemoryStore::new();
        let outcome = store.flush().await;
        assert!(outcome.is_ok());
    }
}