Skip to main content

chainindex_core/
hotreload.rs

1//! Hot-reload configuration system for the chainindex engine.
2//!
3//! Allows indexer configs and handler registrations to be updated at runtime
4//! without restarting the indexer process.
5//!
6//! # Overview
7//!
8//! ```text
9//! HotReloadManager
10//!   ├── register_config(id, config)  → Arc<RwLock<ReloadableConfig>>
11//!   ├── update_config(id, new)       → ReloadResult { diffs, warnings, version }
12//!   ├── subscribe(id)                → watch::Receiver<u64>   (version bump)
13//!   └── history(id)                  → Vec<ReloadRecord>
14//!
15//! ConfigWatcher  — polls a source on a fixed interval, fires callbacks on change
16//! FilterReloader — fine-grained add/remove for EventFilter addresses & topic0s
17//! ```
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Duration;
22
23use serde::{Deserialize, Serialize};
24use tokio::sync::{watch, RwLock};
25use tracing::{debug, info, warn};
26
27use crate::error::IndexerError;
28use crate::indexer::IndexerConfig;
29use crate::types::EventFilter;
30
31// ─── ConfigSource ─────────────────────────────────────────────────────────────
32
33/// Where a configuration value originated.
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub enum ConfigSource {
36    /// Built-in defaults — no explicit configuration was provided.
37    Default,
38    /// Loaded from a file at the given path.
39    File(String),
40    /// Derived from environment variables.
41    Environment,
42    /// Pushed via an API call (e.g. HTTP control plane).
43    Api,
44    /// Set directly in code or via a test.
45    Manual,
46}
47
48impl std::fmt::Display for ConfigSource {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Self::Default => write!(f, "default"),
52            Self::File(p) => write!(f, "file:{p}"),
53            Self::Environment => write!(f, "environment"),
54            Self::Api => write!(f, "api"),
55            Self::Manual => write!(f, "manual"),
56        }
57    }
58}
59
60// ─── ReloadableConfig ─────────────────────────────────────────────────────────
61
62/// Wraps any config `T` with version tracking and provenance metadata.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ReloadableConfig<T> {
65    /// The actual configuration value.
66    pub inner: T,
67    /// Monotonically increasing version number.  Starts at `1`.
68    pub version: u64,
69    /// Unix timestamp (seconds) of the last update.
70    pub updated_at: i64,
71    /// Where this configuration version came from.
72    pub source: ConfigSource,
73}
74
75impl<T: Clone + Serialize> ReloadableConfig<T> {
76    /// Create a new `ReloadableConfig` at version `1`.
77    pub fn new(inner: T) -> Self {
78        Self {
79            inner,
80            version: 1,
81            updated_at: chrono::Utc::now().timestamp(),
82            source: ConfigSource::Default,
83        }
84    }
85
86    /// Create a new `ReloadableConfig` at version `1` with an explicit source.
87    pub fn with_source(inner: T, source: ConfigSource) -> Self {
88        Self {
89            inner,
90            version: 1,
91            updated_at: chrono::Utc::now().timestamp(),
92            source,
93        }
94    }
95
96    /// Replace the inner config, increment the version, and return the new version.
97    pub fn update(&mut self, inner: T) -> u64 {
98        self.inner = inner;
99        self.version += 1;
100        self.updated_at = chrono::Utc::now().timestamp();
101        self.version
102    }
103
104    /// Replace the inner config with an explicit source, increment the version,
105    /// and return the new version.
106    pub fn update_with_source(&mut self, inner: T, source: ConfigSource) -> u64 {
107        self.inner = inner;
108        self.version += 1;
109        self.updated_at = chrono::Utc::now().timestamp();
110        self.source = source;
111        self.version
112    }
113}
114
115// ─── ConfigDiff ───────────────────────────────────────────────────────────────
116
117/// Describes a single field-level change between two `IndexerConfig` values.
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct ConfigDiff {
120    /// The name of the field that changed.
121    pub field: String,
122    /// The previous value (serialized as JSON).
123    pub old_value: serde_json::Value,
124    /// The new value (serialized as JSON).
125    pub new_value: serde_json::Value,
126}
127
128impl ConfigDiff {
129    fn new(
130        field: impl Into<String>,
131        old_value: serde_json::Value,
132        new_value: serde_json::Value,
133    ) -> Self {
134        Self {
135            field: field.into(),
136            old_value,
137            new_value,
138        }
139    }
140}
141
142/// Compare two `IndexerConfig` values and return a list of field-level diffs.
143///
144/// Returns an empty `Vec` when the two configs are identical.
145pub fn diff_configs(old: &IndexerConfig, new: &IndexerConfig) -> Vec<ConfigDiff> {
146    let mut diffs = Vec::new();
147
148    macro_rules! check {
149        ($field:ident) => {
150            if old.$field != new.$field {
151                diffs.push(ConfigDiff::new(
152                    stringify!($field),
153                    serde_json::to_value(&old.$field).unwrap_or(serde_json::Value::Null),
154                    serde_json::to_value(&new.$field).unwrap_or(serde_json::Value::Null),
155                ));
156            }
157        };
158    }
159
160    check!(id);
161    check!(chain);
162    check!(from_block);
163    check!(to_block);
164    check!(confirmation_depth);
165    check!(batch_size);
166    check!(checkpoint_interval);
167    check!(poll_interval_ms);
168
169    // EventFilter sub-fields
170    if old.filter.addresses != new.filter.addresses {
171        diffs.push(ConfigDiff::new(
172            "filter.addresses",
173            serde_json::to_value(&old.filter.addresses).unwrap_or(serde_json::Value::Null),
174            serde_json::to_value(&new.filter.addresses).unwrap_or(serde_json::Value::Null),
175        ));
176    }
177    if old.filter.topic0_values != new.filter.topic0_values {
178        diffs.push(ConfigDiff::new(
179            "filter.topic0_values",
180            serde_json::to_value(&old.filter.topic0_values).unwrap_or(serde_json::Value::Null),
181            serde_json::to_value(&new.filter.topic0_values).unwrap_or(serde_json::Value::Null),
182        ));
183    }
184    if old.filter.from_block != new.filter.from_block {
185        diffs.push(ConfigDiff::new(
186            "filter.from_block",
187            serde_json::to_value(old.filter.from_block).unwrap_or(serde_json::Value::Null),
188            serde_json::to_value(new.filter.from_block).unwrap_or(serde_json::Value::Null),
189        ));
190    }
191    if old.filter.to_block != new.filter.to_block {
192        diffs.push(ConfigDiff::new(
193            "filter.to_block",
194            serde_json::to_value(old.filter.to_block).unwrap_or(serde_json::Value::Null),
195            serde_json::to_value(new.filter.to_block).unwrap_or(serde_json::Value::Null),
196        ));
197    }
198
199    diffs
200}
201
202// ─── WarningSeverity ──────────────────────────────────────────────────────────
203
204/// Severity level of a configuration warning.
205#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
206pub enum WarningSeverity {
207    /// Informational — no action required.
208    Info,
209    /// Potential issue — the change may have unintended consequences.
210    Warning,
211    /// The change is highly risky and may break the indexer.
212    Critical,
213}
214
215impl std::fmt::Display for WarningSeverity {
216    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217        match self {
218            Self::Info => write!(f, "INFO"),
219            Self::Warning => write!(f, "WARNING"),
220            Self::Critical => write!(f, "CRITICAL"),
221        }
222    }
223}
224
225// ─── ConfigWarning ────────────────────────────────────────────────────────────
226
227/// A non-fatal advisory raised by `ConfigValidator`.
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct ConfigWarning {
230    /// The field or aspect that triggered the warning.
231    pub field: String,
232    /// Human-readable explanation.
233    pub message: String,
234    /// How serious this warning is.
235    pub severity: WarningSeverity,
236}
237
238impl ConfigWarning {
239    fn new(
240        field: impl Into<String>,
241        message: impl Into<String>,
242        severity: WarningSeverity,
243    ) -> Self {
244        Self {
245            field: field.into(),
246            message: message.into(),
247            severity,
248        }
249    }
250}
251
252// ─── ConfigValidator ──────────────────────────────────────────────────────────
253
254/// Validates that a proposed `IndexerConfig` update is safe to apply.
255pub struct ConfigValidator;
256
257impl ConfigValidator {
258    /// Validate the transition from `old` to `new`.
259    ///
260    /// Returns `Ok(warnings)` when the change is permitted (possibly with
261    /// advisory warnings), or `Err(IndexerError)` when the change is rejected
262    /// outright (breaking change).
263    pub fn validate(
264        old: &IndexerConfig,
265        new: &IndexerConfig,
266    ) -> Result<Vec<ConfigWarning>, IndexerError> {
267        let mut warnings = Vec::new();
268
269        // --- Hard rejections ---
270
271        if old.chain != new.chain {
272            return Err(IndexerError::Other(format!(
273                "hot-reload: cannot change chain from '{}' to '{}' — stop and reconfigure the indexer",
274                old.chain, new.chain
275            )));
276        }
277
278        if old.from_block != new.from_block {
279            return Err(IndexerError::Other(format!(
280                "hot-reload: cannot change from_block from {} to {} — use a checkpoint to rewind instead",
281                old.from_block, new.from_block
282            )));
283        }
284
285        // --- Advisory warnings ---
286
287        if new.confirmation_depth < old.confirmation_depth {
288            warnings.push(ConfigWarning::new(
289                "confirmation_depth",
290                format!(
291                    "Decreasing confirmation_depth from {} to {} may cause premature finality and missed reorgs",
292                    old.confirmation_depth, new.confirmation_depth
293                ),
294                WarningSeverity::Warning,
295            ));
296        }
297
298        if new.batch_size > old.batch_size * 10 {
299            warnings.push(ConfigWarning::new(
300                "batch_size",
301                format!(
302                    "batch_size increased more than 10x (from {} to {}); RPC node may reject large eth_getLogs ranges",
303                    old.batch_size, new.batch_size
304                ),
305                WarningSeverity::Warning,
306            ));
307        }
308
309        if new.poll_interval_ms < 500 {
310            warnings.push(ConfigWarning::new(
311                "poll_interval_ms",
312                format!(
313                    "poll_interval_ms={} is very aggressive; may overwhelm the RPC endpoint",
314                    new.poll_interval_ms
315                ),
316                WarningSeverity::Warning,
317            ));
318        }
319
320        if new.checkpoint_interval == 0 {
321            warnings.push(ConfigWarning::new(
322                "checkpoint_interval",
323                "checkpoint_interval=0 disables checkpointing; crash recovery will be impaired",
324                WarningSeverity::Critical,
325            ));
326        }
327
328        if old.id != new.id {
329            warnings.push(ConfigWarning::new(
330                "id",
331                format!(
332                    "Changing indexer id from '{}' to '{}' will break checkpoint continuity",
333                    old.id, new.id
334                ),
335                WarningSeverity::Critical,
336            ));
337        }
338
339        Ok(warnings)
340    }
341
342    /// Returns `true` when the transition from `old` to `new` contains no
343    /// breaking changes (i.e. `validate` would succeed with no `Critical`
344    /// warnings).
345    pub fn is_safe_reload(old: &IndexerConfig, new: &IndexerConfig) -> bool {
346        match Self::validate(old, new) {
347            Err(_) => false,
348            Ok(warnings) => !warnings
349                .iter()
350                .any(|w| w.severity == WarningSeverity::Critical),
351        }
352    }
353}
354
355// ─── ReloadResult ─────────────────────────────────────────────────────────────
356
357/// The outcome of a successful hot-reload operation.
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct ReloadResult {
360    /// The new version number after the update.
361    pub version: u64,
362    /// Field-level diffs between the old and new config.
363    pub diffs: Vec<ConfigDiff>,
364    /// Advisory warnings raised by the validator.
365    pub warnings: Vec<ConfigWarning>,
366    /// Unix timestamp (seconds) when the reload was applied.
367    pub applied_at: i64,
368}
369
370// ─── ReloadRecord ─────────────────────────────────────────────────────────────
371
372/// An immutable record of a single hot-reload event kept in the history log.
373#[derive(Debug, Clone, Serialize, Deserialize)]
374pub struct ReloadRecord {
375    /// The version number assigned to this reload.
376    pub version: u64,
377    /// Field-level diffs at the time of this reload.
378    pub diffs: Vec<ConfigDiff>,
379    /// Unix timestamp (seconds) when this reload was applied.
380    pub applied_at: i64,
381    /// Source that triggered this reload.
382    pub source: ConfigSource,
383}
384
385// ─── Internal per-config state ────────────────────────────────────────────────
386
387struct ManagedConfig {
388    config: Arc<RwLock<ReloadableConfig<IndexerConfig>>>,
389    sender: watch::Sender<u64>,
390    history: Vec<ReloadRecord>,
391}
392
393// ─── HotReloadManager ─────────────────────────────────────────────────────────
394
395/// Central coordinator for all hot-reload operations.
396///
397/// Holds one `ReloadableConfig<IndexerConfig>` per registered indexer ID,
398/// and a `watch` channel per config so subscribers are notified on every
399/// version bump.
400pub struct HotReloadManager {
401    configs: RwLock<HashMap<String, ManagedConfig>>,
402}
403
404impl HotReloadManager {
405    /// Create a new, empty `HotReloadManager`.
406    pub fn new() -> Self {
407        Self {
408            configs: RwLock::new(HashMap::new()),
409        }
410    }
411
412    /// Register a new indexer config under `id`.
413    ///
414    /// Returns a shared handle to the `ReloadableConfig` so the caller can
415    /// read the current value cheaply without going through the manager.
416    pub async fn register_config(
417        &self,
418        id: &str,
419        config: IndexerConfig,
420    ) -> Arc<RwLock<ReloadableConfig<IndexerConfig>>> {
421        let reloadable = ReloadableConfig::new(config);
422        let version = reloadable.version;
423        let arc = Arc::new(RwLock::new(reloadable));
424        let (tx, _rx) = watch::channel(version);
425
426        let managed = ManagedConfig {
427            config: Arc::clone(&arc),
428            sender: tx,
429            history: Vec::new(),
430        };
431
432        self.configs.write().await.insert(id.to_string(), managed);
433        info!("hot-reload: registered config '{id}' at version {version}");
434        arc
435    }
436
437    /// Apply `new_config` to the indexer identified by `id`.
438    ///
439    /// The update is validated first; if validation fails the existing config
440    /// is left unchanged and an `Err` is returned.
441    pub async fn update_config(
442        &self,
443        id: &str,
444        new_config: IndexerConfig,
445    ) -> Result<ReloadResult, IndexerError> {
446        let mut guard = self.configs.write().await;
447        let managed = guard.get_mut(id).ok_or_else(|| {
448            IndexerError::Other(format!("hot-reload: no config registered for id '{id}'"))
449        })?;
450
451        let old_config = {
452            let r = managed.config.read().await;
453            r.inner.clone()
454        };
455
456        let warnings = ConfigValidator::validate(&old_config, &new_config)?;
457        let diffs = diff_configs(&old_config, &new_config);
458
459        let new_version = {
460            let mut w = managed.config.write().await;
461            w.update_with_source(new_config, ConfigSource::Manual)
462        };
463
464        let applied_at = chrono::Utc::now().timestamp();
465
466        managed.history.push(ReloadRecord {
467            version: new_version,
468            diffs: diffs.clone(),
469            applied_at,
470            source: ConfigSource::Manual,
471        });
472
473        // Notify subscribers.
474        let _ = managed.sender.send(new_version);
475
476        for w in &warnings {
477            warn!(
478                "hot-reload[{id}] v{new_version} {} [{}]: {}",
479                w.field, w.severity, w.message
480            );
481        }
482        debug!(
483            "hot-reload[{id}] bumped to v{new_version} ({} diffs)",
484            diffs.len()
485        );
486
487        Ok(ReloadResult {
488            version: new_version,
489            diffs,
490            warnings,
491            applied_at,
492        })
493    }
494
495    /// Return a clone of the current `IndexerConfig` for `id`, or `None` if
496    /// `id` is not registered.
497    pub async fn get_config(&self, id: &str) -> Option<IndexerConfig> {
498        let guard = self.configs.read().await;
499        let managed = guard.get(id)?;
500        let r = managed.config.read().await;
501        Some(r.inner.clone())
502    }
503
504    /// Return the current version number for `id`, or `None` if not registered.
505    pub async fn get_version(&self, id: &str) -> Option<u64> {
506        let guard = self.configs.read().await;
507        let managed = guard.get(id)?;
508        let r = managed.config.read().await;
509        Some(r.version)
510    }
511
512    /// Return a `watch::Receiver` that yields the new version on every reload.
513    ///
514    /// Returns `None` if `id` is not registered.
515    pub async fn subscribe(&self, id: &str) -> Option<watch::Receiver<u64>> {
516        let guard = self.configs.read().await;
517        let managed = guard.get(id)?;
518        Some(managed.sender.subscribe())
519    }
520
521    /// List all registered config IDs.
522    pub async fn configs(&self) -> Vec<String> {
523        let guard = self.configs.read().await;
524        guard.keys().cloned().collect()
525    }
526
527    /// Return the full reload history for `id`.
528    pub async fn history(&self, id: &str) -> Vec<ReloadRecord> {
529        let guard = self.configs.read().await;
530        match guard.get(id) {
531            Some(m) => m.history.clone(),
532            None => Vec::new(),
533        }
534    }
535}
536
537impl Default for HotReloadManager {
538    fn default() -> Self {
539        Self::new()
540    }
541}
542
543// ─── ConfigWatcher ────────────────────────────────────────────────────────────
544
545type ChangeCallback = Box<dyn Fn(Vec<ConfigDiff>) + Send + Sync>;
546
547/// Watches a `ReloadableConfig<IndexerConfig>` on a fixed polling interval and
548/// fires registered callbacks whenever the config changes.
549pub struct ConfigWatcher {
550    interval: Duration,
551    callbacks: Arc<RwLock<Vec<ChangeCallback>>>,
552    stop_tx: watch::Sender<bool>,
553}
554
555impl ConfigWatcher {
556    /// Create a new `ConfigWatcher` that polls every `interval`.
557    pub fn new(interval: Duration) -> Self {
558        let (stop_tx, _) = watch::channel(false);
559        Self {
560            interval,
561            callbacks: Arc::new(RwLock::new(Vec::new())),
562            stop_tx,
563        }
564    }
565
566    /// Start watching `config` for changes originating from `source`.
567    ///
568    /// Spawns a background `tokio` task that compares version numbers and
569    /// invokes all registered callbacks with the diffs when a change is
570    /// detected.
571    pub fn watch(
572        &self,
573        config: Arc<RwLock<ReloadableConfig<IndexerConfig>>>,
574        _source: ConfigSource,
575    ) {
576        let interval = self.interval;
577        let callbacks = Arc::clone(&self.callbacks);
578        let mut stop_rx = self.stop_tx.subscribe();
579
580        tokio::spawn(async move {
581            let mut last_version = {
582                let r = config.read().await;
583                r.version
584            };
585            let mut last_inner = {
586                let r = config.read().await;
587                r.inner.clone()
588            };
589
590            let mut ticker = tokio::time::interval(interval);
591            ticker.tick().await; // consume the immediate first tick
592
593            loop {
594                tokio::select! {
595                    _ = ticker.tick() => {
596                        let (cur_version, cur_inner) = {
597                            let r = config.read().await;
598                            (r.version, r.inner.clone())
599                        };
600
601                        if cur_version != last_version {
602                            let diffs = diff_configs(&last_inner, &cur_inner);
603                            debug!("config-watcher: version {} → {}, {} diffs", last_version, cur_version, diffs.len());
604
605                            let cbs = callbacks.read().await;
606                            for cb in cbs.iter() {
607                                cb(diffs.clone());
608                            }
609
610                            last_version = cur_version;
611                            last_inner = cur_inner;
612                        }
613                    }
614                    _ = stop_rx.changed() => {
615                        if *stop_rx.borrow() {
616                            debug!("config-watcher: stopped");
617                            break;
618                        }
619                    }
620                }
621            }
622        });
623    }
624
625    /// Register a callback to be invoked with the diffs on every detected change.
626    pub async fn on_change(&self, callback: ChangeCallback) {
627        self.callbacks.write().await.push(callback);
628    }
629
630    /// Stop the background watcher task.
631    pub fn stop(&self) {
632        let _ = self.stop_tx.send(true);
633    }
634}
635
636// ─── FilterReloader ───────────────────────────────────────────────────────────
637
638/// Fine-grained hot-reload helper for `EventFilter`.
639///
640/// Wraps the filter in a `RwLock` so individual addresses and topic0 values
641/// can be added/removed without replacing the whole config.
642pub struct FilterReloader {
643    filter: Arc<RwLock<EventFilter>>,
644}
645
646impl FilterReloader {
647    /// Create a new `FilterReloader` from an initial `EventFilter`.
648    pub fn new(filter: EventFilter) -> Self {
649        Self {
650            filter: Arc::new(RwLock::new(filter)),
651        }
652    }
653
654    /// Replace the entire filter and return the field-level diffs.
655    pub async fn update(&self, new_filter: EventFilter) -> Vec<ConfigDiff> {
656        let old_filter = self.filter.read().await.clone();
657
658        // Build a pair of stub configs so we can reuse diff_configs.
659        let old_cfg = stub_config_with_filter(old_filter);
660        let new_cfg = stub_config_with_filter(new_filter.clone());
661        let diffs = diff_configs(&old_cfg, &new_cfg);
662
663        *self.filter.write().await = new_filter;
664        diffs
665    }
666
667    /// Return a snapshot of the current `EventFilter`.
668    pub async fn current(&self) -> EventFilter {
669        self.filter.read().await.clone()
670    }
671
672    /// Add `addr` to the filter's address list (if not already present).
673    pub async fn add_address(&self, addr: &str) {
674        let mut f = self.filter.write().await;
675        let addr = addr.to_string();
676        if !f.addresses.contains(&addr) {
677            f.addresses.push(addr);
678        }
679    }
680
681    /// Remove `addr` from the filter's address list (case-sensitive).
682    pub async fn remove_address(&self, addr: &str) {
683        let mut f = self.filter.write().await;
684        f.addresses.retain(|a| a != addr);
685    }
686
687    /// Add `topic` to the filter's topic0 list (if not already present).
688    pub async fn add_topic0(&self, topic: &str) {
689        let mut f = self.filter.write().await;
690        let topic = topic.to_string();
691        if !f.topic0_values.contains(&topic) {
692            f.topic0_values.push(topic);
693        }
694    }
695
696    /// Remove `topic` from the filter's topic0 list (case-sensitive).
697    pub async fn remove_topic0(&self, topic: &str) {
698        let mut f = self.filter.write().await;
699        f.topic0_values.retain(|t| t != topic);
700    }
701}
702
703// Helper: build a minimal IndexerConfig that only differs in its filter.
704fn stub_config_with_filter(filter: EventFilter) -> IndexerConfig {
705    IndexerConfig {
706        id: "stub".into(),
707        chain: "ethereum".into(),
708        from_block: 0,
709        to_block: None,
710        confirmation_depth: 12,
711        batch_size: 1000,
712        checkpoint_interval: 100,
713        poll_interval_ms: 2000,
714        filter,
715    }
716}
717
718// ─── Tests ────────────────────────────────────────────────────────────────────
719
720#[cfg(test)]
721mod tests {
722    use super::*;
723    use crate::indexer::IndexerConfig;
724    use crate::types::EventFilter;
725
726    // ── helpers ──────────────────────────────────────────────────────────────
727
728    fn base_config() -> IndexerConfig {
729        IndexerConfig {
730            id: "my-indexer".into(),
731            chain: "ethereum".into(),
732            from_block: 1_000_000,
733            to_block: None,
734            confirmation_depth: 12,
735            batch_size: 500,
736            checkpoint_interval: 100,
737            poll_interval_ms: 2_000,
738            filter: EventFilter::default(),
739        }
740    }
741
742    // ── ReloadableConfig versioning ───────────────────────────────────────────
743
744    #[test]
745    fn reloadable_config_starts_at_version_1() {
746        let cfg: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
747        assert_eq!(cfg.version, 1);
748    }
749
750    #[test]
751    fn reloadable_config_update_increments_version() {
752        let mut cfg: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
753        let v2 = cfg.update(base_config());
754        assert_eq!(v2, 2);
755        assert_eq!(cfg.version, 2);
756
757        let v3 = cfg.update(base_config());
758        assert_eq!(v3, 3);
759        assert_eq!(cfg.version, 3);
760    }
761
762    #[test]
763    fn reloadable_config_update_replaces_inner() {
764        let mut cfg: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
765        let mut new_inner = base_config();
766        new_inner.batch_size = 9_999;
767        cfg.update(new_inner);
768        assert_eq!(cfg.inner.batch_size, 9_999);
769    }
770
771    #[test]
772    fn reloadable_config_updated_at_is_set() {
773        let cfg: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
774        assert!(cfg.updated_at > 0);
775    }
776
777    // ── ConfigSource variants ─────────────────────────────────────────────────
778
779    #[test]
780    fn config_source_display() {
781        assert_eq!(ConfigSource::Default.to_string(), "default");
782        assert_eq!(ConfigSource::Environment.to_string(), "environment");
783        assert_eq!(ConfigSource::Api.to_string(), "api");
784        assert_eq!(ConfigSource::Manual.to_string(), "manual");
785        assert_eq!(
786            ConfigSource::File("/etc/chainindex.yaml".into()).to_string(),
787            "file:/etc/chainindex.yaml"
788        );
789    }
790
791    #[test]
792    fn config_source_equality() {
793        assert_eq!(ConfigSource::Manual, ConfigSource::Manual);
794        assert_ne!(ConfigSource::Api, ConfigSource::Manual);
795        assert_eq!(
796            ConfigSource::File("a.yaml".into()),
797            ConfigSource::File("a.yaml".into())
798        );
799        assert_ne!(
800            ConfigSource::File("a.yaml".into()),
801            ConfigSource::File("b.yaml".into())
802        );
803    }
804
805    // ── ConfigDiff ────────────────────────────────────────────────────────────
806
807    #[test]
808    fn diff_configs_empty_when_identical() {
809        let cfg = base_config();
810        let diffs = diff_configs(&cfg, &cfg);
811        assert!(
812            diffs.is_empty(),
813            "identical configs should produce no diffs"
814        );
815    }
816
817    #[test]
818    fn diff_configs_detects_batch_size_change() {
819        let old = base_config();
820        let mut new = base_config();
821        new.batch_size = 2_000;
822
823        let diffs = diff_configs(&old, &new);
824        assert_eq!(diffs.len(), 1);
825        assert_eq!(diffs[0].field, "batch_size");
826        assert_eq!(diffs[0].old_value, serde_json::json!(500u64));
827        assert_eq!(diffs[0].new_value, serde_json::json!(2_000u64));
828    }
829
830    #[test]
831    fn diff_configs_detects_poll_interval_change() {
832        let old = base_config();
833        let mut new = base_config();
834        new.poll_interval_ms = 500;
835
836        let diffs = diff_configs(&old, &new);
837        assert_eq!(diffs.len(), 1);
838        assert_eq!(diffs[0].field, "poll_interval_ms");
839    }
840
841    #[test]
842    fn diff_configs_detects_multiple_changes() {
843        let old = base_config();
844        let mut new = base_config();
845        new.batch_size = 10;
846        new.checkpoint_interval = 50;
847        new.poll_interval_ms = 1_000;
848
849        let diffs = diff_configs(&old, &new);
850        assert_eq!(diffs.len(), 3);
851        let fields: Vec<_> = diffs.iter().map(|d| d.field.as_str()).collect();
852        assert!(fields.contains(&"batch_size"));
853        assert!(fields.contains(&"checkpoint_interval"));
854        assert!(fields.contains(&"poll_interval_ms"));
855    }
856
857    #[test]
858    fn diff_configs_detects_filter_address_change() {
859        let old = base_config();
860        let mut new = base_config();
861        new.filter.addresses.push("0xDEAD".into());
862
863        let diffs = diff_configs(&old, &new);
864        assert_eq!(diffs.len(), 1);
865        assert_eq!(diffs[0].field, "filter.addresses");
866    }
867
868    // ── ConfigValidator ───────────────────────────────────────────────────────
869
870    #[test]
871    fn validator_rejects_chain_change() {
872        let old = base_config();
873        let mut new = base_config();
874        new.chain = "polygon".into();
875
876        let result = ConfigValidator::validate(&old, &new);
877        assert!(result.is_err(), "chain change must be rejected");
878        let err = result.unwrap_err().to_string();
879        assert!(
880            err.contains("chain"),
881            "error message should mention 'chain'"
882        );
883    }
884
885    #[test]
886    fn validator_rejects_from_block_change() {
887        let old = base_config();
888        let mut new = base_config();
889        new.from_block = 999_999;
890
891        let result = ConfigValidator::validate(&old, &new);
892        assert!(result.is_err(), "from_block change must be rejected");
893        let err = result.unwrap_err().to_string();
894        assert!(err.contains("from_block"));
895    }
896
897    #[test]
898    fn validator_allows_safe_reload() {
899        let old = base_config();
900        let mut new = base_config();
901        new.batch_size = 1_000;
902        new.poll_interval_ms = 3_000;
903
904        let result = ConfigValidator::validate(&old, &new);
905        assert!(result.is_ok());
906        let warnings = result.unwrap();
907        assert!(warnings.is_empty());
908    }
909
910    #[test]
911    fn validator_warns_on_confirmation_depth_decrease() {
912        let old = base_config();
913        let mut new = base_config();
914        new.confirmation_depth = 3; // was 12
915
916        let result = ConfigValidator::validate(&old, &new);
917        assert!(result.is_ok());
918        let warnings = result.unwrap();
919        assert_eq!(warnings.len(), 1);
920        assert_eq!(warnings[0].field, "confirmation_depth");
921        assert_eq!(warnings[0].severity, WarningSeverity::Warning);
922    }
923
924    #[test]
925    fn validator_is_safe_reload_false_for_chain_change() {
926        let old = base_config();
927        let mut new = base_config();
928        new.chain = "arbitrum".into();
929
930        assert!(!ConfigValidator::is_safe_reload(&old, &new));
931    }
932
933    #[test]
934    fn validator_is_safe_reload_true_for_batch_size_change() {
935        let old = base_config();
936        let mut new = base_config();
937        new.batch_size = 250;
938
939        assert!(ConfigValidator::is_safe_reload(&old, &new));
940    }
941
942    // ── WarningSeverity ───────────────────────────────────────────────────────
943
944    #[test]
945    fn warning_severity_display() {
946        assert_eq!(WarningSeverity::Info.to_string(), "INFO");
947        assert_eq!(WarningSeverity::Warning.to_string(), "WARNING");
948        assert_eq!(WarningSeverity::Critical.to_string(), "CRITICAL");
949    }
950
951    #[test]
952    fn config_warning_checkpoint_interval_zero_is_critical() {
953        let old = base_config();
954        let mut new = base_config();
955        new.checkpoint_interval = 0;
956
957        let warnings = ConfigValidator::validate(&old, &new).unwrap();
958        let critical: Vec<_> = warnings
959            .iter()
960            .filter(|w| w.severity == WarningSeverity::Critical)
961            .collect();
962        assert!(
963            !critical.is_empty(),
964            "checkpoint_interval=0 should raise Critical"
965        );
966    }
967
968    // ── HotReloadManager ─────────────────────────────────────────────────────
969
970    #[tokio::test]
971    async fn manager_register_and_get() {
972        let mgr = HotReloadManager::new();
973        mgr.register_config("idx-1", base_config()).await;
974
975        let cfg = mgr.get_config("idx-1").await;
976        assert!(cfg.is_some());
977        assert_eq!(cfg.unwrap().id, "my-indexer");
978    }
979
980    #[tokio::test]
981    async fn manager_update_config_bumps_version() {
982        let mgr = HotReloadManager::new();
983        mgr.register_config("idx-1", base_config()).await;
984
985        let mut new_cfg = base_config();
986        new_cfg.batch_size = 777;
987
988        let result = mgr.update_config("idx-1", new_cfg).await.unwrap();
989        assert_eq!(result.version, 2);
990        assert_eq!(result.diffs.len(), 1);
991        assert_eq!(result.diffs[0].field, "batch_size");
992    }
993
994    #[tokio::test]
995    async fn manager_subscribe_receives_version_bump() {
996        let mgr = HotReloadManager::new();
997        mgr.register_config("idx-1", base_config()).await;
998
999        let mut rx = mgr.subscribe("idx-1").await.unwrap();
1000        assert_eq!(*rx.borrow(), 1);
1001
1002        let mut new_cfg = base_config();
1003        new_cfg.poll_interval_ms = 500;
1004        mgr.update_config("idx-1", new_cfg).await.unwrap();
1005
1006        // The receiver should now observe version 2.
1007        rx.changed().await.unwrap();
1008        assert_eq!(*rx.borrow(), 2);
1009    }
1010
1011    #[tokio::test]
1012    async fn manager_history_tracks_reloads() {
1013        let mgr = HotReloadManager::new();
1014        mgr.register_config("idx-1", base_config()).await;
1015
1016        // Two successive updates.
1017        let mut c1 = base_config();
1018        c1.batch_size = 100;
1019        mgr.update_config("idx-1", c1).await.unwrap();
1020
1021        let mut c2 = base_config();
1022        c2.batch_size = 200;
1023        mgr.update_config("idx-1", c2).await.unwrap();
1024
1025        let history = mgr.history("idx-1").await;
1026        assert_eq!(history.len(), 2);
1027        assert_eq!(history[0].version, 2);
1028        assert_eq!(history[1].version, 3);
1029    }
1030
1031    #[tokio::test]
1032    async fn manager_unknown_config_returns_none() {
1033        let mgr = HotReloadManager::new();
1034        assert!(mgr.get_config("does-not-exist").await.is_none());
1035        assert!(mgr.get_version("does-not-exist").await.is_none());
1036        assert!(mgr.subscribe("does-not-exist").await.is_none());
1037    }
1038
1039    #[tokio::test]
1040    async fn manager_update_rejects_chain_change() {
1041        let mgr = HotReloadManager::new();
1042        mgr.register_config("idx-1", base_config()).await;
1043
1044        let mut bad = base_config();
1045        bad.chain = "solana".into();
1046
1047        let result = mgr.update_config("idx-1", bad).await;
1048        assert!(result.is_err());
1049        // Version must remain at 1.
1050        assert_eq!(mgr.get_version("idx-1").await.unwrap(), 1);
1051    }
1052
1053    #[tokio::test]
1054    async fn manager_multiple_registrations() {
1055        let mgr = HotReloadManager::new();
1056
1057        let mut cfg_a = base_config();
1058        cfg_a.id = "a".into();
1059        let mut cfg_b = base_config();
1060        cfg_b.id = "b".into();
1061        let mut cfg_c = base_config();
1062        cfg_c.id = "c".into();
1063
1064        mgr.register_config("a", cfg_a).await;
1065        mgr.register_config("b", cfg_b).await;
1066        mgr.register_config("c", cfg_c).await;
1067
1068        let mut ids = mgr.configs().await;
1069        ids.sort();
1070        assert_eq!(ids, vec!["a", "b", "c"]);
1071    }
1072
1073    #[tokio::test]
1074    async fn manager_get_version_initial() {
1075        let mgr = HotReloadManager::new();
1076        mgr.register_config("v-test", base_config()).await;
1077        assert_eq!(mgr.get_version("v-test").await.unwrap(), 1);
1078    }
1079
1080    // ── ReloadResult fields ───────────────────────────────────────────────────
1081
1082    #[tokio::test]
1083    async fn reload_result_fields_populated() {
1084        let mgr = HotReloadManager::new();
1085        mgr.register_config("r", base_config()).await;
1086
1087        let mut new_cfg = base_config();
1088        new_cfg.checkpoint_interval = 50;
1089        new_cfg.poll_interval_ms = 1_000;
1090
1091        let result = mgr.update_config("r", new_cfg).await.unwrap();
1092
1093        assert_eq!(result.version, 2);
1094        assert_eq!(result.diffs.len(), 2);
1095        assert!(result.applied_at > 0);
1096        // No warnings for these safe changes.
1097        assert!(result.warnings.is_empty());
1098    }
1099
1100    // ── FilterReloader ────────────────────────────────────────────────────────
1101
1102    #[tokio::test]
1103    async fn filter_reloader_add_address() {
1104        let fr = FilterReloader::new(EventFilter::default());
1105        fr.add_address("0xABCD").await;
1106        fr.add_address("0x1234").await;
1107
1108        let f = fr.current().await;
1109        assert_eq!(f.addresses.len(), 2);
1110        assert!(f.addresses.contains(&"0xABCD".to_string()));
1111        assert!(f.addresses.contains(&"0x1234".to_string()));
1112    }
1113
1114    #[tokio::test]
1115    async fn filter_reloader_add_address_no_duplicates() {
1116        let fr = FilterReloader::new(EventFilter::default());
1117        fr.add_address("0xABCD").await;
1118        fr.add_address("0xABCD").await; // duplicate — should be ignored
1119        let f = fr.current().await;
1120        assert_eq!(f.addresses.len(), 1);
1121    }
1122
1123    #[tokio::test]
1124    async fn filter_reloader_remove_address() {
1125        let fr = FilterReloader::new(EventFilter {
1126            addresses: vec!["0xAAAA".into(), "0xBBBB".into()],
1127            ..Default::default()
1128        });
1129        fr.remove_address("0xAAAA").await;
1130
1131        let f = fr.current().await;
1132        assert_eq!(f.addresses, vec!["0xBBBB".to_string()]);
1133    }
1134
1135    #[tokio::test]
1136    async fn filter_reloader_add_topic0() {
1137        let fr = FilterReloader::new(EventFilter::default());
1138        fr.add_topic0("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
1139            .await;
1140
1141        let f = fr.current().await;
1142        assert_eq!(f.topic0_values.len(), 1);
1143    }
1144
1145    #[tokio::test]
1146    async fn filter_reloader_current_returns_latest() {
1147        let fr = FilterReloader::new(EventFilter::default());
1148
1149        let new_filter = EventFilter {
1150            addresses: vec!["0xCafe".into()],
1151            topic0_values: vec!["0xdead".into()],
1152            from_block: Some(100),
1153            to_block: None,
1154        };
1155        fr.update(new_filter).await;
1156
1157        let current = fr.current().await;
1158        assert_eq!(current.addresses, vec!["0xCafe".to_string()]);
1159        assert_eq!(current.topic0_values, vec!["0xdead".to_string()]);
1160        assert_eq!(current.from_block, Some(100));
1161    }
1162
1163    #[tokio::test]
1164    async fn filter_reloader_update_returns_diffs() {
1165        let fr = FilterReloader::new(EventFilter::default());
1166
1167        let new_filter = EventFilter {
1168            addresses: vec!["0xFeed".into()],
1169            ..Default::default()
1170        };
1171        let diffs = fr.update(new_filter).await;
1172
1173        // Should detect the change in filter.addresses.
1174        assert!(
1175            diffs.iter().any(|d| d.field == "filter.addresses"),
1176            "expected filter.addresses diff"
1177        );
1178    }
1179}