1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use xenith_core::{
9 ChainId, KeyMetadata, Result, StateKey, StateStore, StateValue, StateVersion, XenithError,
10};
11
/// Renders `b` as a lowercase hexadecimal string, two digits per input byte.
///
/// An empty slice yields an empty string.
fn bytes_to_hex(b: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(b.len() * 2);
    for &byte in b {
        // High nibble first so the text reads in the same order as the bytes.
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
17
18fn hex_to_bytes(s: &str) -> Result<Vec<u8>> {
19 if !s.len().is_multiple_of(2) {
20 return Err(XenithError::StoreError("odd-length hex string".into()));
21 }
22 (0..s.len())
23 .step_by(2)
24 .map(|i| {
25 u8::from_str_radix(&s[i..i + 2], 16)
26 .map_err(|e| XenithError::StoreError(format!("hex decode error: {e}")))
27 })
28 .collect()
29}
30
31fn hex_to_array20(s: &str) -> Result<[u8; 20]> {
32 let v = hex_to_bytes(s)?;
33 v.try_into()
34 .map_err(|_| XenithError::StoreError(format!("expected 20 bytes, got {}", s.len() / 2)))
35}
36
37fn hex_to_array32(s: &str) -> Result<[u8; 32]> {
38 let v = hex_to_bytes(s)?;
39 v.try_into()
40 .map_err(|_| XenithError::StoreError(format!("expected 32 bytes, got {}", s.len() / 2)))
41}
42
/// On-disk (JSON) representation of a single [`StateValue`].
///
/// Built by [`to_stored_entry`] and converted back by [`from_stored_entry`].
#[derive(Serialize, Deserialize, Clone)]
struct StoredEntry {
    /// The value's payload, hex-encoded with [`bytes_to_hex`].
    data: String,
    /// Copy of `StateValue::version.timestamp_ms`.
    timestamp_ms: u64,
    /// Copy of `StateValue::version.sequence`.
    sequence: u64,
    /// Copy of `StateValue::version.source_chain`; named with a `version_`
    /// prefix to keep it distinct from the value-level `source_chain` below.
    version_source_chain: u64,
    /// Copy of `StateValue::updated_at`.
    updated_at: u64,
    /// Inner integer of the value's `ChainId` (`StateValue::source_chain.0`).
    source_chain: u64,
}
54
/// On-disk (JSON) representation of a [`KeyMetadata`].
///
/// Built by [`to_stored_metadata`] and converted back by
/// [`from_stored_metadata`].
#[derive(Serialize, Deserialize, Clone, Default)]
struct StoredMetadata {
    /// Hex encoding of `KeyMetadata::address`; decoded as exactly 20 bytes.
    address: Option<String>,
    /// Hex encoding of `KeyMetadata::slot`; decoded as exactly 32 bytes.
    slot: Option<String>,
}
60
/// Root document serialized to and parsed from the store's JSON file.
///
/// Both maps are keyed by the string form of a `StateKey` (`key.as_ref()`).
#[derive(Serialize, Deserialize, Default)]
struct StoreFile {
    /// Stored values, one entry per key.
    values: HashMap<String, StoredEntry>,
    /// Per-key metadata, kept separately from the values map.
    metadata: HashMap<String, StoredMetadata>,
}
66
67fn to_stored_entry(value: &StateValue) -> StoredEntry {
70 StoredEntry {
71 data: bytes_to_hex(&value.data),
72 timestamp_ms: value.version.timestamp_ms,
73 sequence: value.version.sequence,
74 version_source_chain: value.version.source_chain,
75 updated_at: value.updated_at,
76 source_chain: value.source_chain.0,
77 }
78}
79
80fn from_stored_entry(entry: StoredEntry) -> Result<StateValue> {
81 let raw = hex_to_bytes(&entry.data)?;
82 Ok(StateValue {
83 data: Bytes::from(raw),
84 version: StateVersion {
85 timestamp_ms: entry.timestamp_ms,
86 sequence: entry.sequence,
87 source_chain: entry.version_source_chain,
88 },
89 updated_at: entry.updated_at,
90 source_chain: ChainId(entry.source_chain),
91 })
92}
93
94fn to_stored_metadata(meta: &KeyMetadata) -> StoredMetadata {
95 StoredMetadata {
96 address: meta.address.as_ref().map(|a| bytes_to_hex(a)),
97 slot: meta.slot.as_ref().map(|s| bytes_to_hex(s)),
98 }
99}
100
101fn from_stored_metadata(meta: StoredMetadata) -> Result<KeyMetadata> {
102 let address = meta.address.as_deref().map(hex_to_array20).transpose()?;
103 let slot = meta.slot.as_deref().map(hex_to_array32).transpose()?;
104 Ok(KeyMetadata { address, slot })
105}
106
/// A [`StateStore`] that keeps its contents in memory and writes them to a
/// single JSON file after every mutation.
pub struct JsonFileStore {
    /// Location of the JSON file the store is loaded from and flushed to.
    path: PathBuf,
    /// In-memory copy of the file's contents, guarded by a mutex.
    state: Arc<Mutex<StoreFile>>,
}
132
133impl JsonFileStore {
134 pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
139 let path = path.into();
140 let state = if path.exists() {
141 let content = std::fs::read_to_string(&path)
142 .map_err(|e| XenithError::StoreError(format!("read store file: {e}")))?;
143 serde_json::from_str(&content)
144 .map_err(|e| XenithError::StoreError(format!("parse store file: {e}")))?
145 } else {
146 StoreFile::default()
147 };
148 Ok(Self {
149 path,
150 state: Arc::new(Mutex::new(state)),
151 })
152 }
153
154 fn flush(&self) -> Result<()> {
159 let json = {
160 let state = self
161 .state
162 .lock()
163 .map_err(|_| XenithError::StoreError("store lock poisoned".into()))?;
164 serde_json::to_string_pretty(&*state)
165 .map_err(|e| XenithError::StoreError(format!("serialize store: {e}")))?
166 };
167 let tmp = self.path.with_extension("tmp");
169 std::fs::write(&tmp, &json)
170 .map_err(|e| XenithError::StoreError(format!("write tmp store file: {e}")))?;
171 std::fs::rename(&tmp, &self.path)
172 .map_err(|e| XenithError::StoreError(format!("rename store file: {e}")))?;
173 Ok(())
174 }
175}
176
177#[async_trait]
178impl StateStore for JsonFileStore {
179 async fn get(&self, key: &StateKey) -> Result<Option<StateValue>> {
180 let state = self
181 .state
182 .lock()
183 .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
184 state
185 .values
186 .get(key.as_ref())
187 .cloned()
188 .map(from_stored_entry)
189 .transpose()
190 }
191
192 async fn set(&self, key: &StateKey, value: StateValue) -> Result<()> {
193 {
194 let mut state = self
195 .state
196 .lock()
197 .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
198 state
199 .values
200 .insert(key.as_ref().to_owned(), to_stored_entry(&value));
201 }
202 self.flush()
203 }
204
205 async fn delete(&self, key: &StateKey) -> Result<()> {
206 {
207 let mut state = self
208 .state
209 .lock()
210 .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
211 state.values.remove(key.as_ref());
212 }
213 self.flush()
214 }
215
216 async fn list_prefix(&self, prefix: &str) -> Result<Vec<StateKey>> {
217 let state = self
218 .state
219 .lock()
220 .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
221 let mut keys: Vec<StateKey> = state
222 .values
223 .keys()
224 .filter(|k| k.starts_with(prefix))
225 .map(|k| StateKey::from_raw(k.clone()))
226 .collect();
227 keys.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
228 Ok(keys)
229 }
230
231 async fn get_metadata(&self, key: &StateKey) -> Result<Option<KeyMetadata>> {
232 let state = self
233 .state
234 .lock()
235 .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
236 state
237 .metadata
238 .get(key.as_ref())
239 .cloned()
240 .map(from_stored_metadata)
241 .transpose()
242 }
243
244 async fn set_metadata(&self, key: &StateKey, meta: KeyMetadata) -> Result<()> {
245 {
246 let mut state = self
247 .state
248 .lock()
249 .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
250 state
251 .metadata
252 .insert(key.as_ref().to_owned(), to_stored_metadata(&meta));
253 }
254 self.flush()
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use xenith_core::StateVersion;

    /// Returns a path in the system temp directory that is unique per call.
    ///
    /// The clock reading alone is not enough: tests run in parallel and the
    /// sub-second nanosecond value can repeat on coarse clocks. The process id
    /// and a per-process counter make collisions impossible within a run and
    /// across concurrently running test binaries.
    fn temp_path() -> PathBuf {
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);

        let mut p = std::env::temp_dir();
        p.push(format!(
            "xenith-test-{}-{}-{}.json",
            std::process::id(),
            id,
            nanos
        ));
        p
    }

    /// Builds a value whose payload and version are derived from `ts`.
    fn sample_value(ts: u64) -> StateValue {
        StateValue {
            data: Bytes::from(format!("data-{ts}")),
            version: StateVersion {
                timestamp_ms: ts,
                sequence: 0,
                source_chain: 1,
            },
            updated_at: ts / 1000,
            source_chain: ChainId(1),
        }
    }

    /// A value written by one store instance is visible to a fresh instance
    /// opened on the same path.
    #[tokio::test]
    async fn test_json_store_persists_across_instances() {
        let path = temp_path();
        let key = StateKey::new("proto", "pool", "0xabc");
        let value = sample_value(1_700_000_000_000);

        {
            let store = JsonFileStore::new(&path).unwrap();
            store.set(&key, value.clone()).await.unwrap();
        }

        let store2 = JsonFileStore::new(&path).unwrap();
        let loaded = store2.get(&key).await.unwrap().expect("value must persist");
        assert_eq!(loaded.data, value.data);
        assert_eq!(loaded.version.timestamp_ms, value.version.timestamp_ms);
        assert_eq!(loaded.source_chain, value.source_chain);

        let _ = std::fs::remove_file(&path);
    }

    /// After every write the file on disk is a complete, parseable JSON
    /// document.
    #[tokio::test]
    async fn test_json_store_atomic_write() {
        let path = temp_path();
        let store = JsonFileStore::new(&path).unwrap();

        for i in 0u64..5 {
            let key = StateKey::new("proto", "pool", &format!("0x{i:04x}"));
            store.set(&key, sample_value(i * 1000)).await.unwrap();

            let content = std::fs::read_to_string(&path).unwrap();
            assert!(
                serde_json::from_str::<serde_json::Value>(&content).is_ok(),
                "file must be valid JSON after write {i}"
            );
        }

        let _ = std::fs::remove_file(&path);
    }

    /// Metadata written by one store instance round-trips through the file to
    /// a fresh instance.
    #[tokio::test]
    async fn test_json_store_metadata_roundtrip() {
        let path = temp_path();
        let key = StateKey::new("proto", "pool", "0xabc");
        let meta = KeyMetadata {
            address: Some([0xABu8; 20]),
            slot: Some([0xCDu8; 32]),
        };

        {
            let store = JsonFileStore::new(&path).unwrap();
            store.set(&key, sample_value(1)).await.unwrap();
            store.set_metadata(&key, meta.clone()).await.unwrap();
        }

        let store2 = JsonFileStore::new(&path).unwrap();
        let loaded = store2
            .get_metadata(&key)
            .await
            .unwrap()
            .expect("metadata must persist");
        assert_eq!(loaded.address, Some([0xABu8; 20]));
        assert_eq!(loaded.slot, Some([0xCDu8; 32]));

        let _ = std::fs::remove_file(&path);
    }
}