Skip to main content

host_chain_core/
state_machine.rs

1//! Generic chain state machine — shared between WASM and native.
2//!
3//! Tracks chain connection status, processes JSON-RPC responses, and persists
4//! chain databases. The state machine is generic over [`ChainStore`] so the
5//! same logic works with localStorage (WASM) and the filesystem (native).
6
7use std::collections::HashMap;
8
9use crate::chain::{
10    parse_block_number, ChainExtra, ChainId, ChainState, ChainStatus, REQ_ID_PARA_DB_SAVE,
11    REQ_ID_RELAY_DB_SAVE,
12};
13use crate::store::ChainStore;
14
15/// Per-chain state tracked by the state machine.
16struct ChainEntry {
17    status: ChainStatus,
18    para_db_key: String,
19    health_id: u64,
20}
21
22/// Pure state machine for chain connections — no networking, no async.
23///
24/// Processes JSON-RPC responses from smoldot (or smoldot-js on WASM),
25/// tracks chain status, and persists chain databases via a [`ChainStore`].
26pub struct ChainStateMachine<S: ChainStore> {
27    chains: HashMap<ChainId, ChainEntry>,
28    store: S,
29}
30
31impl<S: ChainStore> ChainStateMachine<S> {
32    pub fn new(store: S) -> Self {
33        Self {
34            chains: HashMap::new(),
35            store,
36        }
37    }
38
39    /// Register a chain for tracking. Initial state is `Connecting`.
40    pub fn register_chain(&mut self, chain: ChainId) {
41        let para_db_key = chain.para_db_key();
42        self.chains.insert(
43            chain,
44            ChainEntry {
45                status: ChainStatus {
46                    id: chain,
47                    name: chain.display_name(),
48                    state: ChainState::Connecting,
49                    extra: ChainExtra::None,
50                },
51                para_db_key,
52                health_id: 1000,
53            },
54        );
55    }
56
57    /// Unregister a chain (sets state to Disconnected).
58    pub fn unregister_chain(&mut self, chain: ChainId) {
59        if let Some(entry) = self.chains.get_mut(&chain) {
60            entry.status.state = ChainState::Disconnected;
61        }
62    }
63
64    /// Set the chain state, preserving existing `extra`.
65    pub fn set_state(&mut self, chain: ChainId, state: ChainState) {
66        if let Some(entry) = self.chains.get_mut(&chain) {
67            entry.status.state = state;
68        }
69    }
70
71    /// Set both state and extra data.
72    pub fn set_state_with_extra(&mut self, chain: ChainId, state: ChainState, extra: ChainExtra) {
73        if let Some(entry) = self.chains.get_mut(&chain) {
74            entry.status.state = state;
75            entry.status.extra = extra;
76        }
77    }
78
79    /// Access the underlying store.
80    pub fn store(&self) -> &S {
81        &self.store
82    }
83
84    /// Process a JSON-RPC response from smoldot for a given chain.
85    pub fn process_response(&mut self, chain: ChainId, text: &str) {
86        let v: serde_json::Value = match serde_json::from_str(text) {
87            Ok(v) => v,
88            Err(_) => return,
89        };
90
91        let entry = match self.chains.get_mut(&chain) {
92            Some(e) => e,
93            None => return,
94        };
95
96        // Check for para DB save response.
97        if let Some(id) = v.get("id").and_then(|i| i.as_u64()) {
98            if id == REQ_ID_PARA_DB_SAVE {
99                if let Some(db) = v.get("result").and_then(|r| r.as_str()) {
100                    self.store.save(&entry.para_db_key, db);
101                    log::info!("{chain:?}: saved para DB ({} bytes)", db.len());
102                } else if let Some(err) = v.get("error") {
103                    log::warn!("{chain:?}: para DB save returned error: {err}");
104                }
105                return;
106            }
107        }
108
109        // Check for system_health response.
110        if let Some(result) = v.get("result") {
111            if let (Some(peers), Some(is_syncing)) = (
112                result.get("peers").and_then(|p| p.as_u64()),
113                result.get("isSyncing").and_then(|s| s.as_bool()),
114            ) {
115                let current_block = match &entry.status.state {
116                    ChainState::Live { best_block, .. }
117                    | ChainState::Syncing { best_block, .. } => *best_block,
118                    _ => 0,
119                };
120
121                entry.status.state = if is_syncing {
122                    ChainState::Syncing {
123                        best_block: current_block,
124                        peers: peers as u32,
125                    }
126                } else {
127                    ChainState::Live {
128                        best_block: current_block,
129                        peers: peers as u32,
130                    }
131                };
132                return;
133            }
134        }
135
136        // Check for chain_newHead subscription notification.
137        if let Some(block) = parse_block_number(text) {
138            let (current_peers, current_syncing) = match &entry.status.state {
139                ChainState::Live { peers, .. } => (*peers, false),
140                ChainState::Syncing { peers, .. } => (*peers, true),
141                ChainState::Connecting => (0, true),
142                _ => (0, false),
143            };
144
145            entry.status.state = if current_syncing && current_peers > 0 {
146                ChainState::Syncing {
147                    best_block: block,
148                    peers: current_peers,
149                }
150            } else {
151                ChainState::Live {
152                    best_block: block,
153                    peers: current_peers,
154                }
155            };
156        }
157    }
158
159    /// Process a JSON-RPC response for a relay chain DB save.
160    pub fn process_relay_response(&mut self, chain: ChainId, text: &str) {
161        if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
162            if v.get("id").and_then(|i| i.as_u64()) == Some(REQ_ID_RELAY_DB_SAVE) {
163                if let Some(db) = v.get("result").and_then(|r| r.as_str()) {
164                    self.store.save(chain.relay_db_key(), db);
165                    log::info!("{chain:?}: saved relay DB ({} bytes)", db.len());
166                } else if let Some(err) = v.get("error") {
167                    log::warn!("{chain:?}: relay DB save returned error: {err}");
168                }
169            }
170        }
171    }
172
173    /// Set the chain state to Error.
174    pub fn set_error(&mut self, chain: ChainId, msg: String) {
175        if let Some(entry) = self.chains.get_mut(&chain) {
176            entry.status.state = ChainState::Error(msg);
177        }
178    }
179
180    pub fn status(&self, chain: ChainId) -> ChainStatus {
181        self.chains
182            .get(&chain)
183            .map(|e| e.status.clone())
184            .unwrap_or_else(|| ChainStatus::disconnected(chain))
185    }
186
187    pub fn all_statuses(&self) -> Vec<ChainStatus> {
188        ChainId::all().iter().map(|&id| self.status(id)).collect()
189    }
190
191    /// Generate the chain_subscribeNewHeads JSON-RPC request.
192    pub fn subscribe_new_heads_request() -> String {
193        serde_json::json!({
194            "jsonrpc": "2.0",
195            "id": 1,
196            "method": "chain_subscribeNewHeads",
197            "params": []
198        })
199        .to_string()
200    }
201
202    /// Generate a system_health JSON-RPC request with an incrementing ID.
203    /// Returns `None` if the chain is not registered.
204    pub fn health_check_request(&mut self, chain: ChainId) -> Option<String> {
205        let entry = self.chains.get_mut(&chain)?;
206        entry.health_id += 1;
207        Some(
208            serde_json::json!({
209                "jsonrpc": "2.0",
210                "id": entry.health_id,
211                "method": "system_health",
212                "params": []
213            })
214            .to_string(),
215        )
216    }
217
218    /// Generate a parachain DB save request.
219    pub fn para_db_save_request() -> String {
220        serde_json::json!({
221            "jsonrpc": "2.0",
222            "id": REQ_ID_PARA_DB_SAVE,
223            "method": "chainHead_unstable_finalizedDatabase",
224            "params": []
225        })
226        .to_string()
227    }
228
229    /// Generate a relay chain DB save request.
230    pub fn relay_db_save_request() -> String {
231        serde_json::json!({
232            "jsonrpc": "2.0",
233            "id": REQ_ID_RELAY_DB_SAVE,
234            "method": "chainHead_unstable_finalizedDatabase",
235            "params": []
236        })
237        .to_string()
238    }
239
240    /// Load persisted relay chain DB.
241    pub fn load_relay_db(&self, chain: ChainId) -> String {
242        self.store.load(chain.relay_db_key())
243    }
244
245    /// Load persisted parachain DB.
246    pub fn load_para_db(&self, chain: ChainId) -> String {
247        self.store.load(&chain.para_db_key())
248    }
249
    /// Look up the chain specs for a smoldot chain.
    ///
    /// Thin forwarder to `ChainId::chain_specs`. On `Some`, the tuple is
    /// `(relay_spec, para_spec)`.
    // NOTE(review): presumably `None` means the chain has no bundled specs —
    // confirm against `ChainId::chain_specs`.
    pub fn chain_specs(chain: ChainId) -> Option<(&'static str, &'static str)> {
        chain.chain_specs()
    }
