use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use xenith_core::{
ChainId, KeyMetadata, Result, StateKey, StateStore, StateValue, StateVersion, XenithError,
};
fn bytes_to_hex(b: &[u8]) -> String {
b.iter().map(|x| format!("{x:02x}")).collect()
}
fn hex_to_bytes(s: &str) -> Result<Vec<u8>> {
if !s.len().is_multiple_of(2) {
return Err(XenithError::StoreError("odd-length hex string".into()));
}
(0..s.len())
.step_by(2)
.map(|i| {
u8::from_str_radix(&s[i..i + 2], 16)
.map_err(|e| XenithError::StoreError(format!("hex decode error: {e}")))
})
.collect()
}
fn hex_to_array20(s: &str) -> Result<[u8; 20]> {
let v = hex_to_bytes(s)?;
v.try_into()
.map_err(|_| XenithError::StoreError(format!("expected 20 bytes, got {}", s.len() / 2)))
}
fn hex_to_array32(s: &str) -> Result<[u8; 32]> {
let v = hex_to_bytes(s)?;
v.try_into()
.map_err(|_| XenithError::StoreError(format!("expected 32 bytes, got {}", s.len() / 2)))
}
#[derive(Serialize, Deserialize, Clone)]
struct StoredEntry {
data: String,
timestamp_ms: u64,
sequence: u64,
version_source_chain: u64,
updated_at: u64,
source_chain: u64,
}
#[derive(Serialize, Deserialize, Clone, Default)]
struct StoredMetadata {
address: Option<String>,
slot: Option<String>,
}
#[derive(Serialize, Deserialize, Default)]
struct StoreFile {
values: HashMap<String, StoredEntry>,
metadata: HashMap<String, StoredMetadata>,
}
fn to_stored_entry(value: &StateValue) -> StoredEntry {
StoredEntry {
data: bytes_to_hex(&value.data),
timestamp_ms: value.version.timestamp_ms,
sequence: value.version.sequence,
version_source_chain: value.version.source_chain,
updated_at: value.updated_at,
source_chain: value.source_chain.0,
}
}
fn from_stored_entry(entry: StoredEntry) -> Result<StateValue> {
let raw = hex_to_bytes(&entry.data)?;
Ok(StateValue {
data: Bytes::from(raw),
version: StateVersion {
timestamp_ms: entry.timestamp_ms,
sequence: entry.sequence,
source_chain: entry.version_source_chain,
},
updated_at: entry.updated_at,
source_chain: ChainId(entry.source_chain),
})
}
fn to_stored_metadata(meta: &KeyMetadata) -> StoredMetadata {
StoredMetadata {
address: meta.address.as_ref().map(|a| bytes_to_hex(a)),
slot: meta.slot.as_ref().map(|s| bytes_to_hex(s)),
}
}
fn from_stored_metadata(meta: StoredMetadata) -> Result<KeyMetadata> {
let address = meta.address.as_deref().map(hex_to_array20).transpose()?;
let slot = meta.slot.as_deref().map(hex_to_array32).transpose()?;
Ok(KeyMetadata { address, slot })
}
pub struct JsonFileStore {
path: PathBuf,
state: Arc<Mutex<StoreFile>>,
}
impl JsonFileStore {
pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let state = if path.exists() {
let content = std::fs::read_to_string(&path)
.map_err(|e| XenithError::StoreError(format!("read store file: {e}")))?;
serde_json::from_str(&content)
.map_err(|e| XenithError::StoreError(format!("parse store file: {e}")))?
} else {
StoreFile::default()
};
Ok(Self {
path,
state: Arc::new(Mutex::new(state)),
})
}
fn flush(&self) -> Result<()> {
let json = {
let state = self
.state
.lock()
.map_err(|_| XenithError::StoreError("store lock poisoned".into()))?;
serde_json::to_string_pretty(&*state)
.map_err(|e| XenithError::StoreError(format!("serialize store: {e}")))?
};
let tmp = self.path.with_extension("tmp");
std::fs::write(&tmp, &json)
.map_err(|e| XenithError::StoreError(format!("write tmp store file: {e}")))?;
std::fs::rename(&tmp, &self.path)
.map_err(|e| XenithError::StoreError(format!("rename store file: {e}")))?;
Ok(())
}
}
#[async_trait]
impl StateStore for JsonFileStore {
async fn get(&self, key: &StateKey) -> Result<Option<StateValue>> {
let state = self
.state
.lock()
.map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
state
.values
.get(key.as_ref())
.cloned()
.map(from_stored_entry)
.transpose()
}
async fn set(&self, key: &StateKey, value: StateValue) -> Result<()> {
{
let mut state = self
.state
.lock()
.map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
state
.values
.insert(key.as_ref().to_owned(), to_stored_entry(&value));
}
self.flush()
}
async fn delete(&self, key: &StateKey) -> Result<()> {
{
let mut state = self
.state
.lock()
.map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
state.values.remove(key.as_ref());
}
self.flush()
}
async fn list_prefix(&self, prefix: &str) -> Result<Vec<StateKey>> {
let state = self
.state
.lock()
.map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
let mut keys: Vec<StateKey> = state
.values
.keys()
.filter(|k| k.starts_with(prefix))
.map(|k| StateKey::from_raw(k.clone()))
.collect();
keys.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
Ok(keys)
}
async fn get_metadata(&self, key: &StateKey) -> Result<Option<KeyMetadata>> {
let state = self
.state
.lock()
.map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
state
.metadata
.get(key.as_ref())
.cloned()
.map(from_stored_metadata)
.transpose()
}
async fn set_metadata(&self, key: &StateKey, meta: KeyMetadata) -> Result<()> {
{
let mut state = self
.state
.lock()
.map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
state
.metadata
.insert(key.as_ref().to_owned(), to_stored_metadata(&meta));
}
self.flush()
}
}
#[cfg(test)]
mod tests {
use super::*;
use xenith_core::StateVersion;
fn temp_path() -> PathBuf {
let mut p = std::env::temp_dir();
p.push(format!(
"xenith-test-{}.json",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.subsec_nanos()
));
p
}
fn sample_value(ts: u64) -> StateValue {
StateValue {
data: Bytes::from(format!("data-{ts}")),
version: StateVersion {
timestamp_ms: ts,
sequence: 0,
source_chain: 1,
},
updated_at: ts / 1000,
source_chain: ChainId(1),
}
}
#[tokio::test]
async fn test_json_store_persists_across_instances() {
let path = temp_path();
let key = StateKey::new("proto", "pool", "0xabc");
let value = sample_value(1_700_000_000_000);
{
let store = JsonFileStore::new(&path).unwrap();
store.set(&key, value.clone()).await.unwrap();
}
let store2 = JsonFileStore::new(&path).unwrap();
let loaded = store2.get(&key).await.unwrap().expect("value must persist");
assert_eq!(loaded.data, value.data);
assert_eq!(loaded.version.timestamp_ms, value.version.timestamp_ms);
assert_eq!(loaded.source_chain, value.source_chain);
let _ = std::fs::remove_file(&path);
}
#[tokio::test]
async fn test_json_store_atomic_write() {
let path = temp_path();
let store = JsonFileStore::new(&path).unwrap();
for i in 0u64..5 {
let key = StateKey::new("proto", "pool", &format!("0x{i:04x}"));
store.set(&key, sample_value(i * 1000)).await.unwrap();
let content = std::fs::read_to_string(&path).unwrap();
assert!(
serde_json::from_str::<serde_json::Value>(&content).is_ok(),
"file must be valid JSON after write {i}"
);
}
let _ = std::fs::remove_file(&path);
}
#[tokio::test]
async fn test_json_store_metadata_roundtrip() {
let path = temp_path();
let key = StateKey::new("proto", "pool", "0xabc");
let meta = KeyMetadata {
address: Some([0xABu8; 20]),
slot: Some([0xCDu8; 32]),
};
{
let store = JsonFileStore::new(&path).unwrap();
store.set(&key, sample_value(1)).await.unwrap();
store.set_metadata(&key, meta.clone()).await.unwrap();
}
let store2 = JsonFileStore::new(&path).unwrap();
let loaded = store2
.get_metadata(&key)
.await
.unwrap()
.expect("metadata must persist");
assert_eq!(loaded.address, Some([0xABu8; 20]));
assert_eq!(loaded.slot, Some([0xCDu8; 32]));
let _ = std::fs::remove_file(&path);
}
}