1use std::collections::HashMap;
31use std::time::{Duration, Instant};
32
33use serde::{Deserialize, Serialize};
34use std::sync::Arc;
35use tokio::sync::broadcast;
36use tokio::sync::RwLock;
37
38use crate::error::IndexerError;
39use crate::handler::DecodedEvent;
40use crate::indexer::{IndexerConfig, IndexerState};
41
/// Bookkeeping record for one indexed chain: its configuration plus the
/// mutable state and progress counters tracked for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChainInstance {
    /// Indexer configuration this instance was created from.
    pub config: IndexerConfig,
    /// Current lifecycle state; `IndexerState::Idle` for a fresh instance.
    pub state: IndexerState,
    /// Highest block number recorded so far (0 until a block is recorded).
    pub head_block: u64,
    /// Running total of events reported for this chain.
    pub events_processed: u64,
    /// Error message stored by `transition`; `None` when no error is recorded.
    pub last_error: Option<String>,
    /// Unix timestamp (seconds) at which the chain was first started, or
    /// `None` if it has never been started.
    pub started_at: Option<i64>,
}
60
61impl ChainInstance {
62 pub fn new(config: IndexerConfig) -> Self {
64 Self {
65 config,
66 state: IndexerState::Idle,
67 head_block: 0,
68 events_processed: 0,
69 last_error: None,
70 started_at: None,
71 }
72 }
73
74 pub fn is_active(&self) -> bool {
76 matches!(
77 self.state,
78 IndexerState::Backfilling | IndexerState::Live | IndexerState::ReorgRecovery
79 )
80 }
81
82 pub fn is_error(&self) -> bool {
84 matches!(self.state, IndexerState::Error)
85 }
86
87 pub fn transition(&mut self, new_state: IndexerState, error: Option<String>) {
89 self.state = new_state;
90 if new_state == IndexerState::Error {
91 self.last_error = error;
92 } else if error.is_none() && !matches!(new_state, IndexerState::Error) {
93 self.last_error = None;
95 }
96 }
97}
98
/// Configuration for a `MultiChainCoordinator`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiChainConfig {
    /// Per-chain indexer configurations registered when the coordinator is built.
    pub chains: Vec<IndexerConfig>,
    /// Limit on chains running at once (default 0).
    /// NOTE(review): not enforced anywhere in this file; presumably 0 means
    /// "no limit" — confirm against the consumer of this field.
    pub max_concurrent_chains: usize,
    /// Interval between health checks; `validate` rejects a zero value.
    pub health_check_interval: Duration,
    /// Whether a chain should be restarted after an error (default `true`).
    /// NOTE(review): the restart logic itself is not in this file.
    pub restart_on_error: bool,
    /// Delay associated with restarts (default 5 seconds).
    pub restart_delay: Duration,
}
117
118impl Default for MultiChainConfig {
119 fn default() -> Self {
120 Self {
121 chains: vec![],
122 max_concurrent_chains: 0, health_check_interval: Duration::from_secs(30),
124 restart_on_error: true,
125 restart_delay: Duration::from_secs(5),
126 }
127 }
128}
129
130impl MultiChainConfig {
131 pub fn validate(&self) -> Option<String> {
133 if self.health_check_interval.is_zero() {
134 return Some("health_check_interval must be non-zero".into());
135 }
136 let mut seen = std::collections::HashSet::new();
138 for cfg in &self.chains {
139 if !seen.insert(&cfg.id) {
140 return Some(format!("duplicate chain id '{}'", cfg.id));
141 }
142 }
143 None
144 }
145}
146
/// Point-in-time health snapshot of a single chain.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChainHealth {
    /// Id of the chain (copied from the instance's `config.id`).
    pub chain: String,
    /// Lifecycle state at snapshot time.
    pub state: IndexerState,
    /// Highest block number recorded for the chain.
    pub head_block: u64,
    /// Total events recorded for the chain.
    pub events_processed: u64,
    /// Block lag; `from_instance` always sets this to 0.
    pub block_lag: u64,
    /// Time elapsed since the chain's `started_at`, or zero if never started.
    pub uptime: Duration,
    /// Last recorded error message, if any.
    pub last_error: Option<String>,
    /// `true` when the chain is active and has no recorded error.
    pub is_healthy: bool,
}
169
170impl ChainHealth {
171 fn from_instance(instance: &ChainInstance, now_secs: i64) -> Self {
172 let uptime = instance
173 .started_at
174 .map(|s| Duration::from_secs(now_secs.saturating_sub(s).max(0) as u64))
175 .unwrap_or(Duration::ZERO);
176
177 let is_healthy = instance.is_active() && instance.last_error.is_none();
178
179 Self {
180 chain: instance.config.id.clone(),
181 state: instance.state,
182 head_block: instance.head_block,
183 events_processed: instance.events_processed,
184 block_lag: 0, uptime,
186 last_error: instance.last_error.clone(),
187 is_healthy,
188 }
189 }
190}
191
/// Tracks the state of several chain indexers behind a shared async lock.
pub struct MultiChainCoordinator {
    /// Configuration the coordinator was built with.
    config: MultiChainConfig,
    /// Per-chain bookkeeping, keyed by chain id.
    instances: RwLock<HashMap<String, ChainInstance>>,
    /// Instant at which the coordinator was constructed (used for `uptime`).
    started: Instant,
}
207
208impl MultiChainCoordinator {
209 pub fn new(config: MultiChainConfig) -> Self {
214 let mut instances = HashMap::new();
215 for chain_cfg in &config.chains {
216 instances.insert(chain_cfg.id.clone(), ChainInstance::new(chain_cfg.clone()));
217 }
218 Self {
219 config,
220 instances: RwLock::new(instances),
221 started: Instant::now(),
222 }
223 }
224
225 pub async fn add_chain(&self, config: IndexerConfig) -> Result<(), IndexerError> {
231 let mut guard = self.instances.write().await;
232 if guard.contains_key(&config.id) {
233 return Err(IndexerError::Other(format!(
234 "chain '{}' already registered",
235 config.id
236 )));
237 }
238 guard.insert(config.id.clone(), ChainInstance::new(config));
239 Ok(())
240 }
241
242 pub async fn remove_chain(&self, chain_id: &str) -> Result<(), IndexerError> {
247 let mut guard = self.instances.write().await;
248 if guard.remove(chain_id).is_none() {
249 return Err(IndexerError::Other(format!(
250 "chain '{}' not found",
251 chain_id
252 )));
253 }
254 Ok(())
255 }
256
257 pub async fn pause_chain(&self, chain_id: &str) -> Result<(), IndexerError> {
262 let mut guard = self.instances.write().await;
263 let instance = guard
264 .get_mut(chain_id)
265 .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
266 if !instance.is_active() {
267 return Err(IndexerError::Other(format!(
268 "chain '{}' is not active (state: {})",
269 chain_id, instance.state
270 )));
271 }
272 instance.transition(IndexerState::Stopping, None);
273 tracing::info!(chain = %chain_id, "pausing chain");
274 Ok(())
275 }
276
277 pub async fn resume_chain(&self, chain_id: &str) -> Result<(), IndexerError> {
282 let mut guard = self.instances.write().await;
283 let instance = guard
284 .get_mut(chain_id)
285 .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
286 if instance.is_active() {
287 return Err(IndexerError::Other(format!(
288 "chain '{}' is already active (state: {})",
289 chain_id, instance.state
290 )));
291 }
292 instance.transition(IndexerState::Backfilling, None);
293 if instance.started_at.is_none() {
294 instance.started_at = Some(chrono::Utc::now().timestamp());
295 }
296 tracing::info!(chain = %chain_id, "resuming chain");
297 Ok(())
298 }
299
300 pub async fn update_state(
304 &self,
305 chain_id: &str,
306 new_state: IndexerState,
307 error: Option<String>,
308 ) -> Result<(), IndexerError> {
309 let mut guard = self.instances.write().await;
310 let instance = guard
311 .get_mut(chain_id)
312 .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
313 if (new_state == IndexerState::Backfilling || new_state == IndexerState::Live)
314 && instance.started_at.is_none()
315 {
316 instance.started_at = Some(chrono::Utc::now().timestamp());
317 }
318 instance.transition(new_state, error);
319 Ok(())
320 }
321
322 pub async fn record_block(
324 &self,
325 chain_id: &str,
326 block_number: u64,
327 events: u64,
328 ) -> Result<(), IndexerError> {
329 let mut guard = self.instances.write().await;
330 let instance = guard
331 .get_mut(chain_id)
332 .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
333 if block_number > instance.head_block {
334 instance.head_block = block_number;
335 }
336 instance.events_processed += events;
337 Ok(())
338 }
339
340 pub async fn health(&self) -> Vec<ChainHealth> {
344 let guard = self.instances.read().await;
345 let now = chrono::Utc::now().timestamp();
346 guard
347 .values()
348 .map(|inst| ChainHealth::from_instance(inst, now))
349 .collect()
350 }
351
352 pub async fn chain_health(&self, chain_id: &str) -> Option<ChainHealth> {
354 let guard = self.instances.read().await;
355 let now = chrono::Utc::now().timestamp();
356 guard
357 .get(chain_id)
358 .map(|inst| ChainHealth::from_instance(inst, now))
359 }
360
361 pub async fn chain_state(&self, chain_id: &str) -> Option<ChainInstance> {
363 let guard = self.instances.read().await;
364 guard.get(chain_id).cloned()
365 }
366
367 pub async fn chains(&self) -> Vec<String> {
369 let guard = self.instances.read().await;
370 guard.keys().cloned().collect()
371 }
372
373 pub async fn active_chains(&self) -> Vec<String> {
375 let guard = self.instances.read().await;
376 guard
377 .iter()
378 .filter(|(_, inst)| inst.is_active())
379 .map(|(id, _)| id.clone())
380 .collect()
381 }
382
383 pub async fn is_all_healthy(&self) -> bool {
385 let guard = self.instances.read().await;
386 guard
387 .values()
388 .all(|inst| inst.is_active() && inst.last_error.is_none())
389 }
390
391 pub async fn all_past_block(&self, min_block: u64) -> bool {
394 let guard = self.instances.read().await;
395 guard.values().all(|inst| inst.head_block >= min_block)
396 }
397
398 pub async fn chain_count(&self) -> usize {
400 self.instances.read().await.len()
401 }
402
403 pub fn config(&self) -> &MultiChainConfig {
405 &self.config
406 }
407
408 pub fn uptime(&self) -> Duration {
410 self.started.elapsed()
411 }
412}
413
/// A decoded event tagged with the chain it was published for and the time
/// it entered the event bus.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossChainEvent {
    /// Chain label supplied by the publisher.
    pub chain: String,
    /// The decoded event payload.
    pub event: DecodedEvent,
    /// Unix timestamp (seconds) taken when the event was pushed onto the bus.
    pub received_at: i64,
}
426
/// Receiving half handed out by `CrossChainEventBus::subscribe`.
pub type CrossChainReceiver = broadcast::Receiver<CrossChainEvent>;
429
/// Fan-out bus for `CrossChainEvent`s built on a tokio broadcast channel.
///
/// The handle is `Clone`; each clone holds a clone of the same broadcast sender.
#[derive(Clone)]
pub struct CrossChainEventBus {
    /// Sending half of the broadcast channel; receivers are created from it
    /// on demand.
    sender: broadcast::Sender<CrossChainEvent>,
}
442
443impl CrossChainEventBus {
444 pub fn new(capacity: usize) -> Self {
450 let (sender, _) = broadcast::channel(capacity);
451 Self { sender }
452 }
453
454 pub fn push(&self, chain: &str, event: DecodedEvent) -> usize {
459 let cross = CrossChainEvent {
460 chain: chain.to_string(),
461 event,
462 received_at: chrono::Utc::now().timestamp(),
463 };
464 self.sender.send(cross).unwrap_or(0)
466 }
467
468 pub fn subscribe(&self) -> CrossChainReceiver {
473 self.sender.subscribe()
474 }
475
476 pub fn subscriber_count(&self) -> usize {
478 self.sender.receiver_count()
479 }
480}
481
/// Snapshot of how far each chain has been indexed, with optional chain-tip
/// and timestamp information per chain.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ChainSyncStatus {
    /// Indexed head block per chain.
    pub chains: HashMap<String, u64>,
    /// Latest known chain tip per chain, set through `update_tip`.
    tips: HashMap<String, u64>,
    /// Caller-supplied timestamp per chain, set through `update_with_timestamp`.
    timestamps: HashMap<String, i64>,
}
498
499impl ChainSyncStatus {
500 pub fn new() -> Self {
502 Self::default()
503 }
504
505 pub fn update(&mut self, chain: &str, head: u64) {
507 self.chains.insert(chain.to_string(), head);
508 }
509
510 pub fn update_with_timestamp(&mut self, chain: &str, head: u64, timestamp: i64) {
512 self.chains.insert(chain.to_string(), head);
513 self.timestamps.insert(chain.to_string(), timestamp);
514 }
515
516 pub fn update_tip(&mut self, chain: &str, tip: u64) {
518 self.tips.insert(chain.to_string(), tip);
519 }
520
521 pub fn min_timestamp(&self) -> Option<i64> {
524 self.timestamps.values().copied().reduce(i64::min)
525 }
526
527 pub fn max_timestamp(&self) -> Option<i64> {
529 self.timestamps.values().copied().reduce(i64::max)
530 }
531
532 pub fn all_past_block(&self, _chain: &str, block: u64) -> bool {
536 if self.chains.is_empty() {
537 return false;
538 }
539 self.chains.values().all(|&head| head >= block)
540 }
541
542 pub fn all_caught_up(&self, threshold_blocks: u64) -> bool {
547 if self.tips.is_empty() {
548 return false;
549 }
550 for (chain, &tip) in &self.tips {
551 let head = self.chains.get(chain).copied().unwrap_or(0);
552 if tip.saturating_sub(head) > threshold_blocks {
553 return false;
554 }
555 }
556 true
557 }
558
559 pub fn head_of(&self, chain: &str) -> Option<u64> {
561 self.chains.get(chain).copied()
562 }
563
564 pub fn lag_of(&self, chain: &str) -> Option<u64> {
567 let head = self.chains.get(chain).copied()?;
568 let tip = self.tips.get(chain).copied()?;
569 Some(tip.saturating_sub(head))
570 }
571
572 pub fn len(&self) -> usize {
574 self.chains.len()
575 }
576
577 pub fn is_empty(&self) -> bool {
579 self.chains.is_empty()
580 }
581
582 pub async fn from_coordinator(coordinator: &Arc<MultiChainCoordinator>) -> Self {
584 let guard = coordinator.instances.read().await;
585 let mut status = Self::new();
586 for (id, inst) in guard.iter() {
587 status.update(id, inst.head_block);
588 }
589 status
590 }
591}
592
// Unit tests for the configuration, coordinator, event bus and sync status.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::EventFilter;

    /// Builds an `IndexerConfig` with the given id and chain name and fixed
    /// values for every other field.
    fn make_config(id: &str, chain: &str) -> IndexerConfig {
        IndexerConfig {
            id: id.into(),
            chain: chain.into(),
            from_block: 0,
            to_block: None,
            confirmation_depth: 12,
            batch_size: 1000,
            checkpoint_interval: 100,
            poll_interval_ms: 2000,
            filter: EventFilter::default(),
        }
    }

    /// Builds a coordinator from `(id, chain)` pairs with default settings.
    fn make_coordinator(ids: &[(&str, &str)]) -> MultiChainCoordinator {
        let chains: Vec<IndexerConfig> = ids
            .iter()
            .map(|(id, chain)| make_config(id, chain))
            .collect();
        MultiChainCoordinator::new(MultiChainConfig {
            chains,
            ..Default::default()
        })
    }

    /// Builds a fixed sample event for `chain`.
    fn dummy_event(chain: &str) -> DecodedEvent {
        DecodedEvent {
            chain: chain.into(),
            schema: "ERC20Transfer".into(),
            address: "0xabc".into(),
            tx_hash: "0xdeadbeef".into(),
            block_number: 100,
            log_index: 0,
            fields_json: serde_json::json!({"from": "0x1", "to": "0x2", "value": "1000"}),
        }
    }

    // ---- MultiChainConfig ----

    // The default configuration has the documented field values.
    #[test]
    fn multichain_config_defaults() {
        let cfg = MultiChainConfig::default();
        assert!(cfg.chains.is_empty());
        assert_eq!(cfg.max_concurrent_chains, 0);
        assert_eq!(cfg.health_check_interval, Duration::from_secs(30));
        assert!(cfg.restart_on_error);
        assert_eq!(cfg.restart_delay, Duration::from_secs(5));
    }

    // Distinct chain ids pass validation.
    #[test]
    fn multichain_config_validate_ok() {
        let cfg = MultiChainConfig {
            chains: vec![
                make_config("eth", "ethereum"),
                make_config("arb", "arbitrum"),
            ],
            ..Default::default()
        };
        assert!(cfg.validate().is_none());
    }

    // Two entries sharing an id are reported, even with different chain names.
    #[test]
    fn multichain_config_validate_duplicate_id() {
        let cfg = MultiChainConfig {
            chains: vec![
                make_config("eth", "ethereum"),
                make_config("eth", "arbitrum"),
            ],
            ..Default::default()
        };
        let err = cfg.validate().expect("should report duplicate");
        assert!(err.contains("duplicate chain id 'eth'"));
    }

    // A zero health-check interval is rejected.
    #[test]
    fn multichain_config_validate_zero_interval() {
        let cfg = MultiChainConfig {
            health_check_interval: Duration::ZERO,
            ..Default::default()
        };
        let err = cfg.validate().expect("should report invalid interval");
        assert!(err.contains("health_check_interval"));
    }

    // ---- Coordinator: add / remove ----

    // Adding a chain to an empty coordinator registers it.
    #[tokio::test]
    async fn coordinator_add_chain() {
        let coord = make_coordinator(&[]);
        coord
            .add_chain(make_config("eth", "ethereum"))
            .await
            .unwrap();
        assert_eq!(coord.chain_count().await, 1);
    }

    // Adding an id that already exists fails.
    #[tokio::test]
    async fn coordinator_add_duplicate_chain_errors() {
        let coord = make_coordinator(&[("eth", "ethereum")]);
        let err = coord
            .add_chain(make_config("eth", "ethereum"))
            .await
            .unwrap_err();
        assert!(err.to_string().contains("already registered"));
    }

    // Removing a chain drops it from the count and from state lookups.
    #[tokio::test]
    async fn coordinator_remove_chain() {
        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
        coord.remove_chain("eth").await.unwrap();
        assert_eq!(coord.chain_count().await, 1);
        assert!(coord.chain_state("eth").await.is_none());
    }

    // Removing an unknown chain fails.
    #[tokio::test]
    async fn coordinator_remove_missing_chain_errors() {
        let coord = make_coordinator(&[]);
        let err = coord.remove_chain("unknown").await.unwrap_err();
        assert!(err.to_string().contains("not found"));
    }

    // ---- Coordinator: pause / resume ----

    // Full pause/resume cycle: Live -> Stopping -> (second pause fails) -> Backfilling.
    #[tokio::test]
    async fn coordinator_pause_and_resume() {
        let coord = make_coordinator(&[("eth", "ethereum")]);

        // Make the chain active first; a fresh chain is idle.
        coord
            .update_state("eth", IndexerState::Live, None)
            .await
            .unwrap();
        assert!(coord.chain_state("eth").await.unwrap().is_active());

        // Pausing moves the chain to Stopping.
        coord.pause_chain("eth").await.unwrap();
        let inst = coord.chain_state("eth").await.unwrap();
        assert_eq!(inst.state, IndexerState::Stopping);

        // Stopping is not an active state, so pausing again is rejected.
        let err = coord.pause_chain("eth").await.unwrap_err();
        assert!(err.to_string().contains("not active"));

        // Resuming puts the chain back into Backfilling.
        coord.resume_chain("eth").await.unwrap();
        let inst = coord.chain_state("eth").await.unwrap();
        assert_eq!(inst.state, IndexerState::Backfilling);
    }

    // Resuming a chain that is already active fails.
    #[tokio::test]
    async fn coordinator_resume_already_active_errors() {
        let coord = make_coordinator(&[("eth", "ethereum")]);
        coord
            .update_state("eth", IndexerState::Live, None)
            .await
            .unwrap();
        let err = coord.resume_chain("eth").await.unwrap_err();
        assert!(err.to_string().contains("already active"));
    }

    // ---- Coordinator: health ----

    // Health snapshots mirror each chain's state and error message.
    #[tokio::test]
    async fn health_reflects_state() {
        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
        coord
            .update_state("eth", IndexerState::Live, None)
            .await
            .unwrap();
        coord
            .update_state("arb", IndexerState::Error, Some("rpc timeout".into()))
            .await
            .unwrap();

        let health = coord.health().await;
        assert_eq!(health.len(), 2);

        let eth_h = health.iter().find(|h| h.chain == "eth").unwrap();
        assert!(eth_h.is_healthy);
        assert_eq!(eth_h.state, IndexerState::Live);

        let arb_h = health.iter().find(|h| h.chain == "arb").unwrap();
        assert!(!arb_h.is_healthy);
        assert_eq!(arb_h.last_error.as_deref(), Some("rpc timeout"));
    }

    // One chain in error makes the aggregate unhealthy.
    #[tokio::test]
    async fn is_all_healthy_false_when_error() {
        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
        coord
            .update_state("eth", IndexerState::Live, None)
            .await
            .unwrap();
        coord
            .update_state("arb", IndexerState::Error, Some("crash".into()))
            .await
            .unwrap();
        assert!(!coord.is_all_healthy().await);
    }

    // All chains live and error-free makes the aggregate healthy.
    #[tokio::test]
    async fn is_all_healthy_true_when_all_live() {
        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
        coord
            .update_state("eth", IndexerState::Live, None)
            .await
            .unwrap();
        coord
            .update_state("arb", IndexerState::Live, None)
            .await
            .unwrap();
        assert!(coord.is_all_healthy().await);
    }

    // ---- Coordinator: active chains ----

    // Only Live/Backfilling chains are listed; the untouched "sol" stays idle.
    #[tokio::test]
    async fn active_chains_filters_correctly() {
        let coord =
            make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum"), ("sol", "solana")]);
        coord
            .update_state("eth", IndexerState::Live, None)
            .await
            .unwrap();
        coord
            .update_state("arb", IndexerState::Backfilling, None)
            .await
            .unwrap();
        let active = coord.active_chains().await;
        assert_eq!(active.len(), 2);
        assert!(active.contains(&"eth".to_string()));
        assert!(active.contains(&"arb".to_string()));
        assert!(!active.contains(&"sol".to_string()));
    }

    // ---- Coordinator: error handling ----

    // Entering the error state stores the supplied message.
    #[tokio::test]
    async fn error_state_records_message() {
        let coord = make_coordinator(&[("eth", "ethereum")]);
        coord
            .update_state(
                "eth",
                IndexerState::Error,
                Some("connection refused".into()),
            )
            .await
            .unwrap();

        let inst = coord.chain_state("eth").await.unwrap();
        assert_eq!(inst.state, IndexerState::Error);
        assert_eq!(inst.last_error.as_deref(), Some("connection refused"));
    }

    // Resuming from the error state clears the stored message.
    #[tokio::test]
    async fn error_cleared_on_resume() {
        let coord = make_coordinator(&[("eth", "ethereum")]);
        coord
            .update_state("eth", IndexerState::Error, Some("boom".into()))
            .await
            .unwrap();
        coord.resume_chain("eth").await.unwrap();

        let inst = coord.chain_state("eth").await.unwrap();
        assert_eq!(inst.state, IndexerState::Backfilling);
        assert!(inst.last_error.is_none());
    }

    // ---- ChainInstance ----

    // Walks an instance through each state and checks the derived predicates.
    #[test]
    fn chain_instance_state_transitions() {
        let cfg = make_config("eth", "ethereum");
        let mut inst = ChainInstance::new(cfg);

        assert_eq!(inst.state, IndexerState::Idle);
        assert!(!inst.is_active());
        assert!(!inst.is_error());

        inst.transition(IndexerState::Backfilling, None);
        assert!(inst.is_active());

        inst.transition(IndexerState::Live, None);
        assert!(inst.is_active());

        inst.transition(IndexerState::ReorgRecovery, None);
        assert!(inst.is_active());

        inst.transition(IndexerState::Error, Some("test error".into()));
        assert!(!inst.is_active());
        assert!(inst.is_error());
        assert_eq!(inst.last_error.as_deref(), Some("test error"));

        // Leaving the error state without a message clears the old one.
        inst.transition(IndexerState::Backfilling, None);
        assert!(inst.is_active());
        assert!(inst.last_error.is_none());
    }

    // ---- CrossChainEventBus ----

    // A subscriber receives the event that was pushed.
    #[tokio::test]
    async fn event_bus_push_and_subscribe() {
        let bus = CrossChainEventBus::new(64);
        let mut rx = bus.subscribe();

        let event = dummy_event("ethereum");
        bus.push("ethereum", event.clone());

        let received = rx.recv().await.unwrap();
        assert_eq!(received.chain, "ethereum");
        assert_eq!(received.event.schema, "ERC20Transfer");
        assert_eq!(received.event.tx_hash, "0xdeadbeef");
    }

    // Every subscriber receives its own copy of the event.
    #[tokio::test]
    async fn event_bus_multiple_subscribers() {
        let bus = CrossChainEventBus::new(64);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        bus.push("arbitrum", dummy_event("arbitrum"));

        let e1 = rx1.recv().await.unwrap();
        let e2 = rx2.recv().await.unwrap();
        assert_eq!(e1.chain, "arbitrum");
        assert_eq!(e2.chain, "arbitrum");
    }

    // Pushing with no subscribers reports 0 instead of failing.
    #[tokio::test]
    async fn event_bus_no_subscribers_does_not_panic() {
        let bus = CrossChainEventBus::new(16);
        let count = bus.push("ethereum", dummy_event("ethereum"));
        assert_eq!(count, 0);
    }

    // Pushed events carry a positive receive timestamp.
    #[tokio::test]
    async fn event_bus_received_at_is_populated() {
        let bus = CrossChainEventBus::new(16);
        let mut rx = bus.subscribe();
        bus.push("ethereum", dummy_event("ethereum"));
        let ev = rx.recv().await.unwrap();
        assert!(ev.received_at > 0);
    }

    // ---- ChainSyncStatus ----

    // Heads can be stored and read back; unknown chains yield None.
    #[test]
    fn sync_status_update_and_query() {
        let mut status = ChainSyncStatus::new();
        status.update("ethereum", 1_000_000);
        status.update("arbitrum", 200_000_000);

        assert_eq!(status.head_of("ethereum"), Some(1_000_000));
        assert_eq!(status.head_of("arbitrum"), Some(200_000_000));
        assert_eq!(status.head_of("unknown"), None);
    }

    // The check passes only when every chain is at or beyond the block.
    #[test]
    fn sync_status_all_past_block() {
        let mut status = ChainSyncStatus::new();
        status.update("eth", 1000);
        status.update("arb", 2000);
        status.update("sol", 500);

        // Lowest head is 500, so 400 passes and 600 does not.
        assert!(status.all_past_block("", 400));
        assert!(!status.all_past_block("", 600));
    }

    // Both chains lag their tip by 10 blocks.
    #[test]
    fn sync_status_all_caught_up() {
        let mut status = ChainSyncStatus::new();
        status.update("eth", 990);
        status.update_tip("eth", 1000);
        status.update("arb", 199_990);
        status.update_tip("arb", 200_000);

        // A lag of 10 is within a threshold of 20 but not within 5.
        assert!(status.all_caught_up(20));
        assert!(!status.all_caught_up(5));
    }

    // The minimum is taken across all recorded timestamps.
    #[test]
    fn sync_status_min_timestamp() {
        let mut status = ChainSyncStatus::new();
        status.update_with_timestamp("eth", 1000, 1_700_000_100);
        status.update_with_timestamp("arb", 2000, 1_700_000_050);
        status.update_with_timestamp("sol", 3000, 1_700_000_200);

        assert_eq!(status.min_timestamp(), Some(1_700_000_050));
    }

    // With no timestamps recorded there is no minimum.
    #[test]
    fn sync_status_min_timestamp_none_when_empty() {
        let status = ChainSyncStatus::new();
        assert!(status.min_timestamp().is_none());
    }

    // Lag is tip minus head; unknown chains yield None.
    #[test]
    fn sync_status_lag_of() {
        let mut status = ChainSyncStatus::new();
        status.update("eth", 990);
        status.update_tip("eth", 1000);

        assert_eq!(status.lag_of("eth"), Some(10));
        assert_eq!(status.lag_of("unknown"), None);
    }

    // Without any known tip the status is never considered caught up.
    #[test]
    fn sync_status_all_caught_up_no_tips_returns_false() {
        let mut status = ChainSyncStatus::new();
        status.update("eth", 1000);
        assert!(!status.all_caught_up(10));
    }

    // Emptiness and length follow the number of recorded heads.
    #[test]
    fn sync_status_is_empty() {
        let status = ChainSyncStatus::new();
        assert!(status.is_empty());
        let mut status = ChainSyncStatus::new();
        status.update("eth", 0);
        assert!(!status.is_empty());
        assert_eq!(status.len(), 1);
    }
}