// xenith-sync 0.1.0
//
// State sync engine for xenith — push, read, and resolve across chains.
// Documentation
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use xenith_core::{
    ChainId, KeyMetadata, Result, StateKey, StateStore, StateValue, StateVersion, XenithError,
};

// ── Serialization helpers ─────────────────────────────────────────────────────

/// Encode `b` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice produces an empty string.
fn bytes_to_hex(b: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(b.len() * 2);
    for &byte in b {
        // High nibble first, so 0xab becomes "ab".
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}

/// Decode a hexadecimal string into bytes.
///
/// Accepts upper- and lowercase digits, two per byte, with no prefix, sign or
/// separators. Decoding works on the raw bytes of `s`, so arbitrary (including
/// non-ASCII) input is rejected with an error instead of panicking on a
/// char-boundary slice.
///
/// # Errors
///
/// Returns `XenithError::StoreError` when `s` has an odd length or contains a
/// byte that is not a hexadecimal digit.
fn hex_to_bytes(s: &str) -> Result<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    // A leftover byte means the input did not consist of whole digit pairs.
    if !pairs.remainder().is_empty() {
        return Err(XenithError::StoreError("odd-length hex string".into()));
    }
    pairs
        // `chunks_exact(2)` guarantees both indices exist.
        .map(|pair| match (nibble(pair[0]), nibble(pair[1])) {
            (Some(high), Some(low)) => Ok((high << 4) | low),
            _ => Err(XenithError::StoreError(
                "hex decode error: invalid digit found in string".into(),
            )),
        })
        .collect()
}

/// Decode a hex string that must represent exactly 20 bytes.
///
/// # Errors
///
/// Propagates any `hex_to_bytes` failure, and returns
/// `XenithError::StoreError` when the decoded length is not 20.
fn hex_to_array20(s: &str) -> Result<[u8; 20]> {
    let decoded = hex_to_bytes(s)?;
    // The conversion only succeeds when the vector holds exactly 20 bytes.
    let array: Option<[u8; 20]> = decoded.try_into().ok();
    array.ok_or_else(|| {
        XenithError::StoreError(format!("expected 20 bytes, got {}", s.len() / 2))
    })
}

/// Decode a hex string that must represent exactly 32 bytes.
///
/// # Errors
///
/// Propagates any `hex_to_bytes` failure, and returns
/// `XenithError::StoreError` when the decoded length is not 32.
fn hex_to_array32(s: &str) -> Result<[u8; 32]> {
    let decoded = hex_to_bytes(s)?;
    // The conversion only succeeds when the vector holds exactly 32 bytes.
    let array: Option<[u8; 32]> = decoded.try_into().ok();
    array.ok_or_else(|| {
        XenithError::StoreError(format!("expected 32 bytes, got {}", s.len() / 2))
    })
}

// ── On-disk types ─────────────────────────────────────────────────────────────

/// Serialised form of one `StateValue` as it appears in the store file.
///
/// Built by `to_stored_entry` and converted back by `from_stored_entry`.
/// Field names are part of the on-disk format.
#[derive(Serialize, Deserialize, Clone)]
struct StoredEntry {
    /// Value payload, hex-encoded with `bytes_to_hex`.
    data: String,
    /// Copy of the value's `version.timestamp_ms`.
    timestamp_ms: u64,
    /// Copy of the value's `version.sequence`.
    sequence: u64,
    /// Copy of the value's `version.source_chain` (distinct from `source_chain` below).
    version_source_chain: u64,
    /// Copy of the value's `updated_at`.
    updated_at: u64,
    /// Inner number of the value's `source_chain` (`ChainId`).
    source_chain: u64,
}

/// Serialised form of a `KeyMetadata` record as it appears in the store file.
///
/// Built by `to_stored_metadata` and converted back by `from_stored_metadata`.
#[derive(Serialize, Deserialize, Clone, Default)]
struct StoredMetadata {
    /// Hex encoding of the metadata's `address`; must decode to exactly 20 bytes on load.
    address: Option<String>,
    /// Hex encoding of the metadata's `slot`; must decode to exactly 32 bytes on load.
    slot: Option<String>,
}

/// Root document of the JSON store file; also the in-memory state held by `JsonFileStore`.
///
/// Both maps are keyed by the string form of a `StateKey` (`key.as_ref()`).
#[derive(Serialize, Deserialize, Default)]
struct StoreFile {
    /// Stored values, one entry per key.
    values: HashMap<String, StoredEntry>,
    /// Per-key metadata, kept independently of `values`.
    metadata: HashMap<String, StoredMetadata>,
}

// ── Conversion helpers ────────────────────────────────────────────────────────

