use std::collections::HashMap;
use crate::chain::{
parse_block_number, ChainExtra, ChainId, ChainState, ChainStatus, REQ_ID_PARA_DB_SAVE,
REQ_ID_RELAY_DB_SAVE,
};
use crate::store::ChainStore;
const INITIAL_HEALTH_ID: u64 = 1000;
struct ChainEntry {
status: ChainStatus,
relay_db_key: String,
para_db_key: String,
chain_specs: Option<(String, String)>,
health_id: u64,
}
pub struct ChainStateMachine<S: ChainStore> {
chains: HashMap<ChainId, ChainEntry>,
store: S,
}
impl<S: ChainStore> ChainStateMachine<S> {
pub fn new(store: S) -> Self {
Self {
chains: HashMap::new(),
store,
}
}
pub fn register_chain(&mut self, chain: ChainId) {
self.chains.insert(
chain,
ChainEntry {
status: ChainStatus {
id: chain,
name: chain.display_name(),
state: ChainState::Connecting,
extra: ChainExtra::None,
},
relay_db_key: chain.relay_db_key().to_string(),
para_db_key: chain.para_db_key(),
chain_specs: chain
.chain_specs()
.map(|(r, p)| (r.to_string(), p.to_string())),
health_id: INITIAL_HEALTH_ID,
},
);
}
pub fn register_chain_entry(&mut self, entry: &crate::registry::ChainRegistryEntry) {
let chain = entry.id;
self.chains.insert(
chain,
ChainEntry {
status: ChainStatus {
id: chain,
name: chain.display_name(),
state: ChainState::Connecting,
extra: ChainExtra::None,
},
relay_db_key: entry.relay_db_key.clone(),
para_db_key: entry.para_db_key.clone(),
chain_specs: entry.chain_specs.clone(),
health_id: INITIAL_HEALTH_ID,
},
);
}
pub fn unregister_chain(&mut self, chain: ChainId) {
if let Some(entry) = self.chains.get_mut(&chain) {
entry.status.state = ChainState::Disconnected;
}
}
pub fn set_state(&mut self, chain: ChainId, state: ChainState) {
if let Some(entry) = self.chains.get_mut(&chain) {
entry.status.state = state;
}
}
pub fn set_state_with_extra(&mut self, chain: ChainId, state: ChainState, extra: ChainExtra) {
if let Some(entry) = self.chains.get_mut(&chain) {
entry.status.state = state;
entry.status.extra = extra;
}
}
pub fn store(&self) -> &S {
&self.store
}
pub fn process_response(&mut self, chain: ChainId, text: &str) {
let v: serde_json::Value = match serde_json::from_str(text) {
Ok(v) => v,
Err(_) => return,
};
let entry = match self.chains.get_mut(&chain) {
Some(e) => e,
None => return,
};
if let Some(id) = v.get("id").and_then(|i| i.as_u64()) {
if id == REQ_ID_PARA_DB_SAVE {
if let Some(db) = v.get("result").and_then(|r| r.as_str()) {
self.store.save(&entry.para_db_key, db);
log::info!("{chain:?}: saved para DB ({} bytes)", db.len());
} else if let Some(err) = v.get("error") {
log::warn!("{chain:?}: para DB save returned error: {err}");
}
return;
}
}
if let Some(result) = v.get("result") {
if let (Some(peers), Some(is_syncing)) = (
result.get("peers").and_then(|p| p.as_u64()),
result.get("isSyncing").and_then(|s| s.as_bool()),
) {
let current_block = match &entry.status.state {
ChainState::Live { best_block, .. }
| ChainState::Syncing { best_block, .. } => *best_block,
_ => 0,
};
entry.status.state = if is_syncing {
ChainState::Syncing {
best_block: current_block,
peers: peers as u32,
}
} else {
ChainState::Live {
best_block: current_block,
peers: peers as u32,
}
};
return;
}
}
if let Some(block) = parse_block_number(text) {
let (current_peers, current_syncing) = match &entry.status.state {
ChainState::Live { peers, .. } => (*peers, false),
ChainState::Syncing { peers, .. } => (*peers, true),
ChainState::Connecting => (0, true),
_ => (0, false),
};
entry.status.state = if current_syncing && current_peers > 0 {
ChainState::Syncing {
best_block: block,
peers: current_peers,
}
} else {
ChainState::Live {
best_block: block,
peers: current_peers,
}
};
}
}
pub fn process_relay_response(&mut self, chain: ChainId, text: &str) {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
if v.get("id").and_then(|i| i.as_u64()) == Some(REQ_ID_RELAY_DB_SAVE) {
if let Some(db) = v.get("result").and_then(|r| r.as_str()) {
let key = self
.chains
.get(&chain)
.map(|e| e.relay_db_key.as_str())
.unwrap_or_else(|| chain.relay_db_key());
self.store.save(key, db);
log::info!("{chain:?}: saved relay DB ({} bytes)", db.len());
} else if let Some(err) = v.get("error") {
log::warn!("{chain:?}: relay DB save returned error: {err}");
}
}
}
}
pub fn set_error(&mut self, chain: ChainId, msg: String) {
if let Some(entry) = self.chains.get_mut(&chain) {
entry.status.state = ChainState::Error(msg);
}
}
pub fn status(&self, chain: ChainId) -> ChainStatus {
self.chains
.get(&chain)
.map(|e| e.status.clone())
.unwrap_or_else(|| ChainStatus::disconnected(chain))
}
pub fn all_statuses(&self) -> Vec<ChainStatus> {
ChainId::all().iter().map(|&id| self.status(id)).collect()
}
pub fn subscribe_new_heads_request() -> String {
serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"method": "chain_subscribeNewHeads",
"params": []
})
.to_string()
}
pub fn health_check_request(&mut self, chain: ChainId) -> Option<String> {
let entry = self.chains.get_mut(&chain)?;
entry.health_id += 1;
Some(
serde_json::json!({
"jsonrpc": "2.0",
"id": entry.health_id,
"method": "system_health",
"params": []
})
.to_string(),
)
}
pub fn para_db_save_request() -> String {
serde_json::json!({
"jsonrpc": "2.0",
"id": REQ_ID_PARA_DB_SAVE,
"method": "chainHead_unstable_finalizedDatabase",
"params": []
})
.to_string()
}
pub fn relay_db_save_request() -> String {
serde_json::json!({
"jsonrpc": "2.0",
"id": REQ_ID_RELAY_DB_SAVE,
"method": "chainHead_unstable_finalizedDatabase",
"params": []
})
.to_string()
}
pub fn load_relay_db(&self, chain: ChainId) -> String {
match self.chains.get(&chain) {
Some(e) => self.store.load(&e.relay_db_key),
None => self.store.load(chain.relay_db_key()),
}
}
pub fn load_para_db(&self, chain: ChainId) -> String {
match self.chains.get(&chain) {
Some(e) => self.store.load(&e.para_db_key),
None => self.store.load(&chain.para_db_key()),
}
}
pub fn chain_specs(chain: ChainId) -> Option<(&'static str, &'static str)> {
chain.chain_specs()
}
pub fn chain_specs_owned(&self, chain: ChainId) -> Option<(String, String)> {
self.chains.get(&chain).and_then(|e| e.chain_specs.clone())
}
pub fn clear_chain_dbs(&self, chain: ChainId) {
let relay_key = self
.chains
.get(&chain)
.map(|e| e.relay_db_key.clone())
.unwrap_or_else(|| chain.relay_db_key().to_string());
let para_key = self
.chains
.get(&chain)
.map(|e| e.para_db_key.clone())
.unwrap_or_else(|| chain.para_db_key());
self.store.save(&relay_key, "");
self.store.save(¶_key, "");
}
pub fn statement_submit_request(encoded_hex: &str, request_id: u64) -> String {
serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "statement_submit",
"params": [encoded_hex]
})
.to_string()
}
pub fn statement_subscribe_request(request_id: u64) -> String {
serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "statement_subscribeStatement",
"params": ["any"]
})
.to_string()
}
pub fn statement_unsubscribe_request(sub_id: &str, request_id: u64) -> String {
serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "statement_unsubscribeStatement",
"params": [sub_id]
})
.to_string()
}
pub fn statement_broadcasts_request(topic_hexes: &[String], request_id: u64) -> String {
serde_json::json!({
"jsonrpc": "2.0",
"id": request_id,
"method": "statement_broadcastsStatement",
"params": [topic_hexes]
})
.to_string()
}
pub fn parse_statement_notification(text: &str) -> Vec<String> {
let v: serde_json::Value = match serde_json::from_str(text) {
Ok(v) => v,
Err(_) => return Vec::new(),
};
if v.get("method").and_then(|m| m.as_str()) != Some("statement_subscribeStatement") {
return Vec::new();
}
let result = match v.pointer("/params/result") {
Some(r) => r,
None => return Vec::new(),
};
let stmts = result
.pointer("/data/statements")
.or_else(|| result.pointer("/newStatements/statements"))
.or_else(|| result.get("statements"));
match stmts.and_then(|s| s.as_array()) {
Some(arr) => arr
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect(),
None => Vec::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::RefCell;
struct InMemoryStore {
data: RefCell<HashMap<String, String>>,
}
impl InMemoryStore {
fn new() -> Self {
Self {
data: RefCell::new(HashMap::new()),
}
}
}
impl ChainStore for InMemoryStore {
fn load(&self, key: &str) -> String {
self.data.borrow().get(key).cloned().unwrap_or_default()
}
fn save(&self, key: &str, data: &str) {
self.data
.borrow_mut()
.insert(key.to_string(), data.to_string());
}
}
fn make_sm() -> ChainStateMachine<InMemoryStore> {
let store = InMemoryStore::new();
let mut sm = ChainStateMachine::new(store);
sm.register_chain(ChainId::PaseoAssetHub);
sm
}
#[test]
fn register_sets_connecting() {
let sm = make_sm();
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(status.state, ChainState::Connecting));
}
#[test]
fn unregister_sets_disconnected() {
let mut sm = make_sm();
sm.unregister_chain(ChainId::PaseoAssetHub);
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(status.state, ChainState::Disconnected));
}
#[test]
fn unregistered_chain_returns_disconnected() {
let sm = make_sm();
let status = sm.status(ChainId::Ethereum);
assert!(matches!(status.state, ChainState::Disconnected));
}
#[test]
fn health_response_sets_live() {
let mut sm = make_sm();
let resp = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
sm.process_response(ChainId::PaseoAssetHub, resp);
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(status.state, ChainState::Live { peers: 5, .. }));
}
#[test]
fn health_response_sets_syncing() {
let mut sm = make_sm();
let resp = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":3,"isSyncing":true}}"#;
sm.process_response(ChainId::PaseoAssetHub, resp);
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(status.state, ChainState::Syncing { peers: 3, .. }));
}
#[test]
fn new_head_updates_block_number() {
let mut sm = make_sm();
let health = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
sm.process_response(ChainId::PaseoAssetHub, health);
let head =
r#"{"jsonrpc":"2.0","method":"chain_newHead","params":{"result":{"number":"0x1a4"}}}"#;
sm.process_response(ChainId::PaseoAssetHub, head);
let status = sm.status(ChainId::PaseoAssetHub);
match status.state {
ChainState::Live {
best_block, peers, ..
} => {
assert_eq!(best_block, 0x1a4);
assert_eq!(peers, 5);
}
other => panic!("expected Live, got {other:?}"),
}
}
#[test]
fn para_db_save_stores_to_store() {
let mut sm = make_sm();
let resp = format!(
r#"{{"jsonrpc":"2.0","id":{},"result":"saved-db-content"}}"#,
REQ_ID_PARA_DB_SAVE,
);
sm.process_response(ChainId::PaseoAssetHub, &resp);
assert_eq!(sm.store().load("PaseoAssetHub"), "saved-db-content");
}
#[test]
fn relay_db_save_stores_to_store() {
let mut sm = make_sm();
let resp = format!(
r#"{{"jsonrpc":"2.0","id":{},"result":"relay-db-content"}}"#,
REQ_ID_RELAY_DB_SAVE,
);
sm.process_relay_response(ChainId::PaseoAssetHub, &resp);
assert_eq!(
sm.store().load(ChainId::PaseoAssetHub.relay_db_key()),
"relay-db-content"
);
}
#[test]
fn set_state_preserves_extra() {
let mut sm = make_sm();
sm.set_state_with_extra(
ChainId::PaseoAssetHub,
ChainState::Live {
best_block: 100,
peers: 5,
},
ChainExtra::Eth {
finalized_block: 50,
gas_price_gwei: 20,
},
);
sm.set_state(
ChainId::PaseoAssetHub,
ChainState::Live {
best_block: 200,
peers: 3,
},
);
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(
status.extra,
ChainExtra::Eth {
finalized_block: 50,
gas_price_gwei: 20
}
));
assert!(matches!(
status.state,
ChainState::Live {
best_block: 200,
peers: 3
}
));
}
#[test]
fn set_state_with_extra_updates_both() {
let mut sm = make_sm();
sm.set_state_with_extra(
ChainId::PaseoAssetHub,
ChainState::Live {
best_block: 100,
peers: 5,
},
ChainExtra::Btc {
tip_height: 800000,
fee_rate_sat_vb: 10,
},
);
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(
status.state,
ChainState::Live {
best_block: 100,
peers: 5
}
));
assert!(matches!(
status.extra,
ChainExtra::Btc {
tip_height: 800000,
fee_rate_sat_vb: 10
}
));
}
#[test]
fn process_response_unregistered_chain_is_noop() {
let mut sm = make_sm();
let resp = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
sm.process_response(ChainId::Ethereum, resp);
let status = sm.status(ChainId::Ethereum);
assert!(matches!(status.state, ChainState::Disconnected));
}
#[test]
fn health_check_request_unregistered_returns_none() {
let mut sm = make_sm();
assert!(sm.health_check_request(ChainId::Ethereum).is_none());
}
#[test]
fn health_check_request_id_starts_above_1000_and_increments() {
let mut sm = make_sm();
let req1 = sm.health_check_request(ChainId::PaseoAssetHub).unwrap();
let req2 = sm.health_check_request(ChainId::PaseoAssetHub).unwrap();
let v1: serde_json::Value = serde_json::from_str(&req1).unwrap();
let v2: serde_json::Value = serde_json::from_str(&req2).unwrap();
let id1 = v1["id"].as_u64().unwrap();
let id2 = v2["id"].as_u64().unwrap();
assert!(id1 > 1000);
assert_eq!(id2, id1 + 1);
}
#[test]
fn all_statuses_includes_registered_chains() {
let sm = make_sm();
let statuses = sm.all_statuses();
let paseo = statuses
.iter()
.find(|s| s.id == ChainId::PaseoAssetHub)
.expect("PaseoAssetHub should be in all_statuses");
assert!(matches!(paseo.state, ChainState::Connecting));
}
#[test]
fn all_statuses_returns_disconnected_for_unregistered() {
let sm = make_sm();
let statuses = sm.all_statuses();
let eth = statuses
.iter()
.find(|s| s.id == ChainId::Ethereum)
.expect("Ethereum should be in all_statuses (Disconnected)");
assert!(matches!(eth.state, ChainState::Disconnected));
}
#[test]
fn parse_statement_notification_data_statements_path() {
let text = r#"{"jsonrpc":"2.0","method":"statement_subscribeStatement","params":{"result":{"data":{"statements":["0xab","0xcd"]}}}}"#;
let stmts = ChainStateMachine::<InMemoryStore>::parse_statement_notification(text);
assert_eq!(stmts, vec!["0xab", "0xcd"]);
}
#[test]
fn parse_statement_notification_new_statements_path() {
let text = r#"{"jsonrpc":"2.0","method":"statement_subscribeStatement","params":{"result":{"newStatements":{"statements":["0xef"]}}}}"#;
let stmts = ChainStateMachine::<InMemoryStore>::parse_statement_notification(text);
assert_eq!(stmts, vec!["0xef"]);
}
#[test]
fn parse_statement_notification_plain_statements_path() {
let text = r#"{"jsonrpc":"2.0","method":"statement_subscribeStatement","params":{"result":{"statements":["0x11","0x22","0x33"]}}}"#;
let stmts = ChainStateMachine::<InMemoryStore>::parse_statement_notification(text);
assert_eq!(stmts, vec!["0x11", "0x22", "0x33"]);
}
#[test]
fn parse_statement_notification_wrong_method_returns_empty() {
let text = r#"{"jsonrpc":"2.0","method":"chain_newHead","params":{"result":{"statements":["0x11"]}}}"#;
let stmts = ChainStateMachine::<InMemoryStore>::parse_statement_notification(text);
assert!(stmts.is_empty());
}
#[test]
fn parse_statement_notification_no_statements_returns_empty() {
let text =
r#"{"jsonrpc":"2.0","method":"statement_subscribeStatement","params":{"result":{}}}"#;
let stmts = ChainStateMachine::<InMemoryStore>::parse_statement_notification(text);
assert!(stmts.is_empty());
}
#[test]
fn para_db_save_error_does_not_change_state() {
let mut sm = make_sm();
let health = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
sm.process_response(ChainId::PaseoAssetHub, health);
let error_resp = format!(
r#"{{"jsonrpc":"2.0","id":{},"error":{{"code":-32000,"message":"db error"}}}}"#,
REQ_ID_PARA_DB_SAVE,
);
sm.process_response(ChainId::PaseoAssetHub, &error_resp);
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(status.state, ChainState::Live { peers: 5, .. }));
assert_eq!(sm.store().load("PaseoAssetHub"), "");
}
#[test]
fn register_chain_entry_sets_connecting() {
use crate::chain::ConnectionBackend;
use crate::registry::ChainRegistryEntry;
let store = InMemoryStore::new();
let mut sm = ChainStateMachine::new(store);
sm.register_chain_entry(&ChainRegistryEntry {
id: ChainId::PaseoAssetHub,
genesis_hash: [0u8; 32],
display_name: "Paseo Asset Hub".to_string(),
endpoint: String::new(),
backend: ConnectionBackend::Smoldot,
relay_db_key: "custom-relay".to_string(),
para_db_key: "custom-para".to_string(),
chain_specs: None,
});
let status = sm.status(ChainId::PaseoAssetHub);
assert!(matches!(status.state, ChainState::Connecting));
}
#[test]
fn register_chain_entry_uses_entry_db_keys() {
use crate::chain::ConnectionBackend;
use crate::registry::ChainRegistryEntry;
let store = InMemoryStore::new();
let mut sm = ChainStateMachine::new(store);
sm.register_chain_entry(&ChainRegistryEntry {
id: ChainId::PaseoAssetHub,
genesis_hash: [0u8; 32],
display_name: "Paseo Asset Hub".to_string(),
endpoint: String::new(),
backend: ConnectionBackend::Smoldot,
relay_db_key: "custom-relay".to_string(),
para_db_key: "custom-para".to_string(),
chain_specs: None,
});
let resp = format!(
r#"{{"jsonrpc":"2.0","id":{},"result":"relay-data"}}"#,
REQ_ID_RELAY_DB_SAVE,
);
sm.process_relay_response(ChainId::PaseoAssetHub, &resp);
assert_eq!(sm.store().load("custom-relay"), "relay-data");
let resp2 = format!(
r#"{{"jsonrpc":"2.0","id":{},"result":"para-data"}}"#,
REQ_ID_PARA_DB_SAVE,
);
sm.process_response(ChainId::PaseoAssetHub, &resp2);
assert_eq!(sm.store().load("custom-para"), "para-data");
}
#[test]
fn chain_specs_owned_returns_entry_specs() {
use crate::chain::ConnectionBackend;
use crate::registry::ChainRegistryEntry;
let store = InMemoryStore::new();
let mut sm = ChainStateMachine::new(store);
sm.register_chain_entry(&ChainRegistryEntry {
id: ChainId::PaseoAssetHub,
genesis_hash: [0u8; 32],
display_name: "Paseo Asset Hub".to_string(),
endpoint: String::new(),
backend: ConnectionBackend::Rpc,
relay_db_key: "r".to_string(),
para_db_key: "p".to_string(),
chain_specs: Some(("relay-spec".to_string(), "para-spec".to_string())),
});
let specs = sm.chain_specs_owned(ChainId::PaseoAssetHub);
assert_eq!(
specs,
Some(("relay-spec".to_string(), "para-spec".to_string()))
);
assert!(sm.chain_specs_owned(ChainId::Ethereum).is_none());
}
#[test]
fn register_chain_entry_backward_compat_with_register_chain() {
use crate::chain::ConnectionBackend;
use crate::registry::ChainRegistryEntry;
let store1 = InMemoryStore::new();
let mut sm1 = ChainStateMachine::new(store1);
sm1.register_chain(ChainId::PaseoAssetHub);
let store2 = InMemoryStore::new();
let mut sm2 = ChainStateMachine::new(store2);
sm2.register_chain_entry(&ChainRegistryEntry {
id: ChainId::PaseoAssetHub,
genesis_hash: [0u8; 32],
display_name: ChainId::PaseoAssetHub.display_name().to_string(),
endpoint: ChainId::PaseoAssetHub.endpoint().to_string(),
backend: ConnectionBackend::Smoldot,
relay_db_key: ChainId::PaseoAssetHub.relay_db_key().to_string(),
para_db_key: ChainId::PaseoAssetHub.para_db_key(),
chain_specs: ChainId::PaseoAssetHub
.chain_specs()
.map(|(r, p)| (r.to_string(), p.to_string())),
});
let s1 = sm1.status(ChainId::PaseoAssetHub);
let s2 = sm2.status(ChainId::PaseoAssetHub);
assert_eq!(s1.name, s2.name);
assert!(matches!(s1.state, ChainState::Connecting));
assert!(matches!(s2.state, ChainState::Connecting));
let resp = format!(
r#"{{"jsonrpc":"2.0","id":{},"result":"db-content"}}"#,
REQ_ID_PARA_DB_SAVE,
);
sm1.process_response(ChainId::PaseoAssetHub, &resp);
sm2.process_response(ChainId::PaseoAssetHub, &resp);
assert_eq!(
sm1.store().load(&ChainId::PaseoAssetHub.para_db_key()),
sm2.store().load(&ChainId::PaseoAssetHub.para_db_key()),
);
}
}