Skip to main content

chainindex_core/
multichain.rs

1//! Multi-chain indexer coordinator.
2//!
3//! Manages multiple [`IndexerConfig`] instances — one per chain — from a single
4//! engine. The coordinator tracks runtime state for each chain, provides health
5//! reporting, and aggregates events from all chains onto a shared broadcast bus.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use chainindex_core::multichain::{MultiChainConfig, MultiChainCoordinator, CrossChainEventBus};
11//! use chainindex_core::indexer::IndexerConfig;
12//! use std::time::Duration;
13//!
14//! let eth_cfg = IndexerConfig { id: "eth-main".into(), chain: "ethereum".into(), ..Default::default() };
15//! let arb_cfg = IndexerConfig { id: "arb-main".into(), chain: "arbitrum".into(), ..Default::default() };
16//!
17//! let config = MultiChainConfig {
18//!     chains: vec![eth_cfg, arb_cfg],
19//!     max_concurrent_chains: 4,
20//!     health_check_interval: Duration::from_secs(30),
21//!     restart_on_error: true,
22//!     restart_delay: Duration::from_secs(5),
23//! };
24//!
25//! let coordinator = MultiChainCoordinator::new(config);
26//! let bus = CrossChainEventBus::new(1024);
27//! let mut rx = bus.subscribe();
28//! ```
29
30use std::collections::HashMap;
31use std::time::{Duration, Instant};
32
33use serde::{Deserialize, Serialize};
34use std::sync::Arc;
35use tokio::sync::broadcast;
36use tokio::sync::RwLock;
37
38use crate::error::IndexerError;
39use crate::handler::DecodedEvent;
40use crate::indexer::{IndexerConfig, IndexerState};
41
42// ─── ChainInstance ────────────────────────────────────────────────────────────
43
44/// Runtime state for a single chain managed by the coordinator.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ChainInstance {
47    /// The configuration driving this chain.
48    pub config: IndexerConfig,
49    /// Current runtime state of this chain's indexer.
50    pub state: IndexerState,
51    /// Latest block number that has been fully processed.
52    pub head_block: u64,
53    /// Total number of events emitted so far by this chain.
54    pub events_processed: u64,
55    /// The last error message if `state == IndexerState::Error`.
56    pub last_error: Option<String>,
57    /// Unix timestamp (seconds) when the chain was started, if it has been.
58    pub started_at: Option<i64>,
59}
60
61impl ChainInstance {
62    /// Construct a new idle instance from a config.
63    pub fn new(config: IndexerConfig) -> Self {
64        Self {
65            config,
66            state: IndexerState::Idle,
67            head_block: 0,
68            events_processed: 0,
69            last_error: None,
70            started_at: None,
71        }
72    }
73
74    /// Returns `true` when the chain is actively making progress.
75    pub fn is_active(&self) -> bool {
76        matches!(
77            self.state,
78            IndexerState::Backfilling | IndexerState::Live | IndexerState::ReorgRecovery
79        )
80    }
81
82    /// Returns `true` when the chain is in an error state.
83    pub fn is_error(&self) -> bool {
84        matches!(self.state, IndexerState::Error)
85    }
86
87    /// Transition to a new state, recording errors if appropriate.
88    pub fn transition(&mut self, new_state: IndexerState, error: Option<String>) {
89        self.state = new_state;
90        if new_state == IndexerState::Error {
91            self.last_error = error;
92        } else if error.is_none() && !matches!(new_state, IndexerState::Error) {
93            // Clear error when transitioning away from error state
94            self.last_error = None;
95        }
96    }
97}
98
99// ─── MultiChainConfig ─────────────────────────────────────────────────────────
100
101/// Configuration for the multi-chain coordinator.
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct MultiChainConfig {
104    /// One [`IndexerConfig`] per chain to manage.
105    pub chains: Vec<IndexerConfig>,
106    /// Maximum number of chains that may run concurrently. `0` means unlimited
107    /// (all chains run simultaneously).
108    pub max_concurrent_chains: usize,
109    /// How often the coordinator evaluates chain health. Default: 30 s.
110    pub health_check_interval: Duration,
111    /// If `true`, a chain that enters [`IndexerState::Error`] is automatically
112    /// restarted after `restart_delay`. Default: `true`.
113    pub restart_on_error: bool,
114    /// Delay before automatically restarting a failed chain. Default: 5 s.
115    pub restart_delay: Duration,
116}
117
118impl Default for MultiChainConfig {
119    fn default() -> Self {
120        Self {
121            chains: vec![],
122            max_concurrent_chains: 0, // unlimited
123            health_check_interval: Duration::from_secs(30),
124            restart_on_error: true,
125            restart_delay: Duration::from_secs(5),
126        }
127    }
128}
129
130impl MultiChainConfig {
131    /// Returns an error string if the configuration is invalid, otherwise `None`.
132    pub fn validate(&self) -> Option<String> {
133        if self.health_check_interval.is_zero() {
134            return Some("health_check_interval must be non-zero".into());
135        }
136        // Check for duplicate chain IDs.
137        let mut seen = std::collections::HashSet::new();
138        for cfg in &self.chains {
139            if !seen.insert(&cfg.id) {
140                return Some(format!("duplicate chain id '{}'", cfg.id));
141            }
142        }
143        None
144    }
145}
146
147// ─── ChainHealth ─────────────────────────────────────────────────────────────
148
149/// Health snapshot for a single chain instance.
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct ChainHealth {
152    /// Chain identifier matching [`IndexerConfig::id`].
153    pub chain: String,
154    /// Current indexer state.
155    pub state: IndexerState,
156    /// Latest processed block number.
157    pub head_block: u64,
158    /// Total events processed since start.
159    pub events_processed: u64,
160    /// Approximate blocks behind the chain tip (0 when caught up or unknown).
161    pub block_lag: u64,
162    /// How long the chain has been running.
163    pub uptime: Duration,
164    /// Last error message, if any.
165    pub last_error: Option<String>,
166    /// `true` when the chain is actively running without any error.
167    pub is_healthy: bool,
168}
169
170impl ChainHealth {
171    fn from_instance(instance: &ChainInstance, now_secs: i64) -> Self {
172        let uptime = instance
173            .started_at
174            .map(|s| Duration::from_secs(now_secs.saturating_sub(s).max(0) as u64))
175            .unwrap_or(Duration::ZERO);
176
177        let is_healthy = instance.is_active() && instance.last_error.is_none();
178
179        Self {
180            chain: instance.config.id.clone(),
181            state: instance.state,
182            head_block: instance.head_block,
183            events_processed: instance.events_processed,
184            block_lag: 0, // populated externally when tip info is available
185            uptime,
186            last_error: instance.last_error.clone(),
187            is_healthy,
188        }
189    }
190}
191
192// ─── MultiChainCoordinator ───────────────────────────────────────────────────
193
194/// Tracks and coordinates multiple chain indexer instances.
195///
196/// This is a **state-management** layer: it stores the runtime state of each
197/// chain and provides query/mutation primitives. The actual indexer tasks
198/// (which require an RPC provider) are started by the caller using the config
199/// returned from [`chain_state`].
200pub struct MultiChainCoordinator {
201    config: MultiChainConfig,
202    /// Keyed by [`IndexerConfig::id`].
203    instances: RwLock<HashMap<String, ChainInstance>>,
204    /// Wall-clock start time, used to compute uptimes.
205    started: Instant,
206}
207
208impl MultiChainCoordinator {
209    /// Create a new coordinator from the given config.
210    ///
211    /// All chains in `config.chains` are registered in the [`IndexerState::Idle`]
212    /// state — they are not started automatically.
213    pub fn new(config: MultiChainConfig) -> Self {
214        let mut instances = HashMap::new();
215        for chain_cfg in &config.chains {
216            instances.insert(chain_cfg.id.clone(), ChainInstance::new(chain_cfg.clone()));
217        }
218        Self {
219            config,
220            instances: RwLock::new(instances),
221            started: Instant::now(),
222        }
223    }
224
225    // ── Chain lifecycle ────────────────────────────────────────────────────
226
227    /// Register a new chain at runtime.
228    ///
229    /// Returns an error if a chain with the same `id` already exists.
230    pub async fn add_chain(&self, config: IndexerConfig) -> Result<(), IndexerError> {
231        let mut guard = self.instances.write().await;
232        if guard.contains_key(&config.id) {
233            return Err(IndexerError::Other(format!(
234                "chain '{}' already registered",
235                config.id
236            )));
237        }
238        guard.insert(config.id.clone(), ChainInstance::new(config));
239        Ok(())
240    }
241
242    /// Remove a chain from the coordinator.
243    ///
244    /// The caller is responsible for ensuring the underlying task is stopped
245    /// before calling this method. Returns an error if the chain is not found.
246    pub async fn remove_chain(&self, chain_id: &str) -> Result<(), IndexerError> {
247        let mut guard = self.instances.write().await;
248        if guard.remove(chain_id).is_none() {
249            return Err(IndexerError::Other(format!(
250                "chain '{}' not found",
251                chain_id
252            )));
253        }
254        Ok(())
255    }
256
257    /// Pause a chain by transitioning it to [`IndexerState::Stopping`].
258    ///
259    /// Only chains in an active state (Backfilling, Live, ReorgRecovery) can
260    /// be paused. Returns an error if the chain is not found or not active.
261    pub async fn pause_chain(&self, chain_id: &str) -> Result<(), IndexerError> {
262        let mut guard = self.instances.write().await;
263        let instance = guard
264            .get_mut(chain_id)
265            .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
266        if !instance.is_active() {
267            return Err(IndexerError::Other(format!(
268                "chain '{}' is not active (state: {})",
269                chain_id, instance.state
270            )));
271        }
272        instance.transition(IndexerState::Stopping, None);
273        tracing::info!(chain = %chain_id, "pausing chain");
274        Ok(())
275    }
276
277    /// Resume a paused or stopped chain by transitioning it back to
278    /// [`IndexerState::Backfilling`].
279    ///
280    /// Returns an error if the chain is not found or is already active.
281    pub async fn resume_chain(&self, chain_id: &str) -> Result<(), IndexerError> {
282        let mut guard = self.instances.write().await;
283        let instance = guard
284            .get_mut(chain_id)
285            .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
286        if instance.is_active() {
287            return Err(IndexerError::Other(format!(
288                "chain '{}' is already active (state: {})",
289                chain_id, instance.state
290            )));
291        }
292        instance.transition(IndexerState::Backfilling, None);
293        if instance.started_at.is_none() {
294            instance.started_at = Some(chrono::Utc::now().timestamp());
295        }
296        tracing::info!(chain = %chain_id, "resuming chain");
297        Ok(())
298    }
299
300    // ── State mutations ────────────────────────────────────────────────────
301
302    /// Update the state of a chain. Called by the underlying indexer task.
303    pub async fn update_state(
304        &self,
305        chain_id: &str,
306        new_state: IndexerState,
307        error: Option<String>,
308    ) -> Result<(), IndexerError> {
309        let mut guard = self.instances.write().await;
310        let instance = guard
311            .get_mut(chain_id)
312            .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
313        if (new_state == IndexerState::Backfilling || new_state == IndexerState::Live)
314            && instance.started_at.is_none()
315        {
316            instance.started_at = Some(chrono::Utc::now().timestamp());
317        }
318        instance.transition(new_state, error);
319        Ok(())
320    }
321
322    /// Record a new block processed by a chain.
323    pub async fn record_block(
324        &self,
325        chain_id: &str,
326        block_number: u64,
327        events: u64,
328    ) -> Result<(), IndexerError> {
329        let mut guard = self.instances.write().await;
330        let instance = guard
331            .get_mut(chain_id)
332            .ok_or_else(|| IndexerError::Other(format!("chain '{}' not found", chain_id)))?;
333        if block_number > instance.head_block {
334            instance.head_block = block_number;
335        }
336        instance.events_processed += events;
337        Ok(())
338    }
339
340    // ── Queries ────────────────────────────────────────────────────────────
341
342    /// Returns health snapshots for every registered chain.
343    pub async fn health(&self) -> Vec<ChainHealth> {
344        let guard = self.instances.read().await;
345        let now = chrono::Utc::now().timestamp();
346        guard
347            .values()
348            .map(|inst| ChainHealth::from_instance(inst, now))
349            .collect()
350    }
351
352    /// Returns health for a specific chain by id.
353    pub async fn chain_health(&self, chain_id: &str) -> Option<ChainHealth> {
354        let guard = self.instances.read().await;
355        let now = chrono::Utc::now().timestamp();
356        guard
357            .get(chain_id)
358            .map(|inst| ChainHealth::from_instance(inst, now))
359    }
360
361    /// Returns a clone of the runtime state for a single chain.
362    pub async fn chain_state(&self, chain_id: &str) -> Option<ChainInstance> {
363        let guard = self.instances.read().await;
364        guard.get(chain_id).cloned()
365    }
366
367    /// Returns the ids of all registered chains.
368    pub async fn chains(&self) -> Vec<String> {
369        let guard = self.instances.read().await;
370        guard.keys().cloned().collect()
371    }
372
373    /// Returns the ids of all chains that are actively indexing.
374    pub async fn active_chains(&self) -> Vec<String> {
375        let guard = self.instances.read().await;
376        guard
377            .iter()
378            .filter(|(_, inst)| inst.is_active())
379            .map(|(id, _)| id.clone())
380            .collect()
381    }
382
383    /// Returns `true` when every registered chain is healthy (active, no error).
384    pub async fn is_all_healthy(&self) -> bool {
385        let guard = self.instances.read().await;
386        guard
387            .values()
388            .all(|inst| inst.is_active() && inst.last_error.is_none())
389    }
390
391    /// Returns `true` when every registered chain has reached at least
392    /// `min_block`.
393    pub async fn all_past_block(&self, min_block: u64) -> bool {
394        let guard = self.instances.read().await;
395        guard.values().all(|inst| inst.head_block >= min_block)
396    }
397
398    /// Returns the number of registered chains.
399    pub async fn chain_count(&self) -> usize {
400        self.instances.read().await.len()
401    }
402
403    /// Returns the coordinator config.
404    pub fn config(&self) -> &MultiChainConfig {
405        &self.config
406    }
407
408    /// Returns how long the coordinator has been running.
409    pub fn uptime(&self) -> Duration {
410        self.started.elapsed()
411    }
412}
413
414// ─── CrossChainEvent ─────────────────────────────────────────────────────────
415
416/// An event received from any of the managed chains.
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct CrossChainEvent {
419    /// Chain identifier that produced this event.
420    pub chain: String,
421    /// The decoded event payload.
422    pub event: DecodedEvent,
423    /// Unix timestamp (seconds) at which the event was received by the bus.
424    pub received_at: i64,
425}
426
427/// Type alias for the receive half of a [`CrossChainEventBus`] subscription.
428pub type CrossChainReceiver = broadcast::Receiver<CrossChainEvent>;
429
430// ─── CrossChainEventBus ──────────────────────────────────────────────────────
431
432/// Fan-out broadcast bus that aggregates events from all managed chains.
433///
434/// Multiple subscribers can each receive every event via
435/// `tokio::sync::broadcast`. If a subscriber falls behind and the buffer
436/// fills, lagged events are dropped for that subscriber (the channel returns
437/// [`broadcast::error::RecvError::Lagged`]).
438#[derive(Clone)]
439pub struct CrossChainEventBus {
440    sender: broadcast::Sender<CrossChainEvent>,
441}
442
443impl CrossChainEventBus {
444    /// Create a new bus with the given channel capacity.
445    ///
446    /// `capacity` is the maximum number of events buffered per subscriber
447    /// before the oldest events are dropped. A value of 1024 is a reasonable
448    /// default for most use cases.
449    pub fn new(capacity: usize) -> Self {
450        let (sender, _) = broadcast::channel(capacity);
451        Self { sender }
452    }
453
454    /// Push an event from `chain` onto the bus.
455    ///
456    /// Returns the number of active subscribers that received the event.
457    /// If there are no subscribers, the event is silently discarded.
458    pub fn push(&self, chain: &str, event: DecodedEvent) -> usize {
459        let cross = CrossChainEvent {
460            chain: chain.to_string(),
461            event,
462            received_at: chrono::Utc::now().timestamp(),
463        };
464        // `send` only fails when there are no receivers — that is fine.
465        self.sender.send(cross).unwrap_or(0)
466    }
467
468    /// Subscribe to the event bus.
469    ///
470    /// Each subscriber receives a clone of every event pushed after the
471    /// subscription is created.
472    pub fn subscribe(&self) -> CrossChainReceiver {
473        self.sender.subscribe()
474    }
475
476    /// Returns the number of active subscribers.
477    pub fn subscriber_count(&self) -> usize {
478        self.sender.receiver_count()
479    }
480}
481
482// ─── ChainSyncStatus ─────────────────────────────────────────────────────────
483
484/// Cross-chain synchronization tracker.
485///
486/// Stores the current head block for each chain so callers can ask questions
487/// like "have all chains passed block N?" or "are all chains within K blocks
488/// of their respective tips?"
489#[derive(Debug, Clone, Default, Serialize, Deserialize)]
490pub struct ChainSyncStatus {
491    /// Maps chain id → latest known head block.
492    pub chains: HashMap<String, u64>,
493    /// Optional tip (chain head) for each chain, used by `all_caught_up`.
494    tips: HashMap<String, u64>,
495    /// Stores the last known block timestamp per chain (unix seconds).
496    timestamps: HashMap<String, i64>,
497}
498
499impl ChainSyncStatus {
500    /// Create an empty sync status tracker.
501    pub fn new() -> Self {
502        Self::default()
503    }
504
505    /// Update the head block for a chain.
506    pub fn update(&mut self, chain: &str, head: u64) {
507        self.chains.insert(chain.to_string(), head);
508    }
509
510    /// Update both the head block and its timestamp for a chain.
511    pub fn update_with_timestamp(&mut self, chain: &str, head: u64, timestamp: i64) {
512        self.chains.insert(chain.to_string(), head);
513        self.timestamps.insert(chain.to_string(), timestamp);
514    }
515
516    /// Update the known chain tip (latest block on the network) for a chain.
517    pub fn update_tip(&mut self, chain: &str, tip: u64) {
518        self.tips.insert(chain.to_string(), tip);
519    }
520
521    /// Returns the earliest (minimum) timestamp across all chains with a
522    /// recorded timestamp. Returns `None` if no timestamps are recorded.
523    pub fn min_timestamp(&self) -> Option<i64> {
524        self.timestamps.values().copied().reduce(i64::min)
525    }
526
527    /// Returns the latest (maximum) timestamp across all chains.
528    pub fn max_timestamp(&self) -> Option<i64> {
529        self.timestamps.values().copied().reduce(i64::max)
530    }
531
532    /// Returns `true` if every registered chain has processed past `block`.
533    ///
534    /// Returns `false` if there are no chains registered.
535    pub fn all_past_block(&self, _chain: &str, block: u64) -> bool {
536        if self.chains.is_empty() {
537            return false;
538        }
539        self.chains.values().all(|&head| head >= block)
540    }
541
542    /// Returns `true` if all chains are within `threshold_blocks` of their
543    /// recorded tips.
544    ///
545    /// Returns `false` when no chains have tip information recorded.
546    pub fn all_caught_up(&self, threshold_blocks: u64) -> bool {
547        if self.tips.is_empty() {
548            return false;
549        }
550        for (chain, &tip) in &self.tips {
551            let head = self.chains.get(chain).copied().unwrap_or(0);
552            if tip.saturating_sub(head) > threshold_blocks {
553                return false;
554            }
555        }
556        true
557    }
558
559    /// Returns the head block for a specific chain.
560    pub fn head_of(&self, chain: &str) -> Option<u64> {
561        self.chains.get(chain).copied()
562    }
563
564    /// Returns the lag (tip - head) for a specific chain. `None` if the chain
565    /// or its tip is not recorded.
566    pub fn lag_of(&self, chain: &str) -> Option<u64> {
567        let head = self.chains.get(chain).copied()?;
568        let tip = self.tips.get(chain).copied()?;
569        Some(tip.saturating_sub(head))
570    }
571
572    /// Returns the number of chains being tracked.
573    pub fn len(&self) -> usize {
574        self.chains.len()
575    }
576
577    /// Returns `true` when no chains are tracked.
578    pub fn is_empty(&self) -> bool {
579        self.chains.is_empty()
580    }
581
582    /// Build a [`ChainSyncStatus`] snapshot from a [`MultiChainCoordinator`].
583    pub async fn from_coordinator(coordinator: &Arc<MultiChainCoordinator>) -> Self {
584        let guard = coordinator.instances.read().await;
585        let mut status = Self::new();
586        for (id, inst) in guard.iter() {
587            status.update(id, inst.head_block);
588        }
589        status
590    }
591}
592
593// ─── Tests ────────────────────────────────────────────────────────────────────
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use crate::types::EventFilter;
599
600    // ── helpers ───────────────────────────────────────────────────────────
601
602    fn make_config(id: &str, chain: &str) -> IndexerConfig {
603        IndexerConfig {
604            id: id.into(),
605            chain: chain.into(),
606            from_block: 0,
607            to_block: None,
608            confirmation_depth: 12,
609            batch_size: 1000,
610            checkpoint_interval: 100,
611            poll_interval_ms: 2000,
612            filter: EventFilter::default(),
613        }
614    }
615
616    fn make_coordinator(ids: &[(&str, &str)]) -> MultiChainCoordinator {
617        let chains: Vec<IndexerConfig> = ids
618            .iter()
619            .map(|(id, chain)| make_config(id, chain))
620            .collect();
621        MultiChainCoordinator::new(MultiChainConfig {
622            chains,
623            ..Default::default()
624        })
625    }
626
627    fn dummy_event(chain: &str) -> DecodedEvent {
628        DecodedEvent {
629            chain: chain.into(),
630            schema: "ERC20Transfer".into(),
631            address: "0xabc".into(),
632            tx_hash: "0xdeadbeef".into(),
633            block_number: 100,
634            log_index: 0,
635            fields_json: serde_json::json!({"from": "0x1", "to": "0x2", "value": "1000"}),
636        }
637    }
638
639    // ── MultiChainConfig ──────────────────────────────────────────────────
640
641    #[test]
642    fn multichain_config_defaults() {
643        let cfg = MultiChainConfig::default();
644        assert!(cfg.chains.is_empty());
645        assert_eq!(cfg.max_concurrent_chains, 0);
646        assert_eq!(cfg.health_check_interval, Duration::from_secs(30));
647        assert!(cfg.restart_on_error);
648        assert_eq!(cfg.restart_delay, Duration::from_secs(5));
649    }
650
651    #[test]
652    fn multichain_config_validate_ok() {
653        let cfg = MultiChainConfig {
654            chains: vec![
655                make_config("eth", "ethereum"),
656                make_config("arb", "arbitrum"),
657            ],
658            ..Default::default()
659        };
660        assert!(cfg.validate().is_none());
661    }
662
663    #[test]
664    fn multichain_config_validate_duplicate_id() {
665        let cfg = MultiChainConfig {
666            chains: vec![
667                make_config("eth", "ethereum"),
668                make_config("eth", "arbitrum"), // duplicate id
669            ],
670            ..Default::default()
671        };
672        let err = cfg.validate().expect("should report duplicate");
673        assert!(err.contains("duplicate chain id 'eth'"));
674    }
675
676    #[test]
677    fn multichain_config_validate_zero_interval() {
678        let cfg = MultiChainConfig {
679            health_check_interval: Duration::ZERO,
680            ..Default::default()
681        };
682        let err = cfg.validate().expect("should report invalid interval");
683        assert!(err.contains("health_check_interval"));
684    }
685
686    // ── Coordinator add/remove ────────────────────────────────────────────
687
688    #[tokio::test]
689    async fn coordinator_add_chain() {
690        let coord = make_coordinator(&[]);
691        coord
692            .add_chain(make_config("eth", "ethereum"))
693            .await
694            .unwrap();
695        assert_eq!(coord.chain_count().await, 1);
696    }
697
698    #[tokio::test]
699    async fn coordinator_add_duplicate_chain_errors() {
700        let coord = make_coordinator(&[("eth", "ethereum")]);
701        let err = coord
702            .add_chain(make_config("eth", "ethereum"))
703            .await
704            .unwrap_err();
705        assert!(err.to_string().contains("already registered"));
706    }
707
708    #[tokio::test]
709    async fn coordinator_remove_chain() {
710        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
711        coord.remove_chain("eth").await.unwrap();
712        assert_eq!(coord.chain_count().await, 1);
713        assert!(coord.chain_state("eth").await.is_none());
714    }
715
716    #[tokio::test]
717    async fn coordinator_remove_missing_chain_errors() {
718        let coord = make_coordinator(&[]);
719        let err = coord.remove_chain("unknown").await.unwrap_err();
720        assert!(err.to_string().contains("not found"));
721    }
722
723    // ── Pause / resume ────────────────────────────────────────────────────
724
725    #[tokio::test]
726    async fn coordinator_pause_and_resume() {
727        let coord = make_coordinator(&[("eth", "ethereum")]);
728
729        // Transition to Live so we can pause
730        coord
731            .update_state("eth", IndexerState::Live, None)
732            .await
733            .unwrap();
734        assert!(coord.chain_state("eth").await.unwrap().is_active());
735
736        // Pause
737        coord.pause_chain("eth").await.unwrap();
738        let inst = coord.chain_state("eth").await.unwrap();
739        assert_eq!(inst.state, IndexerState::Stopping);
740
741        // Cannot pause again (not active)
742        let err = coord.pause_chain("eth").await.unwrap_err();
743        assert!(err.to_string().contains("not active"));
744
745        // Resume
746        coord.resume_chain("eth").await.unwrap();
747        let inst = coord.chain_state("eth").await.unwrap();
748        assert_eq!(inst.state, IndexerState::Backfilling);
749    }
750
751    #[tokio::test]
752    async fn coordinator_resume_already_active_errors() {
753        let coord = make_coordinator(&[("eth", "ethereum")]);
754        coord
755            .update_state("eth", IndexerState::Live, None)
756            .await
757            .unwrap();
758        let err = coord.resume_chain("eth").await.unwrap_err();
759        assert!(err.to_string().contains("already active"));
760    }
761
762    // ── Health reporting ──────────────────────────────────────────────────
763
764    #[tokio::test]
765    async fn health_reflects_state() {
766        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
767        coord
768            .update_state("eth", IndexerState::Live, None)
769            .await
770            .unwrap();
771        coord
772            .update_state("arb", IndexerState::Error, Some("rpc timeout".into()))
773            .await
774            .unwrap();
775
776        let health = coord.health().await;
777        assert_eq!(health.len(), 2);
778
779        let eth_h = health.iter().find(|h| h.chain == "eth").unwrap();
780        assert!(eth_h.is_healthy);
781        assert_eq!(eth_h.state, IndexerState::Live);
782
783        let arb_h = health.iter().find(|h| h.chain == "arb").unwrap();
784        assert!(!arb_h.is_healthy);
785        assert_eq!(arb_h.last_error.as_deref(), Some("rpc timeout"));
786    }
787
788    #[tokio::test]
789    async fn is_all_healthy_false_when_error() {
790        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
791        coord
792            .update_state("eth", IndexerState::Live, None)
793            .await
794            .unwrap();
795        coord
796            .update_state("arb", IndexerState::Error, Some("crash".into()))
797            .await
798            .unwrap();
799        assert!(!coord.is_all_healthy().await);
800    }
801
802    #[tokio::test]
803    async fn is_all_healthy_true_when_all_live() {
804        let coord = make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum")]);
805        coord
806            .update_state("eth", IndexerState::Live, None)
807            .await
808            .unwrap();
809        coord
810            .update_state("arb", IndexerState::Live, None)
811            .await
812            .unwrap();
813        assert!(coord.is_all_healthy().await);
814    }
815
816    // ── Active chains listing ─────────────────────────────────────────────
817
818    #[tokio::test]
819    async fn active_chains_filters_correctly() {
820        let coord =
821            make_coordinator(&[("eth", "ethereum"), ("arb", "arbitrum"), ("sol", "solana")]);
822        coord
823            .update_state("eth", IndexerState::Live, None)
824            .await
825            .unwrap();
826        coord
827            .update_state("arb", IndexerState::Backfilling, None)
828            .await
829            .unwrap();
830        // sol stays Idle
831
832        let active = coord.active_chains().await;
833        assert_eq!(active.len(), 2);
834        assert!(active.contains(&"eth".to_string()));
835        assert!(active.contains(&"arb".to_string()));
836        assert!(!active.contains(&"sol".to_string()));
837    }
838
839    // ── Error state handling ──────────────────────────────────────────────
840
841    #[tokio::test]
842    async fn error_state_records_message() {
843        let coord = make_coordinator(&[("eth", "ethereum")]);
844        coord
845            .update_state(
846                "eth",
847                IndexerState::Error,
848                Some("connection refused".into()),
849            )
850            .await
851            .unwrap();
852
853        let inst = coord.chain_state("eth").await.unwrap();
854        assert_eq!(inst.state, IndexerState::Error);
855        assert_eq!(inst.last_error.as_deref(), Some("connection refused"));
856    }
857
858    #[tokio::test]
859    async fn error_cleared_on_resume() {
860        let coord = make_coordinator(&[("eth", "ethereum")]);
861        // Set error state
862        coord
863            .update_state("eth", IndexerState::Error, Some("boom".into()))
864            .await
865            .unwrap();
866        // Resume (transitions to Backfilling, clears error)
867        coord.resume_chain("eth").await.unwrap();
868
869        let inst = coord.chain_state("eth").await.unwrap();
870        assert_eq!(inst.state, IndexerState::Backfilling);
871        assert!(inst.last_error.is_none());
872    }
873
874    // ── ChainInstance state transitions ───────────────────────────────────
875
/// Walks a single `ChainInstance` through its states and checks the
/// `is_active` / `is_error` / `last_error` view after each step.
#[test]
fn chain_instance_state_transitions() {
    let mut instance = ChainInstance::new(make_config("eth", "ethereum"));

    // A freshly created instance is idle: neither active nor errored.
    assert_eq!(instance.state, IndexerState::Idle);
    assert!(!instance.is_active());
    assert!(!instance.is_error());

    // Each of these states counts as active.
    for state in [
        IndexerState::Backfilling,
        IndexerState::Live,
        IndexerState::ReorgRecovery,
    ] {
        instance.transition(state, None);
        assert!(instance.is_active());
    }

    // An error is not active and keeps the supplied message.
    instance.transition(IndexerState::Error, Some("test error".into()));
    assert!(!instance.is_active());
    assert!(instance.is_error());
    assert_eq!(instance.last_error.as_deref(), Some("test error"));

    // Going back to Backfilling clears the stored error.
    instance.transition(IndexerState::Backfilling, None);
    assert!(instance.is_active());
    assert!(instance.last_error.is_none());
}
904
905    // ── Cross-chain event bus ─────────────────────────────────────────────
906
907    #[tokio::test]
908    async fn event_bus_push_and_subscribe() {
909        let bus = CrossChainEventBus::new(64);
910        let mut rx = bus.subscribe();
911
912        let event = dummy_event("ethereum");
913        bus.push("ethereum", event.clone());
914
915        let received = rx.recv().await.unwrap();
916        assert_eq!(received.chain, "ethereum");
917        assert_eq!(received.event.schema, "ERC20Transfer");
918        assert_eq!(received.event.tx_hash, "0xdeadbeef");
919    }
920
921    #[tokio::test]
922    async fn event_bus_multiple_subscribers() {
923        let bus = CrossChainEventBus::new(64);
924        let mut rx1 = bus.subscribe();
925        let mut rx2 = bus.subscribe();
926
927        bus.push("arbitrum", dummy_event("arbitrum"));
928
929        let e1 = rx1.recv().await.unwrap();
930        let e2 = rx2.recv().await.unwrap();
931        assert_eq!(e1.chain, "arbitrum");
932        assert_eq!(e2.chain, "arbitrum");
933    }
934
935    #[tokio::test]
936    async fn event_bus_no_subscribers_does_not_panic() {
937        let bus = CrossChainEventBus::new(16);
938        // push with no subscribers — should silently succeed
939        let count = bus.push("ethereum", dummy_event("ethereum"));
940        assert_eq!(count, 0);
941    }
942
943    #[tokio::test]
944    async fn event_bus_received_at_is_populated() {
945        let bus = CrossChainEventBus::new(16);
946        let mut rx = bus.subscribe();
947        bus.push("ethereum", dummy_event("ethereum"));
948        let ev = rx.recv().await.unwrap();
949        assert!(ev.received_at > 0);
950    }
951
952    // ── ChainSyncStatus ───────────────────────────────────────────────────
953
/// Heads recorded through `update` must be readable via `head_of`, and a
/// chain that was never recorded must yield `None`.
#[test]
fn sync_status_update_and_query() {
    let mut tracker = ChainSyncStatus::new();
    for (chain, head) in [("ethereum", 1_000_000), ("arbitrum", 200_000_000)] {
        tracker.update(chain, head);
    }

    assert_eq!(tracker.head_of("ethereum"), Some(1_000_000));
    assert_eq!(tracker.head_of("arbitrum"), Some(200_000_000));
    assert_eq!(tracker.head_of("unknown"), None);
}
964
/// `all_past_block` is expected to hold only when every tracked chain has
/// reached the given block.
#[test]
fn sync_status_all_past_block() {
    let mut tracker = ChainSyncStatus::new();
    for (chain, head) in [("eth", 1000), ("arb", 2000), ("sol", 500)] {
        tracker.update(chain, head);
    }

    // NOTE(review): both calls pass an empty string as the first argument;
    // what that argument means is not visible from this test.

    // The lowest head is 500 ("sol"), so every chain is past 400.
    assert!(tracker.all_past_block("", 400));
    // "sol" has not reached 600, so the check must fail.
    assert!(!tracker.all_past_block("", 600));
}
977
/// `all_caught_up` is checked against a threshold above and below the lag
/// of the tracked chains.
#[test]
fn sync_status_all_caught_up() {
    let mut tracker = ChainSyncStatus::new();
    // (chain, processed head, chain tip): each chain is 10 blocks behind.
    for (chain, head, tip) in [("eth", 990, 1000), ("arb", 199_990, 200_000)] {
        tracker.update(chain, head);
        tracker.update_tip(chain, tip);
    }

    // A lag of 10 fits inside a threshold of 20.
    assert!(tracker.all_caught_up(20));
    // A lag of 10 exceeds a threshold of 5.
    assert!(!tracker.all_caught_up(5));
}
991
/// `min_timestamp` must return the smallest timestamp recorded across all
/// chains.
#[test]
fn sync_status_min_timestamp() {
    let mut tracker = ChainSyncStatus::new();
    // (chain, head, timestamp): "arb" carries the smallest timestamp.
    for (chain, head, timestamp) in [
        ("eth", 1000, 1_700_000_100),
        ("arb", 2000, 1_700_000_050),
        ("sol", 3000, 1_700_000_200),
    ] {
        tracker.update_with_timestamp(chain, head, timestamp);
    }

    assert_eq!(tracker.min_timestamp(), Some(1_700_000_050));
}
1001
/// With nothing recorded, `min_timestamp` must return `None`.
#[test]
fn sync_status_min_timestamp_none_when_empty() {
    let tracker = ChainSyncStatus::new();
    let earliest = tracker.min_timestamp();
    assert!(earliest.is_none());
}
1007
/// `lag_of` is expected to report the distance between a chain's tip and
/// its processed head, and `None` for a chain that was never recorded.
#[test]
fn sync_status_lag_of() {
    let mut tracker = ChainSyncStatus::new();
    tracker.update("eth", 990);
    tracker.update_tip("eth", 1000);

    // Tip 1000, head 990: ten blocks behind.
    let eth_lag = tracker.lag_of("eth");
    assert_eq!(eth_lag, Some(10));

    let unknown_lag = tracker.lag_of("unknown");
    assert_eq!(unknown_lag, None);
}
1017
/// A chain with a recorded head but no recorded tip must not count as
/// caught up.
#[test]
fn sync_status_all_caught_up_no_tips_returns_false() {
    let mut tracker = ChainSyncStatus::new();
    // Only the head is recorded; `update_tip` is never called.
    tracker.update("eth", 1000);

    let caught_up = tracker.all_caught_up(10);
    assert!(!caught_up);
}
1025
/// A new status tracker must be empty; after one `update` it must be
/// non-empty with a length of 1.
#[test]
fn sync_status_is_empty() {
    let untouched = ChainSyncStatus::new();
    assert!(untouched.is_empty());

    // A second, separate tracker receives a single entry.
    let mut populated = ChainSyncStatus::new();
    populated.update("eth", 0);
    assert!(!populated.is_empty());
    assert_eq!(populated.len(), 1);
}
1035}