/// Flatten a `StateValue` into its on-disk representation.
///
/// The payload is hex-encoded; the version fields are copied out of the nested
/// `StateVersion` into top-level fields.
fn to_stored_entry(value: &StateValue) -> StoredEntry {
    let version = &value.version;
    let data = bytes_to_hex(&value.data);
    StoredEntry {
        data,
        timestamp_ms: version.timestamp_ms,
        sequence: version.sequence,
        version_source_chain: version.source_chain,
        updated_at: value.updated_at,
        source_chain: value.source_chain.0,
    }
}

fn from_stored_entry(entry: StoredEntry) -> Result<StateValue> {
    let raw = hex_to_bytes(&entry.data)?;
    Ok(StateValue {
        data: Bytes::from(raw),
        version: StateVersion {
            timestamp_ms: entry.timestamp_ms,
            sequence: entry.sequence,
            source_chain: entry.version_source_chain,
        },
        updated_at: entry.updated_at,
        source_chain: ChainId(entry.source_chain),
    })
}

/// Convert `KeyMetadata` into its on-disk representation, hex-encoding each
/// field that is present.
fn to_stored_metadata(meta: &KeyMetadata) -> StoredMetadata {
    let address = meta.address.as_ref().map(|raw| bytes_to_hex(&raw[..]));
    let slot = meta.slot.as_ref().map(|raw| bytes_to_hex(&raw[..]));
    StoredMetadata { address, slot }
}

fn from_stored_metadata(meta: StoredMetadata) -> Result<KeyMetadata> {
    let address = meta.address.as_deref().map(hex_to_array20).transpose()?;
    let slot = meta.slot.as_deref().map(hex_to_array32).transpose()?;
    Ok(KeyMetadata { address, slot })
}

// ── JsonFileStore ─────────────────────────────────────────────────────────────

/// A file-backed store that persists state across restarts.
///
/// Uses atomic file rename to prevent corruption on crash: every write serialises
/// to a `.tmp` file first, then renames it over the target path.
///
/// Suitable for single-process bot operators. For multi-process or high-throughput
/// use cases, see `RocksDbStore` (v0.2).
///
/// # Example
///
/// ```rust,no_run
/// use xenith_sync::JsonFileStore;
/// use xenith_core::StateStore;
///
/// # async fn example() -> xenith_core::Result<()> {
/// let store = JsonFileStore::new("/tmp/xenith-state.json")?;
/// # Ok(())
/// # }
/// ```
pub struct JsonFileStore {
    /// Location of the JSON file this store loads from and flushes to.
    path: PathBuf,
    /// In-memory copy of the whole store; locked for every read and mutation,
    /// and serialised in full on each flush.
    state: Arc<Mutex<StoreFile>>,
}

impl JsonFileStore {
    /// Open or create a JSON file store at `path`.
    ///
    /// If the file exists it is parsed on construction. If it does not exist an
    /// empty store is initialised (the file is created on the first write).
    pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
        let path = path.into();
        let state = if path.exists() {
            let content = std::fs::read_to_string(&path)
                .map_err(|e| XenithError::StoreError(format!("read store file: {e}")))?;
            serde_json::from_str(&content)
                .map_err(|e| XenithError::StoreError(format!("parse store file: {e}")))?
        } else {
            StoreFile::default()
        };
        Ok(Self {
            path,
            state: Arc::new(Mutex::new(state)),
        })
    }

    /// Atomically flush the in-memory state to disk.
    ///
    /// Serialises to a `.tmp` file then renames it over the target path so a
    /// crash mid-write cannot corrupt the store.
    fn flush(&self) -> Result<()> {
        let json = {
            let state = self
                .state
                .lock()
                .map_err(|_| XenithError::StoreError("store lock poisoned".into()))?;
            serde_json::to_string_pretty(&*state)
                .map_err(|e| XenithError::StoreError(format!("serialize store: {e}")))?
        };
        // I/O happens outside the lock.
        let tmp = self.path.with_extension("tmp");
        std::fs::write(&tmp, &json)
            .map_err(|e| XenithError::StoreError(format!("write tmp store file: {e}")))?;
        std::fs::rename(&tmp, &self.path)
            .map_err(|e| XenithError::StoreError(format!("rename store file: {e}")))?;
        Ok(())
    }
}

#[async_trait]
impl StateStore for JsonFileStore {
    async fn get(&self, key: &StateKey) -> Result<Option<StateValue>> {
        let state = self
            .state
            .lock()
            .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
        state
            .values
            .get(key.as_ref())
            .cloned()
            .map(from_stored_entry)
            .transpose()
    }

    async fn set(&self, key: &StateKey, value: StateValue) -> Result<()> {
        {
            let mut state = self
                .state
                .lock()
                .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
            state
                .values
                .insert(key.as_ref().to_owned(), to_stored_entry(&value));
        }
        self.flush()
    }

