1use std::collections::HashMap;
8
9use crate::chain::{
10 parse_block_number, ChainExtra, ChainId, ChainState, ChainStatus, REQ_ID_PARA_DB_SAVE,
11 REQ_ID_RELAY_DB_SAVE,
12};
13use crate::store::ChainStore;
14
15struct ChainEntry {
17 status: ChainStatus,
18 para_db_key: String,
19 health_id: u64,
20}
21
22pub struct ChainStateMachine<S: ChainStore> {
27 chains: HashMap<ChainId, ChainEntry>,
28 store: S,
29}
30
31impl<S: ChainStore> ChainStateMachine<S> {
32 pub fn new(store: S) -> Self {
33 Self {
34 chains: HashMap::new(),
35 store,
36 }
37 }
38
39 pub fn register_chain(&mut self, chain: ChainId) {
41 let para_db_key = chain.para_db_key();
42 self.chains.insert(
43 chain,
44 ChainEntry {
45 status: ChainStatus {
46 id: chain,
47 name: chain.display_name(),
48 state: ChainState::Connecting,
49 extra: ChainExtra::None,
50 },
51 para_db_key,
52 health_id: 1000,
53 },
54 );
55 }
56
57 pub fn unregister_chain(&mut self, chain: ChainId) {
59 if let Some(entry) = self.chains.get_mut(&chain) {
60 entry.status.state = ChainState::Disconnected;
61 }
62 }
63
64 pub fn set_state(&mut self, chain: ChainId, state: ChainState) {
66 if let Some(entry) = self.chains.get_mut(&chain) {
67 entry.status.state = state;
68 }
69 }
70
71 pub fn set_state_with_extra(&mut self, chain: ChainId, state: ChainState, extra: ChainExtra) {
73 if let Some(entry) = self.chains.get_mut(&chain) {
74 entry.status.state = state;
75 entry.status.extra = extra;
76 }
77 }
78
79 pub fn store(&self) -> &S {
81 &self.store
82 }
83
84 pub fn process_response(&mut self, chain: ChainId, text: &str) {
86 let v: serde_json::Value = match serde_json::from_str(text) {
87 Ok(v) => v,
88 Err(_) => return,
89 };
90
91 let entry = match self.chains.get_mut(&chain) {
92 Some(e) => e,
93 None => return,
94 };
95
96 if let Some(id) = v.get("id").and_then(|i| i.as_u64()) {
98 if id == REQ_ID_PARA_DB_SAVE {
99 if let Some(db) = v.get("result").and_then(|r| r.as_str()) {
100 self.store.save(&entry.para_db_key, db);
101 log::info!("{chain:?}: saved para DB ({} bytes)", db.len());
102 } else if let Some(err) = v.get("error") {
103 log::warn!("{chain:?}: para DB save returned error: {err}");
104 }
105 return;
106 }
107 }
108
109 if let Some(result) = v.get("result") {
111 if let (Some(peers), Some(is_syncing)) = (
112 result.get("peers").and_then(|p| p.as_u64()),
113 result.get("isSyncing").and_then(|s| s.as_bool()),
114 ) {
115 let current_block = match &entry.status.state {
116 ChainState::Live { best_block, .. }
117 | ChainState::Syncing { best_block, .. } => *best_block,
118 _ => 0,
119 };
120
121 entry.status.state = if is_syncing {
122 ChainState::Syncing {
123 best_block: current_block,
124 peers: peers as u32,
125 }
126 } else {
127 ChainState::Live {
128 best_block: current_block,
129 peers: peers as u32,
130 }
131 };
132 return;
133 }
134 }
135
136 if let Some(block) = parse_block_number(text) {
138 let (current_peers, current_syncing) = match &entry.status.state {
139 ChainState::Live { peers, .. } => (*peers, false),
140 ChainState::Syncing { peers, .. } => (*peers, true),
141 ChainState::Connecting => (0, true),
142 _ => (0, false),
143 };
144
145 entry.status.state = if current_syncing && current_peers > 0 {
146 ChainState::Syncing {
147 best_block: block,
148 peers: current_peers,
149 }
150 } else {
151 ChainState::Live {
152 best_block: block,
153 peers: current_peers,
154 }
155 };
156 }
157 }
158
159 pub fn process_relay_response(&mut self, chain: ChainId, text: &str) {
161 if let Ok(v) = serde_json::from_str::<serde_json::Value>(text) {
162 if v.get("id").and_then(|i| i.as_u64()) == Some(REQ_ID_RELAY_DB_SAVE) {
163 if let Some(db) = v.get("result").and_then(|r| r.as_str()) {
164 self.store.save(chain.relay_db_key(), db);
165 log::info!("{chain:?}: saved relay DB ({} bytes)", db.len());
166 } else if let Some(err) = v.get("error") {
167 log::warn!("{chain:?}: relay DB save returned error: {err}");
168 }
169 }
170 }
171 }
172
173 pub fn set_error(&mut self, chain: ChainId, msg: String) {
175 if let Some(entry) = self.chains.get_mut(&chain) {
176 entry.status.state = ChainState::Error(msg);
177 }
178 }
179
180 pub fn status(&self, chain: ChainId) -> ChainStatus {
181 self.chains
182 .get(&chain)
183 .map(|e| e.status.clone())
184 .unwrap_or_else(|| ChainStatus::disconnected(chain))
185 }
186
187 pub fn all_statuses(&self) -> Vec<ChainStatus> {
188 ChainId::all().iter().map(|&id| self.status(id)).collect()
189 }
190
191 pub fn subscribe_new_heads_request() -> String {
193 serde_json::json!({
194 "jsonrpc": "2.0",
195 "id": 1,
196 "method": "chain_subscribeNewHeads",
197 "params": []
198 })
199 .to_string()
200 }
201
202 pub fn health_check_request(&mut self, chain: ChainId) -> Option<String> {
205 let entry = self.chains.get_mut(&chain)?;
206 entry.health_id += 1;
207 Some(
208 serde_json::json!({
209 "jsonrpc": "2.0",
210 "id": entry.health_id,
211 "method": "system_health",
212 "params": []
213 })
214 .to_string(),
215 )
216 }
217
218 pub fn para_db_save_request() -> String {
220 serde_json::json!({
221 "jsonrpc": "2.0",
222 "id": REQ_ID_PARA_DB_SAVE,
223 "method": "chainHead_unstable_finalizedDatabase",
224 "params": []
225 })
226 .to_string()
227 }
228
229 pub fn relay_db_save_request() -> String {
231 serde_json::json!({
232 "jsonrpc": "2.0",
233 "id": REQ_ID_RELAY_DB_SAVE,
234 "method": "chainHead_unstable_finalizedDatabase",
235 "params": []
236 })
237 .to_string()
238 }
239
240 pub fn load_relay_db(&self, chain: ChainId) -> String {
242 self.store.load(chain.relay_db_key())
243 }
244
245 pub fn load_para_db(&self, chain: ChainId) -> String {
247 self.store.load(&chain.para_db_key())
248 }
249
250 pub fn chain_specs(chain: ChainId) -> Option<(&'static str, &'static str)> {
252 chain.chain_specs()
253 }
254
255 pub fn statement_submit_request(encoded_hex: &str, request_id: u64) -> String {
259 serde_json::json!({
260 "jsonrpc": "2.0",
261 "id": request_id,
262 "method": "statement_submit",
263 "params": [encoded_hex]
264 })
265 .to_string()
266 }
267
268 pub fn statement_subscribe_request(request_id: u64) -> String {
270 serde_json::json!({
271 "jsonrpc": "2.0",
272 "id": request_id,
273 "method": "statement_subscribeStatement",
274 "params": ["any"]
275 })
276 .to_string()
277 }
278
279 pub fn statement_unsubscribe_request(sub_id: &str, request_id: u64) -> String {
281 serde_json::json!({
282 "jsonrpc": "2.0",
283 "id": request_id,
284 "method": "statement_unsubscribeStatement",
285 "params": [sub_id]
286 })
287 .to_string()
288 }
289
290 pub fn statement_broadcasts_request(topic_hexes: &[String], request_id: u64) -> String {
292 serde_json::json!({
293 "jsonrpc": "2.0",
294 "id": request_id,
295 "method": "statement_broadcastsStatement",
296 "params": [topic_hexes]
297 })
298 .to_string()
299 }
300
301 pub fn parse_statement_notification(text: &str) -> Vec<String> {
305 let v: serde_json::Value = match serde_json::from_str(text) {
306 Ok(v) => v,
307 Err(_) => return Vec::new(),
308 };
309
310 if v.get("method").and_then(|m| m.as_str()) != Some("statement_subscribeStatement") {
311 return Vec::new();
312 }
313
314 let result = match v.pointer("/params/result") {
315 Some(r) => r,
316 None => return Vec::new(),
317 };
318
319 let stmts = result
320 .pointer("/data/statements")
321 .or_else(|| result.pointer("/newStatements/statements"))
322 .or_else(|| result.get("statements"));
323
324 match stmts.and_then(|s| s.as_array()) {
325 Some(arr) => arr
326 .iter()
327 .filter_map(|v| v.as_str().map(String::from))
328 .collect(),
329 None => Vec::new(),
330 }
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337 use std::cell::RefCell;
338
339 struct InMemoryStore {
340 data: RefCell<HashMap<String, String>>,
341 }
342
343 impl InMemoryStore {
344 fn new() -> Self {
345 Self {
346 data: RefCell::new(HashMap::new()),
347 }
348 }
349 }
350
351 impl ChainStore for InMemoryStore {
352 fn load(&self, key: &str) -> String {
353 self.data.borrow().get(key).cloned().unwrap_or_default()
354 }
355
356 fn save(&self, key: &str, data: &str) {
357 self.data
358 .borrow_mut()
359 .insert(key.to_string(), data.to_string());
360 }
361 }
362
363 fn make_sm() -> ChainStateMachine<InMemoryStore> {
364 let store = InMemoryStore::new();
365 let mut sm = ChainStateMachine::new(store);
366 sm.register_chain(ChainId::PaseoAssetHub);
367 sm
368 }
369
370 #[test]
371 fn register_sets_connecting() {
372 let sm = make_sm();
373 let status = sm.status(ChainId::PaseoAssetHub);
374 assert!(matches!(status.state, ChainState::Connecting));
375 }
376
377 #[test]
378 fn unregister_sets_disconnected() {
379 let mut sm = make_sm();
380 sm.unregister_chain(ChainId::PaseoAssetHub);
381 let status = sm.status(ChainId::PaseoAssetHub);
382 assert!(matches!(status.state, ChainState::Disconnected));
383 }
384
385 #[test]
386 fn unregistered_chain_returns_disconnected() {
387 let sm = make_sm();
388 let status = sm.status(ChainId::Ethereum);
389 assert!(matches!(status.state, ChainState::Disconnected));
390 }
391
392 #[test]
393 fn health_response_sets_live() {
394 let mut sm = make_sm();
395 let resp = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
396 sm.process_response(ChainId::PaseoAssetHub, resp);
397 let status = sm.status(ChainId::PaseoAssetHub);
398 assert!(matches!(status.state, ChainState::Live { peers: 5, .. }));
399 }
400
401 #[test]
402 fn health_response_sets_syncing() {
403 let mut sm = make_sm();
404 let resp = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":3,"isSyncing":true}}"#;
405 sm.process_response(ChainId::PaseoAssetHub, resp);
406 let status = sm.status(ChainId::PaseoAssetHub);
407 assert!(matches!(status.state, ChainState::Syncing { peers: 3, .. }));
408 }
409
410 #[test]
411 fn new_head_updates_block_number() {
412 let mut sm = make_sm();
413 let health = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
415 sm.process_response(ChainId::PaseoAssetHub, health);
416 let head =
418 r#"{"jsonrpc":"2.0","method":"chain_newHead","params":{"result":{"number":"0x1a4"}}}"#;
419 sm.process_response(ChainId::PaseoAssetHub, head);
420 let status = sm.status(ChainId::PaseoAssetHub);
421 match status.state {
422 ChainState::Live {
423 best_block, peers, ..
424 } => {
425 assert_eq!(best_block, 0x1a4);
426 assert_eq!(peers, 5);
427 }
428 other => panic!("expected Live, got {other:?}"),
429 }
430 }
431
432 #[test]
433 fn para_db_save_stores_to_store() {
434 let mut sm = make_sm();
435 let resp = format!(
436 r#"{{"jsonrpc":"2.0","id":{},"result":"saved-db-content"}}"#,
437 REQ_ID_PARA_DB_SAVE,
438 );
439 sm.process_response(ChainId::PaseoAssetHub, &resp);
440 assert_eq!(sm.store().load("PaseoAssetHub"), "saved-db-content");
441 }
442
443 #[test]
444 fn relay_db_save_stores_to_store() {
445 let mut sm = make_sm();
446 let resp = format!(
447 r#"{{"jsonrpc":"2.0","id":{},"result":"relay-db-content"}}"#,
448 REQ_ID_RELAY_DB_SAVE,
449 );
450 sm.process_relay_response(ChainId::PaseoAssetHub, &resp);
451 assert_eq!(
452 sm.store().load(ChainId::PaseoAssetHub.relay_db_key()),
453 "relay-db-content"
454 );
455 }
456
457 #[test]
458 fn set_state_preserves_extra() {
459 let mut sm = make_sm();
460 sm.set_state_with_extra(
461 ChainId::PaseoAssetHub,
462 ChainState::Live {
463 best_block: 100,
464 peers: 5,
465 },
466 ChainExtra::Eth {
467 finalized_block: 50,
468 gas_price_gwei: 20,
469 },
470 );
471 sm.set_state(
473 ChainId::PaseoAssetHub,
474 ChainState::Live {
475 best_block: 200,
476 peers: 3,
477 },
478 );
479 let status = sm.status(ChainId::PaseoAssetHub);
480 assert!(matches!(
481 status.extra,
482 ChainExtra::Eth {
483 finalized_block: 50,
484 gas_price_gwei: 20
485 }
486 ));
487 assert!(matches!(
488 status.state,
489 ChainState::Live {
490 best_block: 200,
491 peers: 3
492 }
493 ));
494 }
495
496 #[test]
497 fn set_state_with_extra_updates_both() {
498 let mut sm = make_sm();
499 sm.set_state_with_extra(
500 ChainId::PaseoAssetHub,
501 ChainState::Live {
502 best_block: 100,
503 peers: 5,
504 },
505 ChainExtra::Btc {
506 tip_height: 800000,
507 fee_rate_sat_vb: 10,
508 },
509 );
510 let status = sm.status(ChainId::PaseoAssetHub);
511 assert!(matches!(
512 status.state,
513 ChainState::Live {
514 best_block: 100,
515 peers: 5
516 }
517 ));
518 assert!(matches!(
519 status.extra,
520 ChainExtra::Btc {
521 tip_height: 800000,
522 fee_rate_sat_vb: 10
523 }
524 ));
525 }
526
527 #[test]
528 fn process_response_unregistered_chain_is_noop() {
529 let mut sm = make_sm();
530 let resp = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
531 sm.process_response(ChainId::Ethereum, resp);
533 let status = sm.status(ChainId::Ethereum);
534 assert!(matches!(status.state, ChainState::Disconnected));
535 }
536
537 #[test]
538 fn health_check_request_unregistered_returns_none() {
539 let mut sm = make_sm();
540 assert!(sm.health_check_request(ChainId::Ethereum).is_none());
541 }
542
543 #[test]
544 fn health_check_request_id_starts_above_1000_and_increments() {
545 let mut sm = make_sm();
546 let req1 = sm.health_check_request(ChainId::PaseoAssetHub).unwrap();
547 let req2 = sm.health_check_request(ChainId::PaseoAssetHub).unwrap();
548
549 let v1: serde_json::Value = serde_json::from_str(&req1).unwrap();
550 let v2: serde_json::Value = serde_json::from_str(&req2).unwrap();
551
552 let id1 = v1["id"].as_u64().unwrap();
553 let id2 = v2["id"].as_u64().unwrap();
554
555 assert!(id1 > 1000);
556 assert_eq!(id2, id1 + 1);
557 }
558
559 #[test]
560 fn para_db_save_error_does_not_change_state() {
561 let mut sm = make_sm();
562 let health = r#"{"jsonrpc":"2.0","id":1001,"result":{"peers":5,"isSyncing":false}}"#;
564 sm.process_response(ChainId::PaseoAssetHub, health);
565
566 let error_resp = format!(
568 r#"{{"jsonrpc":"2.0","id":{},"error":{{"code":-32000,"message":"db error"}}}}"#,
569 REQ_ID_PARA_DB_SAVE,
570 );
571 sm.process_response(ChainId::PaseoAssetHub, &error_resp);
572
573 let status = sm.status(ChainId::PaseoAssetHub);
575 assert!(matches!(status.state, ChainState::Live { peers: 5, .. }));
576 assert_eq!(sm.store().load("PaseoAssetHub"), "");
578 }
579}