254
255    // -- Statement store RPC request generators --
256
257    /// Generate a statement_submit JSON-RPC request.
258    pub fn statement_submit_request(encoded_hex: &str, request_id: u64) -> String {
259        serde_json::json!({
260            "jsonrpc": "2.0",
261            "id": request_id,
262            "method": "statement_submit",
263            "params": [encoded_hex]
264        })
265        .to_string()
266    }
267
268    /// Generate a statement_subscribeStatement JSON-RPC request.
269    pub fn statement_subscribe_request(request_id: u64) -> String {
270        serde_json::json!({
271            "jsonrpc": "2.0",
272            "id": request_id,
273            "method": "statement_subscribeStatement",
274            "params": ["any"]
275        })
276        .to_string()
277    }
278
279    /// Generate a statement_unsubscribeStatement JSON-RPC request.
280    pub fn statement_unsubscribe_request(sub_id: &str, request_id: u64) -> String {
281        serde_json::json!({
282            "jsonrpc": "2.0",
283            "id": request_id,
284            "method": "statement_unsubscribeStatement",
285            "params": [sub_id]
286        })
287        .to_string()
288    }
289
290    /// Generate a statement_broadcastsStatement JSON-RPC request.
291    pub fn statement_broadcasts_request(topic_hexes: &[String], request_id: u64) -> String {
292        serde_json::json!({
293            "jsonrpc": "2.0",
294            "id": request_id,
295            "method": "statement_broadcastsStatement",
296            "params": [topic_hexes]
297        })
298        .to_string()
299    }
300
301    /// Parse a statement_subscribeStatement notification.
302    /// Returns hex-encoded statement strings found in the notification,
303    /// or an empty vec if this is not a statement notification.
304    pub fn parse_statement_notification(text: &str) -> Vec<String> {
305        let v: serde_json::Value = match serde_json::from_str(text) {
306            Ok(v) => v,
307            Err(_) => return Vec::new(),
308        };
309
310        if v.get("method").and_then(|m| m.as_str()) != Some("statement_subscribeStatement") {
311            return Vec::new();
312        }
313
314        let result = match v.pointer("/params/result") {
315            Some(r) => r,
316            None => return Vec::new(),
317        };
318
319        let stmts = result
320            .pointer("/data/statements")
321            .or_else(|| result.pointer("/newStatements/statements"))
322            .or_else(|| result.get("statements"));
323
324        match stmts.and_then(|s| s.as_array()) {
325            Some(arr) => arr
326                .iter()
327                .filter_map(|v| v.as_str().map(String::from))
328                .collect(),
329            None => Vec::new(),
330        }
331    }
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;

    /// Chain registered by `make_sm` and exercised by most tests.
    const TRACKED: ChainId = ChainId::PaseoAssetHub;
    /// Chain that is never registered in these tests.
    const UNTRACKED: ChainId = ChainId::Ethereum;

    /// `ChainStore` test double keeping everything in a `HashMap`.
    struct InMemoryStore {
        data: RefCell<HashMap<String, String>>,
    }

    impl InMemoryStore {
        fn new() -> Self {
            let data = RefCell::new(HashMap::new());
            Self { data }
        }
    }

    impl ChainStore for InMemoryStore {
        fn load(&self, key: &str) -> String {
            match self.data.borrow().get(key) {
                Some(value) => value.clone(),
                None => String::new(),
            }
        }

        fn save(&self, key: &str, data: &str) {
            let mut map = self.data.borrow_mut();
            map.insert(key.to_string(), data.to_string());
        }
    }

    /// State machine over an empty in-memory store with `TRACKED` registered.
    fn make_sm() -> ChainStateMachine<InMemoryStore> {
        let mut sm = ChainStateMachine::new(InMemoryStore::new());
        sm.register_chain(TRACKED);
        sm
    }

    /// `system_health` response with request id 1001.
    fn health_json(peers: u64, is_syncing: bool) -> String {
        format!(
            r#"{{"jsonrpc":"2.0","id":1001,"result":{{"peers":{peers},"isSyncing":{is_syncing}}}}}"#
        )
    }

    /// Successful response carrying `db` as its string result.
    fn save_ok_json(id: u64, db: &str) -> String {
        format!(r#"{{"jsonrpc":"2.0","id":{id},"result":"{db}"}}"#)
    }

    #[test]
    fn register_sets_connecting() {
        let sm = make_sm();
        assert!(matches!(sm.status(TRACKED).state, ChainState::Connecting));
    }

    #[test]
    fn unregister_sets_disconnected() {
        let mut sm = make_sm();
        sm.unregister_chain(TRACKED);
        assert!(matches!(sm.status(TRACKED).state, ChainState::Disconnected));
    }

    #[test]
    fn unregistered_chain_returns_disconnected() {
        let sm = make_sm();
        assert!(matches!(
            sm.status(UNTRACKED).state,
            ChainState::Disconnected
        ));
    }

    #[test]
    fn health_response_sets_live() {
        let mut sm = make_sm();
        sm.process_response(TRACKED, &health_json(5, false));
        assert!(matches!(
            sm.status(TRACKED).state,
            ChainState::Live { peers: 5, .. }
        ));
    }

    #[test]
    fn health_response_sets_syncing() {
        let mut sm = make_sm();
        sm.process_response(TRACKED, &health_json(3, true));
        assert!(matches!(
            sm.status(TRACKED).state,
            ChainState::Syncing { peers: 3, .. }
        ));
    }

    #[test]
    fn new_head_updates_block_number() {
        let mut sm = make_sm();
        // Go Live through a health response first ...
        sm.process_response(TRACKED, &health_json(5, false));
        // ... then deliver a new-head notification.
        let new_head =
            r#"{"jsonrpc":"2.0","method":"chain_newHead","params":{"result":{"number":"0x1a4"}}}"#;
        sm.process_response(TRACKED, new_head);

        match sm.status(TRACKED).state {
            ChainState::Live {
                best_block, peers, ..
            } => {
                assert_eq!(best_block, 0x1a4);
                assert_eq!(peers, 5);
            }
            other => panic!("expected Live, got {other:?}"),
        }
    }

    #[test]
    fn para_db_save_stores_to_store() {
        let mut sm = make_sm();
        let response = save_ok_json(REQ_ID_PARA_DB_SAVE, "saved-db-content");
        sm.process_response(TRACKED, &response);
        assert_eq!(sm.store().load("PaseoAssetHub"), "saved-db-content");
    }

    #[test]
    fn relay_db_save_stores_to_store() {
        let mut sm = make_sm();
        let response = save_ok_json(REQ_ID_RELAY_DB_SAVE, "relay-db-content");
        sm.process_relay_response(TRACKED, &response);
        let stored = sm.store().load(TRACKED.relay_db_key());
        assert_eq!(stored, "relay-db-content");
    }

    #[test]
    fn set_state_preserves_extra() {
        let mut sm = make_sm();
        let initial_state = ChainState::Live {
            best_block: 100,
            peers: 5,
        };
        let eth_extra = ChainExtra::Eth {
            finalized_block: 50,
            gas_price_gwei: 20,
        };
        sm.set_state_with_extra(TRACKED, initial_state, eth_extra);

        // A plain set_state must leave the Eth extra in place.
        let updated_state = ChainState::Live {
            best_block: 200,
            peers: 3,
        };
        sm.set_state(TRACKED, updated_state);

        let status = sm.status(TRACKED);
        assert!(matches!(
            status.extra,
            ChainExtra::Eth {
                finalized_block: 50,
                gas_price_gwei: 20
            }
        ));
        assert!(matches!(
            status.state,
            ChainState::Live {
                best_block: 200,
                peers: 3
            }
        ));
    }

    #[test]
    fn set_state_with_extra_updates_both() {
        let mut sm = make_sm();
        let state = ChainState::Live {
            best_block: 100,
            peers: 5,
        };
        let extra = ChainExtra::Btc {
            tip_height: 800000,
            fee_rate_sat_vb: 10,
        };
        sm.set_state_with_extra(TRACKED, state, extra);

        let status = sm.status(TRACKED);
        assert!(matches!(
            status.state,
            ChainState::Live {
                best_block: 100,
                peers: 5
            }
        ));
        assert!(matches!(
            status.extra,
            ChainExtra::Btc {
                tip_height: 800000,
                fee_rate_sat_vb: 10
            }
        ));
    }

    #[test]
    fn process_response_unregistered_chain_is_noop() {
        let mut sm = make_sm();
        // UNTRACKED was never registered: must neither panic nor create state.
        sm.process_response(UNTRACKED, &health_json(5, false));
        assert!(matches!(
            sm.status(UNTRACKED).state,
            ChainState::Disconnected
        ));
    }

    #[test]
    fn health_check_request_unregistered_returns_none() {
        let mut sm = make_sm();
        assert!(sm.health_check_request(UNTRACKED).is_none());
    }

    #[test]
    fn health_check_request_id_starts_above_1000_and_increments() {
        let mut sm = make_sm();
        let mut next_id = || {
            let request = sm.health_check_request(TRACKED).unwrap();
            let parsed: serde_json::Value = serde_json::from_str(&request).unwrap();
            parsed["id"].as_u64().unwrap()
        };

        let first = next_id();
        let second = next_id();

        assert!(first > 1000);
        assert_eq!(second, first + 1);
    }

    #[test]
    fn para_db_save_error_does_not_change_state() {
        let mut sm = make_sm();
        // Start from Live with five peers.
        sm.process_response(TRACKED, &health_json(5, false));

        // An error reply to the DB save must not reach the health or
        // new-head branches.
        let error_response = format!(
            r#"{{"jsonrpc":"2.0","id":{},"error":{{"code":-32000,"message":"db error"}}}}"#,
            REQ_ID_PARA_DB_SAVE,
        );
        sm.process_response(TRACKED, &error_response);

        // Still Live with peers=5 ...
        assert!(matches!(
            sm.status(TRACKED).state,
            ChainState::Live { peers: 5, .. }
        ));
        // ... and nothing was written to the store.
        assert_eq!(sm.store().load("PaseoAssetHub"), "");
    }
}
579}