    async fn delete(&self, key: &StateKey) -> Result<()> {
        {
            let mut state = self
                .state
                .lock()
                .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
            state.values.remove(key.as_ref());
        }
        self.flush()
    }

    async fn list_prefix(&self, prefix: &str) -> Result<Vec<StateKey>> {
        let state = self
            .state
            .lock()
            .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
        let mut keys: Vec<StateKey> = state
            .values
            .keys()
            .filter(|k| k.starts_with(prefix))
            .map(|k| StateKey::from_raw(k.clone()))
            .collect();
        keys.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
        Ok(keys)
    }

    async fn get_metadata(&self, key: &StateKey) -> Result<Option<KeyMetadata>> {
        let state = self
            .state
            .lock()
            .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
        state
            .metadata
            .get(key.as_ref())
            .cloned()
            .map(from_stored_metadata)
            .transpose()
    }

    async fn set_metadata(&self, key: &StateKey, meta: KeyMetadata) -> Result<()> {
        {
            let mut state = self
                .state
                .lock()
                .map_err(|_| XenithError::StoreError("lock poisoned".into()))?;
            state
                .metadata
                .insert(key.as_ref().to_owned(), to_stored_metadata(&meta));
        }
        self.flush()
    }
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use xenith_core::StateVersion;

    /// Build a store path in the system temp directory that is unique per call.
    ///
    /// The sub-second clock reading alone can repeat when tests run in parallel
    /// or the clock is coarse, which would make two tests share one file. The
    /// process id separates concurrent test binaries and the counter separates
    /// calls within this process.
    fn temp_path() -> PathBuf {
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
        // Relaxed is enough: only the uniqueness of the returned value matters.
        let unique = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        std::env::temp_dir().join(format!(
            "xenith-test-{}-{nanos}-{unique}.json",
            std::process::id()
        ))
    }

    /// A value whose payload and version are derived from `ts`.
    fn sample_value(ts: u64) -> StateValue {
        StateValue {
            data: Bytes::from(format!("data-{ts}")),
            version: StateVersion {
                timestamp_ms: ts,
                sequence: 0,
                source_chain: 1,
            },
            updated_at: ts / 1000,
            source_chain: ChainId(1),
        }
    }

    #[tokio::test]
    async fn test_json_store_persists_across_instances() {
        let path = temp_path();
        let key = StateKey::new("proto", "pool", "0xabc");
        let value = sample_value(1_700_000_000_000);

        {
            let store = JsonFileStore::new(&path).unwrap();
            store.set(&key, value.clone()).await.unwrap();
        }

        // Reload from disk.
        let store2 = JsonFileStore::new(&path).unwrap();
        let loaded = store2.get(&key).await.unwrap().expect("value must persist");
        assert_eq!(loaded.data, value.data);
        assert_eq!(loaded.version.timestamp_ms, value.version.timestamp_ms);
        assert_eq!(loaded.source_chain, value.source_chain);

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_json_store_atomic_write() {
        let path = temp_path();
        let store = JsonFileStore::new(&path).unwrap();

        for i in 0u64..5 {
            let key = StateKey::new("proto", "pool", &format!("0x{i:04x}"));
            store.set(&key, sample_value(i * 1000)).await.unwrap();

            // After every write the file must be valid JSON.
            let content = std::fs::read_to_string(&path).unwrap();
            assert!(
                serde_json::from_str::<serde_json::Value>(&content).is_ok(),
                "file must be valid JSON after write {i}"
            );
        }

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_json_store_metadata_roundtrip() {
        let path = temp_path();
        let key = StateKey::new("proto", "pool", "0xabc");
        let meta = KeyMetadata {
            address: Some([0xABu8; 20]),
            slot: Some([0xCDu8; 32]),
        };

        {
            let store = JsonFileStore::new(&path).unwrap();
            store.set(&key, sample_value(1)).await.unwrap();
            store.set_metadata(&key, meta.clone()).await.unwrap();
        }

        let store2 = JsonFileStore::new(&path).unwrap();
        let loaded = store2
            .get_metadata(&key)
            .await
            .unwrap()
            .expect("metadata must persist");
        assert_eq!(loaded.address, Some([0xABu8; 20]));
        assert_eq!(loaded.slot, Some([0xCDu8; 32]));

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_json_store_delete_persists() {
        let path = temp_path();
        let key = StateKey::new("proto", "pool", "0xabc");

        {
            let store = JsonFileStore::new(&path).unwrap();
            store.set(&key, sample_value(1)).await.unwrap();
            store.delete(&key).await.unwrap();
        }

        // The deletion must have reached the file, not just the in-memory state.
        let store2 = JsonFileStore::new(&path).unwrap();
        assert!(store2.get(&key).await.unwrap().is_none());

        let _ = std::fs::remove_file(&path);
    }
}