Skip to main content

xenith_sync/
json_store.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use xenith_core::{
9    ChainId, KeyMetadata, Result, StateKey, StateStore, StateValue, StateVersion, XenithError,
10};
11
// ── Serialization helpers ─────────────────────────────────────────────────────
13
/// Encodes `b` as a lowercase hexadecimal string, two digits per byte.
///
/// The output buffer is sized up front and filled digit by digit, so no
/// temporary `String` is allocated per input byte.
fn bytes_to_hex(b: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(b.len() * 2);
    for &byte in b {
        // High nibble first so each byte reads left to right (0xab -> "ab").
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
17
18fn hex_to_bytes(s: &str) -> Result<Vec<u8>> {
19    if !s.len().is_multiple_of(2) {
20        return Err(XenithError::StoreError("odd-length hex string".into()));
21    }
22    (0..s.len())
23        .step_by(2)
24        .map(|i| {
25            u8::from_str_radix(&s[i..i + 2], 16)
26                .map_err(|e| XenithError::StoreError(format!("hex decode error: {e}")))
27        })
28        .collect()
29}
30
31fn hex_to_array20(s: &str) -> Result<[u8; 20]> {
32    let v = hex_to_bytes(s)?;
33    v.try_into()
34        .map_err(|_| XenithError::StoreError(format!("expected 20 bytes, got {}", s.len() / 2)))
35}
36
37fn hex_to_array32(s: &str) -> Result<[u8; 32]> {
38    let v = hex_to_bytes(s)?;
39    v.try_into()
40        .map_err(|_| XenithError::StoreError(format!("expected 32 bytes, got {}", s.len() / 2)))
41}
42
// ── On-disk types ─────────────────────────────────────────────────────────────
44
45#[derive(Serialize, Deserialize, Clone)]
46struct StoredEntry {
47    data: String,
48    timestamp_ms: u64,
49    sequence: u64,
50    version_source_chain: u64,
51    updated_at: u64,
52    source_chain: u64,
53}
54
55#[derive(Serialize, Deserialize, Clone, Default)]
56struct StoredMetadata {
57    address: Option<String>,
58    slot: Option<String>,
59}
60
61#[derive(Serialize, Deserialize, Default)]
62struct StoreFile {
63    values: HashMap<String, StoredEntry>,
64    metadata: HashMap<String, StoredMetadata>,
65}
66
// ── Conversion helpers ────────────────────────────────────────────────────────
68
69fn to_stored_entry(value: &StateValue) -> StoredEntry {
70    StoredEntry {
71        data: bytes_to_hex(&value.data),
72        timestamp_ms: value.version.timestamp_ms,
73        sequence: value.version.sequence,
74        version_source_chain: value.version.source_chain,
75        updated_at: value.updated_at,
76        source_chain: value.source_chain.0,
77    }
78}
79
80fn from_stored_entry(entry: StoredEntry) -> Result<StateValue> {
81    let raw = hex_to_bytes(&entry.data)?;
82    Ok(StateValue {
83        data: Bytes::from(raw),
84        version: StateVersion {
85            timestamp_ms: entry.timestamp_ms,
86            sequence: entry.sequence,
87            source_chain: entry.version_source_chain,
88        },
89        updated_at: entry.updated_at,
90        source_chain: ChainId(entry.source_chain),
91    })
92}
93
94fn to_stored_metadata(meta: &KeyMetadata) -> StoredMetadata {
95    StoredMetadata {
96        address: meta.address.as_ref().map(|a| bytes_to_hex(a)),
97        slot: meta.slot.as_ref().map(|s| bytes_to_hex(s)),
98    }
99}
100
101fn from_stored_metadata(meta: StoredMetadata) -> Result<KeyMetadata> {
102    let address = meta.address.as_deref().map(hex_to_array20).transpose()?;
103    let slot = meta.slot.as_deref().map(hex_to_array32).transpose()?;
104    Ok(KeyMetadata { address, slot })
105}
106
// ── JsonFileStore ─────────────────────────────────────────────────────────────
108
109/// A file-backed store that persists state across restarts.
110///
111/// Uses atomic file rename to prevent corruption on crash: every write serialises
112/// to a `.tmp` file first, then renames it over the target path.
113///
114/// Suitable for single-process bot operators. For multi-process or high-throughput
115/// use cases, see `RocksDbStore` (v0.2).
116///
117/// # Example
118///
119/// ```rust,no_run
120/// use xenith_sync::JsonFileStore;
121/// use xenith_core::StateStore;
122///
123/// # async fn example() -> xenith_core::Result<()> {
124/// let store = JsonFileStore::new("/tmp/xenith-state.json")?;
125/// # Ok(())
126/// # }
127/// ```
128pub struct JsonFileStore {
129    path: PathBuf,
130    state: Arc<Mutex<StoreFile>>,
131}
132
133impl JsonFileStore {
134    /// Open or create a JSON file store at `path`.
135    ///
136    /// If the file exists it is parsed on construction. If it does not exist an
137    /// empty store is initialised (the file is created on the first write).
138    pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
139        let path = path.into();
140        let state = if path.exists() {
141            let content = std::fs::read_to_string(&path)
142                .map_err(|e| XenithError::StoreError(format!("read store file: {e}")))?;
143            serde_json::from_str(&content)
144                .map_err(|e| XenithError::StoreError(format!("parse store file: {e}")))?
145        } else {
146            StoreFile::default()
147        };
148        Ok(Self {
149            path,
150            state: Arc::new(Mutex::new(state)),
151        })
152    }
153
154    /// Atomically flush the in-memory state to disk.
155    ///
156    /// Serialises to a `.tmp` file then renames it over the target path so a
157    /// crash mid-write cannot corrupt the store.
158    fn flush(&self) -> Result<()> {
159        let json = {
160            let state = self
161                .state
162                .lock()
163                .map_err(|_| XenithError::StoreError("store lock poisoned".into()))?;
164            serde_json::to_string_pretty(&*state)
165                .map_err(|e| XenithError::StoreError(format!("serialize store: {e}")))?
166        };
167        // I/O happens outside the lock.
168        let tmp = self.path.with_extension("tmp");
169        std::fs::write(&tmp, &json)
170            .map_err(|e| XenithError::StoreError(format!("write tmp store file: {e}")))?;
171        std::fs::rename(&tmp, &self.path)
172            .map_err(|e| XenithError::StoreError(format!("rename store file: {e}")))?;
173        Ok(())
174    }
175}
176
177#[async_trait]
178impl StateStore for JsonFileStore {
179    async fn get(&self, key: &StateKey) -> Result<Option<StateValue>> {
180        let state = self
181            .state
182            .lock()
183            .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
184        state
185            .values
186            .get(key.as_ref())
187            .cloned()
188            .map(from_stored_entry)
189            .transpose()
190    }
191
192    async fn set(&self, key: &StateKey, value: StateValue) -> Result<()> {
193        {
194            let mut state = self
195                .state
196                .lock()
197                .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
198            state
199                .values
200                .insert(key.as_ref().to_owned(), to_stored_entry(&value));
201        }
202        self.flush()
203    }
204
205    async fn delete(&self, key: &StateKey) -> Result<()> {
206        {
207            let mut state = self
208                .state
209                .lock()
210                .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
211            state.values.remove(key.as_ref());
212        }
213        self.flush()
214    }
215
216    async fn list_prefix(&self, prefix: &str) -> Result<Vec<StateKey>> {
217        let state = self
218            .state
219            .lock()
220            .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
221        let mut keys: Vec<StateKey> = state
222            .values
223            .keys()
224            .filter(|k| k.starts_with(prefix))
225            .map(|k| StateKey::from_raw(k.clone()))
226            .collect();
227        keys.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
228        Ok(keys)
229    }
230
231    async fn get_metadata(&self, key: &StateKey) -> Result<Option<KeyMetadata>> {
232        let state = self
233            .state
234            .lock()
235            .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
236        state
237            .metadata
238            .get(key.as_ref())
239            .cloned()
240            .map(from_stored_metadata)
241            .transpose()
242    }
243
244    async fn set_metadata(&self, key: &StateKey, meta: KeyMetadata) -> Result<()> {
245        {
246            let mut state = self
247                .state
248                .lock()
249                .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
250            state
251                .metadata
252                .insert(key.as_ref().to_owned(), to_stored_metadata(&meta));
253        }
254        self.flush()
255    }
256}
257
// ── Tests ─────────────────────────────────────────────────────────────────────
259
#[cfg(test)]
mod tests {
    use super::*;
    use xenith_core::StateVersion;

    /// Returns a fresh path in the system temp directory for one test.
    ///
    /// The name combines the process id, the current time in nanoseconds and a
    /// per-process counter. Tests run in parallel, so a timestamp alone is not
    /// enough: on a coarse clock two tests could get the same file.
    fn temp_path() -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};

        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!(
            "xenith-test-{}-{nanos}-{id}.json",
            std::process::id()
        ))
    }

    /// Builds a deterministic value whose payload and version derive from `ts`.
    fn sample_value(ts: u64) -> StateValue {
        StateValue {
            data: Bytes::from(format!("data-{ts}")),
            version: StateVersion {
                timestamp_ms: ts,
                sequence: 0,
                source_chain: 1,
            },
            updated_at: ts / 1000,
            source_chain: ChainId(1),
        }
    }

    #[tokio::test]
    async fn test_json_store_persists_across_instances() {
        let path = temp_path();
        let key = StateKey::new("proto", "pool", "0xabc");
        let value = sample_value(1_700_000_000_000);

        {
            let store = JsonFileStore::new(&path).unwrap();
            store.set(&key, value.clone()).await.unwrap();
        }

        // Reload from disk.
        let store2 = JsonFileStore::new(&path).unwrap();
        let loaded = store2.get(&key).await.unwrap().expect("value must persist");
        assert_eq!(loaded.data, value.data);
        assert_eq!(loaded.version.timestamp_ms, value.version.timestamp_ms);
        assert_eq!(loaded.source_chain, value.source_chain);

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_json_store_atomic_write() {
        let path = temp_path();
        let store = JsonFileStore::new(&path).unwrap();

        for i in 0u64..5 {
            let key = StateKey::new("proto", "pool", &format!("0x{i:04x}"));
            store.set(&key, sample_value(i * 1000)).await.unwrap();

            // After every write the file must be valid JSON.
            let content = std::fs::read_to_string(&path).unwrap();
            assert!(
                serde_json::from_str::<serde_json::Value>(&content).is_ok(),
                "file must be valid JSON after write {i}"
            );
        }

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_json_store_metadata_roundtrip() {
        let path = temp_path();
        let key = StateKey::new("proto", "pool", "0xabc");
        let meta = KeyMetadata {
            address: Some([0xABu8; 20]),
            slot: Some([0xCDu8; 32]),
        };

        {
            let store = JsonFileStore::new(&path).unwrap();
            store.set(&key, sample_value(1)).await.unwrap();
            store.set_metadata(&key, meta.clone()).await.unwrap();
        }

        let store2 = JsonFileStore::new(&path).unwrap();
        let loaded = store2
            .get_metadata(&key)
            .await
            .unwrap()
            .expect("metadata must persist");
        assert_eq!(loaded.address, Some([0xABu8; 20]));
        assert_eq!(loaded.slot, Some([0xCDu8; 32]));

        let _ = std::fs::remove_file(&path);
    }
}
357}