use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use backon::{ExponentialBuilder, Retryable};
use bytes::Bytes;
use xenith_core::{
wire, ChainId, ConflictResolver, KeyMetadata, MessageId, MessagingTransport, ReadStrategy,
Result, SendOptions, StateKey, StateStore, StateValue, StateVersion, SyncStatus, SyncedState,
XenithError,
};
use xenith_read::MultiChainReader;
use crate::subscription::SubscriptionHandle;
pub struct SyncConfig {
pub retry_attempts: u32,
pub retry_delay_ms: u64,
pub default_strategy: ReadStrategy,
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
retry_attempts: 3,
retry_delay_ms: 500,
default_strategy: ReadStrategy::Latest,
}
}
}
#[derive(Clone, Debug)]
pub struct SyncReceipt {
pub key: StateKey,
pub successes: Vec<(ChainId, MessageId)>,
pub failures: Vec<(ChainId, XenithError)>,
pub status: SyncStatus,
pub store_written: bool,
}
pub struct SyncEngine {
pub transport: Arc<dyn MessagingTransport>,
pub store: Arc<dyn StateStore>,
pub config: SyncConfig,
reader: Option<Arc<MultiChainReader>>,
}
impl SyncEngine {
pub fn new(
transport: Arc<dyn MessagingTransport>,
store: Arc<dyn StateStore>,
config: SyncConfig,
) -> Self {
Self {
transport,
store,
config,
reader: None,
}
}
pub fn new_with_reader(
transport: Arc<dyn MessagingTransport>,
store: Arc<dyn StateStore>,
config: SyncConfig,
reader: MultiChainReader,
) -> Self {
Self {
transport,
store,
config,
reader: Some(Arc::new(reader)),
}
}
pub async fn push(
&self,
key: StateKey,
value: Bytes,
targets: Vec<ChainId>,
source: ChainId,
metadata: Option<KeyMetadata>,
) -> Result<SyncReceipt> {
let ts_ms = unix_now_ms()?;
let state_value = StateValue {
data: value.clone(),
version: StateVersion {
timestamp_ms: ts_ms,
sequence: 0,
source_chain: source.0,
},
updated_at: ts_ms / 1_000,
source_chain: source,
};
let payload = wire::encode(&key, &state_value, metadata.as_ref());
let mut successes: Vec<(ChainId, MessageId)> = Vec::with_capacity(targets.len());
let mut failures: Vec<(ChainId, XenithError)> = Vec::new();
for chain in &targets {
match send_with_retry(
Arc::clone(&self.transport),
*chain,
payload.clone(),
SendOptions::default(),
self.config.retry_attempts,
self.config.retry_delay_ms,
)
.await
{
Ok(id) => successes.push((*chain, id)),
Err(e) => failures.push((*chain, e)),
}
}
let store_written = targets.is_empty() || !successes.is_empty();
if store_written {
self.store.set(&key, state_value.clone()).await?;
if let Some(ref m) = metadata {
self.store.set_metadata(&key, m.clone()).await?;
}
}
let status = if failures.is_empty() {
successes
.first()
.map(|&(_, id)| SyncStatus::Pending { message_id: id })
.unwrap_or(SyncStatus::Synced)
} else {
SyncStatus::PartialFailure {
succeeded: successes.iter().map(|&(c, _)| c).collect(),
failed: failures.iter().map(|&(c, _)| c).collect(),
}
};
Ok(SyncReceipt {
key,
successes,
failures,
status,
store_written,
})
}
pub async fn read(&self, key: StateKey, strategy: ReadStrategy) -> Result<SyncedState> {
let value = self
.store
.get(&key)
.await?
.ok_or_else(|| XenithError::StoreError("key not found".into()))?;
let chains = vec![value.source_chain];
match strategy {
ReadStrategy::SourceOfTruth(chain) => {
let status = if value.source_chain == chain {
SyncStatus::Synced
} else {
SyncStatus::Diverged {
chains: vec![(value.source_chain, value.clone())],
}
};
Ok(SyncedState {
key,
value,
chains,
status,
})
}
ReadStrategy::Latest => Ok(SyncedState {
key,
value,
chains,
status: SyncStatus::Synced,
}),
ReadStrategy::Quorum(n) => {
let reader = self.reader.as_ref().ok_or_else(|| {
XenithError::StoreError(
"Quorum strategy requires a MultiChainReader — \
use SyncEngine::new_with_reader"
.into(),
)
})?;
let meta = self.store.get_metadata(&key).await?.ok_or_else(|| {
XenithError::StoreError(
"Quorum read requires KeyMetadata (address + slot) — \
call store.set_metadata before pushing"
.into(),
)
})?;
let address = meta
.address
.ok_or_else(|| XenithError::StoreError("KeyMetadata.address is None".into()))?;
let slot = meta
.slot
.ok_or_else(|| XenithError::StoreError("KeyMetadata.slot is None".into()))?;
let target_chains: Vec<ChainId> = reader.providers.keys().copied().collect();
let all_chains = target_chains.clone();
let readings = reader.read_parallel(target_chains, address, slot).await?;
let mut counts: HashMap<[u8; 32], usize> = HashMap::new();
for (_, raw) in &readings {
*counts.entry(*raw).or_insert(0) += 1;
}
let max_count = counts.values().copied().max().unwrap_or(0);
let status = if max_count >= n {
SyncStatus::Synced
} else {
let diverged: Vec<(ChainId, StateValue)> = readings
.iter()
.map(|(chain, raw)| {
(
*chain,
StateValue {
data: Bytes::copy_from_slice(raw.as_ref()),
version: value.version,
updated_at: value.updated_at,
source_chain: *chain,
},
)
})
.collect();
SyncStatus::Diverged { chains: diverged }
};
Ok(SyncedState {
key,
value,
chains: all_chains,
status,
})
}
ReadStrategy::Custom(f) => {
let resolved = f(vec![(value.source_chain, value)]);
let resolved_chain = resolved.source_chain;
Ok(SyncedState {
key,
chains: vec![resolved_chain],
value: resolved,
status: SyncStatus::Synced,
})
}
}
}
pub async fn resolve(
&self,
key: StateKey,
resolver: &dyn ConflictResolver,
) -> Result<StateValue> {
let value = self
.store
.get(&key)
.await?
.ok_or_else(|| XenithError::StoreError("key not found".into()))?;
let candidates = vec![(value.source_chain, value)];
let resolved = resolver.resolve(&key, candidates).await?;
self.store.set(&key, resolved.clone()).await?;
Ok(resolved)
}
pub async fn subscribe<F, Fut>(
&self,
key: StateKey,
source: ChainId,
poll_interval_ms: u64,
handler: F,
) -> Result<SubscriptionHandle>
where
F: Fn(StateValue) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send,
{
let store = Arc::clone(&self.store);
let transport = Arc::clone(&self.transport);
let key_clone = key.clone();
let interval = tokio::time::Duration::from_millis(poll_interval_ms);
let join_handle = tokio::spawn(async move {
let mut last_seen: Option<StateVersion> = None;
loop {
if let Ok(messages) = transport.poll_incoming().await {
for (incoming_key, incoming_value, incoming_metadata) in messages {
if incoming_key == key_clone && incoming_value.source_chain == source {
let _ = store.set(&key_clone, incoming_value.clone()).await;
if let Some(ref m) = incoming_metadata {
let _ = store.set_metadata(&key_clone, m.clone()).await;
}
last_seen = Some(incoming_value.version);
handler(incoming_value).await;
}
}
}
if let Ok(Some(value)) = store.get(&key_clone).await {
let is_newer = last_seen.map(|seen| value.version > seen).unwrap_or(true);
if is_newer {
last_seen = Some(value.version);
handler(value).await;
}
}
tokio::time::sleep(interval).await;
}
});
Ok(SubscriptionHandle::new(
key,
source,
join_handle.abort_handle(),
))
}
pub async fn unsubscribe(&self, handle: SubscriptionHandle) {
handle.cancel();
}
}
async fn send_with_retry(
transport: Arc<dyn MessagingTransport>,
chain: ChainId,
payload: Bytes,
options: SendOptions,
attempts: u32,
delay_ms: u64,
) -> Result<MessageId> {
let backoff = ExponentialBuilder::default()
.with_max_times(attempts as usize)
.with_min_delay(Duration::from_millis(delay_ms));
(|| {
let t = Arc::clone(&transport);
let p = payload.clone();
let o = options.clone();
async move { t.send_message(chain, p, o).await }
})
.retry(&backoff)
.when(|e| matches!(e, XenithError::Transport { .. }))
.notify(|e, dur| {
eprintln!(
"xenith: transient error on chain {}: {e}; retrying in {dur:?}",
chain.0
);
})
.await
}
fn unix_now_ms() -> Result<u64> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.map_err(|_| XenithError::StoreError("system clock is before the Unix epoch".into()))
}
#[cfg(test)]
mod tests {
use super::*;
use xenith_core::{InMemoryStore, LatestVersionResolver, ReadStrategy};
use xenith_layerzero::LayerZeroTransport;
fn make_engine(chains: &[(u64, u32)]) -> SyncEngine {
let transport = Arc::new(LayerZeroTransport::new(
[0u8; 20],
chains
.iter()
.map(|&(c, eid)| (ChainId::from(c), eid))
.collect(),
));
let store = Arc::new(InMemoryStore::default());
SyncEngine::new(transport, store, SyncConfig::default())
}
#[tokio::test]
async fn push_then_read_source_of_truth() {
let engine = make_engine(&[(42161, 30110)]);
let key = StateKey::new("uniswap", "pool", "0xabc");
let receipt = engine
.push(
key.clone(),
Bytes::from_static(b"price=100"),
vec![ChainId(42161)],
ChainId(1),
None,
)
.await
.unwrap();
assert_eq!(receipt.successes.len(), 1);
assert!(receipt.failures.is_empty());
assert!(matches!(receipt.status, SyncStatus::Pending { .. }));
let state = engine
.read(key, ReadStrategy::SourceOfTruth(ChainId(1)))
.await
.unwrap();
assert!(matches!(state.status, SyncStatus::Synced));
assert_eq!(state.value.data, Bytes::from_static(b"price=100"));
assert_eq!(state.value.source_chain, ChainId(1));
}
#[tokio::test]
async fn push_then_read_wrong_source_of_truth_is_diverged() {
let engine = make_engine(&[(42161, 30110)]);
let key = StateKey::new("uniswap", "pool", "0xdef");
engine
.push(
key.clone(),
Bytes::from_static(b"v"),
vec![ChainId(42161)],
ChainId(1),
None,
)
.await
.unwrap();
let state = engine
.read(key, ReadStrategy::SourceOfTruth(ChainId(42161)))
.await
.unwrap();
assert!(matches!(state.status, SyncStatus::Diverged { .. }));
}
#[tokio::test]
async fn push_then_resolve_latest_version() {
let engine = make_engine(&[(42161, 30110)]);
let key = StateKey::new("aave", "reserve", "0x1");
engine
.push(
key.clone(),
Bytes::from_static(b"ltv=0.8"),
vec![ChainId(42161)],
ChainId(1),
None,
)
.await
.unwrap();
let resolved = engine
.resolve(key.clone(), &LatestVersionResolver)
.await
.unwrap();
assert_eq!(resolved.data, Bytes::from_static(b"ltv=0.8"));
let state = engine.read(key, ReadStrategy::Latest).await.unwrap();
assert_eq!(state.value.data, Bytes::from_static(b"ltv=0.8"));
}
#[tokio::test]
async fn push_no_targets_yields_synced_receipt() {
let engine = make_engine(&[]);
let key = StateKey::new("proto", "x", "1");
let receipt = engine
.push(key, Bytes::from_static(b"d"), vec![], ChainId(1), None)
.await
.unwrap();
assert!(receipt.successes.is_empty());
assert!(receipt.failures.is_empty());
assert!(matches!(receipt.status, SyncStatus::Synced));
}
#[tokio::test]
async fn read_missing_key_returns_error() {
let engine = make_engine(&[]);
let err = engine
.read(StateKey::new("x", "y", "z"), ReadStrategy::Latest)
.await
.unwrap_err();
assert!(matches!(err, XenithError::StoreError(_)));
}
#[tokio::test]
async fn push_to_unsupported_chain_is_partial_failure() {
let engine = make_engine(&[]); let receipt = engine
.push(
StateKey::new("p", "q", "r"),
Bytes::from_static(b"x"),
vec![ChainId(42161)],
ChainId(1),
None,
)
.await
.unwrap();
assert_eq!(receipt.failures.len(), 1);
assert!(matches!(
receipt.failures[0].1,
XenithError::UnsupportedChain(_)
));
assert!(matches!(receipt.status, SyncStatus::PartialFailure { .. }));
}
#[tokio::test]
async fn test_subscribe_fires_on_new_value() {
use tokio::sync::mpsc;
let store = Arc::new(InMemoryStore::default());
let transport = Arc::new(LayerZeroTransport::new([0u8; 20], vec![]));
let engine = SyncEngine::new(
transport,
Arc::clone(&store) as Arc<dyn StateStore>,
SyncConfig::default(),
);
let key = StateKey::new("test", "pos", "u1");
let (tx, mut rx) = mpsc::channel::<StateValue>(1);
let handle = engine
.subscribe(key.clone(), ChainId(1), 10, move |value| {
let tx = tx.clone();
async move {
let _ = tx.send(value).await;
}
})
.await
.unwrap();
let new_value = StateValue {
data: Bytes::from_static(b"new_data"),
version: StateVersion {
timestamp_ms: 1_000_000,
sequence: 0,
source_chain: 1,
},
updated_at: 1000,
source_chain: ChainId(1),
};
store.set(&key, new_value.clone()).await.unwrap();
let received = tokio::time::timeout(tokio::time::Duration::from_millis(200), rx.recv())
.await
.expect("timed out waiting for subscription event")
.expect("channel closed");
assert_eq!(received, new_value);
handle.cancel();
}
#[tokio::test]
async fn test_subscribe_fires_on_incoming_message() {
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc;
let key = StateKey::new("proto", "entity", "id1");
let incoming_value = StateValue {
data: Bytes::from_static(b"incoming_data"),
version: StateVersion {
timestamp_ms: 9_000_000,
sequence: 0,
source_chain: 1,
},
updated_at: 9000,
source_chain: ChainId(1),
};
struct MockTransport {
returned: AtomicBool,
message: (StateKey, StateValue, Option<xenith_core::KeyMetadata>),
}
#[async_trait::async_trait]
impl xenith_core::MessagingTransport for MockTransport {
async fn send_message(
&self,
destination: ChainId,
_: Bytes,
_: SendOptions,
) -> xenith_core::Result<MessageId> {
Err(XenithError::UnsupportedChain(destination))
}
async fn estimate_fee(&self, _: ChainId, _: Bytes) -> xenith_core::Result<u128> {
Ok(0)
}
async fn message_status(
&self,
_: MessageId,
) -> xenith_core::Result<xenith_core::MessageStatus> {
Ok(xenith_core::MessageStatus::Delivered)
}
fn sender_address(&self) -> Option<[u8; 20]> {
None
}
async fn poll_incoming(
&self,
) -> xenith_core::Result<
Vec<(
xenith_core::StateKey,
xenith_core::StateValue,
Option<xenith_core::KeyMetadata>,
)>,
> {
if self.returned.swap(true, Ordering::SeqCst) {
Ok(vec![])
} else {
Ok(vec![self.message.clone()])
}
}
}
let (tx, mut rx) = mpsc::channel::<StateValue>(1);
let store = Arc::new(InMemoryStore::default());
let mock_transport = Arc::new(MockTransport {
returned: AtomicBool::new(false),
message: (key.clone(), incoming_value.clone(), None),
});
let engine = SyncEngine::new(
mock_transport as Arc<dyn xenith_core::MessagingTransport>,
Arc::clone(&store) as Arc<dyn StateStore>,
SyncConfig::default(),
);
let handle = engine
.subscribe(key.clone(), ChainId(1), 10, move |value| {
let tx = tx.clone();
async move {
let _ = tx.send(value).await;
}
})
.await
.unwrap();
let received = tokio::time::timeout(tokio::time::Duration::from_millis(200), rx.recv())
.await
.expect("timed out waiting for subscription event from transport")
.expect("channel closed");
assert_eq!(received.data, Bytes::from_static(b"incoming_data"));
assert_eq!(received.source_chain, ChainId(1));
handle.cancel();
}
struct FailNTimesTransport {
fail_count: Arc<std::sync::atomic::AtomicU32>,
fail_times: u32,
}
impl FailNTimesTransport {
fn new(fail_times: u32) -> (Arc<std::sync::atomic::AtomicU32>, Arc<Self>) {
let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
let t = Arc::new(Self {
fail_count: Arc::clone(&counter),
fail_times,
});
(counter, t)
}
}
#[async_trait::async_trait]
impl xenith_core::MessagingTransport for FailNTimesTransport {
async fn send_message(
&self,
_destination: ChainId,
_payload: Bytes,
_options: SendOptions,
) -> xenith_core::Result<MessageId> {
let prev = self
.fail_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if prev < self.fail_times {
Err(XenithError::Transport {
chain: ChainId(1),
message: "transient".into(),
})
} else {
Ok(MessageId(prev as u64 + 1))
}
}
async fn estimate_fee(
&self,
_destination: ChainId,
_payload: Bytes,
) -> xenith_core::Result<u128> {
Ok(0)
}
async fn message_status(
&self,
_message_id: MessageId,
) -> xenith_core::Result<xenith_core::MessageStatus> {
Ok(xenith_core::MessageStatus::Delivered)
}
fn sender_address(&self) -> Option<[u8; 20]> {
None
}
async fn poll_incoming(
&self,
) -> xenith_core::Result<
Vec<(
xenith_core::StateKey,
xenith_core::StateValue,
Option<xenith_core::KeyMetadata>,
)>,
> {
Ok(vec![])
}
}
#[tokio::test]
async fn push_retries_on_transport_error() {
let (call_count, transport) = FailNTimesTransport::new(2);
let engine = SyncEngine::new(
transport as Arc<dyn xenith_core::MessagingTransport>,
Arc::new(InMemoryStore::default()),
SyncConfig {
retry_attempts: 3,
retry_delay_ms: 1, ..SyncConfig::default()
},
);
let receipt = engine
.push(
StateKey::new("test", "retry", "1"),
Bytes::from_static(b"v"),
vec![ChainId(1)],
ChainId(1),
None,
)
.await
.unwrap();
assert_eq!(
call_count.load(std::sync::atomic::Ordering::SeqCst),
3,
"expected 3 calls (2 failures + 1 success)"
);
assert_eq!(receipt.successes.len(), 1);
assert!(receipt.failures.is_empty());
}
#[tokio::test]
async fn push_does_not_retry_unsupported_chain() {
let (call_count, transport) = FailNTimesTransport::new(u32::MAX);
let _engine = SyncEngine::new(
transport as Arc<dyn xenith_core::MessagingTransport>,
Arc::new(InMemoryStore::default()),
SyncConfig {
retry_attempts: 3,
retry_delay_ms: 1,
..SyncConfig::default()
},
);
struct UnsupportedTransport;
#[async_trait::async_trait]
impl xenith_core::MessagingTransport for UnsupportedTransport {
async fn send_message(
&self,
destination: ChainId,
_payload: Bytes,
_options: SendOptions,
) -> xenith_core::Result<MessageId> {
Err(XenithError::UnsupportedChain(destination))
}
async fn estimate_fee(
&self,
_dst: ChainId,
_payload: Bytes,
) -> xenith_core::Result<u128> {
Ok(0)
}
async fn message_status(
&self,
_id: MessageId,
) -> xenith_core::Result<xenith_core::MessageStatus> {
Ok(xenith_core::MessageStatus::Delivered)
}
fn sender_address(&self) -> Option<[u8; 20]> {
None
}
async fn poll_incoming(
&self,
) -> xenith_core::Result<
Vec<(
xenith_core::StateKey,
xenith_core::StateValue,
Option<xenith_core::KeyMetadata>,
)>,
> {
Ok(vec![])
}
}
let engine2 = SyncEngine::new(
Arc::new(UnsupportedTransport) as Arc<dyn xenith_core::MessagingTransport>,
Arc::new(InMemoryStore::default()),
SyncConfig {
retry_attempts: 3,
retry_delay_ms: 1,
..SyncConfig::default()
},
);
let receipt = engine2
.push(
StateKey::new("test", "no-retry", "1"),
Bytes::from_static(b"v"),
vec![ChainId(99)],
ChainId(1),
None,
)
.await
.unwrap();
assert_eq!(receipt.failures.len(), 1);
assert!(matches!(
receipt.failures[0].1,
XenithError::UnsupportedChain(_)
));
let _ = call_count; }
}