1use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Duration;
22
23use serde::{Deserialize, Serialize};
24use tokio::sync::{watch, RwLock};
25use tracing::{debug, info, warn};
26
27use crate::error::IndexerError;
28use crate::indexer::IndexerConfig;
29use crate::types::EventFilter;
30
31#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub enum ConfigSource {
36 Default,
38 File(String),
40 Environment,
42 Api,
44 Manual,
46}
47
48impl std::fmt::Display for ConfigSource {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Self::Default => write!(f, "default"),
52 Self::File(p) => write!(f, "file:{p}"),
53 Self::Environment => write!(f, "environment"),
54 Self::Api => write!(f, "api"),
55 Self::Manual => write!(f, "manual"),
56 }
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct ReloadableConfig<T> {
65 pub inner: T,
67 pub version: u64,
69 pub updated_at: i64,
71 pub source: ConfigSource,
73}
74
75impl<T: Clone + Serialize> ReloadableConfig<T> {
76 pub fn new(inner: T) -> Self {
78 Self {
79 inner,
80 version: 1,
81 updated_at: chrono::Utc::now().timestamp(),
82 source: ConfigSource::Default,
83 }
84 }
85
86 pub fn with_source(inner: T, source: ConfigSource) -> Self {
88 Self {
89 inner,
90 version: 1,
91 updated_at: chrono::Utc::now().timestamp(),
92 source,
93 }
94 }
95
96 pub fn update(&mut self, inner: T) -> u64 {
98 self.inner = inner;
99 self.version += 1;
100 self.updated_at = chrono::Utc::now().timestamp();
101 self.version
102 }
103
104 pub fn update_with_source(&mut self, inner: T, source: ConfigSource) -> u64 {
107 self.inner = inner;
108 self.version += 1;
109 self.updated_at = chrono::Utc::now().timestamp();
110 self.source = source;
111 self.version
112 }
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct ConfigDiff {
120 pub field: String,
122 pub old_value: serde_json::Value,
124 pub new_value: serde_json::Value,
126}
127
128impl ConfigDiff {
129 fn new(
130 field: impl Into<String>,
131 old_value: serde_json::Value,
132 new_value: serde_json::Value,
133 ) -> Self {
134 Self {
135 field: field.into(),
136 old_value,
137 new_value,
138 }
139 }
140}
141
142pub fn diff_configs(old: &IndexerConfig, new: &IndexerConfig) -> Vec<ConfigDiff> {
146 let mut diffs = Vec::new();
147
148 macro_rules! check {
149 ($field:ident) => {
150 if old.$field != new.$field {
151 diffs.push(ConfigDiff::new(
152 stringify!($field),
153 serde_json::to_value(&old.$field).unwrap_or(serde_json::Value::Null),
154 serde_json::to_value(&new.$field).unwrap_or(serde_json::Value::Null),
155 ));
156 }
157 };
158 }
159
160 check!(id);
161 check!(chain);
162 check!(from_block);
163 check!(to_block);
164 check!(confirmation_depth);
165 check!(batch_size);
166 check!(checkpoint_interval);
167 check!(poll_interval_ms);
168
169 if old.filter.addresses != new.filter.addresses {
171 diffs.push(ConfigDiff::new(
172 "filter.addresses",
173 serde_json::to_value(&old.filter.addresses).unwrap_or(serde_json::Value::Null),
174 serde_json::to_value(&new.filter.addresses).unwrap_or(serde_json::Value::Null),
175 ));
176 }
177 if old.filter.topic0_values != new.filter.topic0_values {
178 diffs.push(ConfigDiff::new(
179 "filter.topic0_values",
180 serde_json::to_value(&old.filter.topic0_values).unwrap_or(serde_json::Value::Null),
181 serde_json::to_value(&new.filter.topic0_values).unwrap_or(serde_json::Value::Null),
182 ));
183 }
184 if old.filter.from_block != new.filter.from_block {
185 diffs.push(ConfigDiff::new(
186 "filter.from_block",
187 serde_json::to_value(old.filter.from_block).unwrap_or(serde_json::Value::Null),
188 serde_json::to_value(new.filter.from_block).unwrap_or(serde_json::Value::Null),
189 ));
190 }
191 if old.filter.to_block != new.filter.to_block {
192 diffs.push(ConfigDiff::new(
193 "filter.to_block",
194 serde_json::to_value(old.filter.to_block).unwrap_or(serde_json::Value::Null),
195 serde_json::to_value(new.filter.to_block).unwrap_or(serde_json::Value::Null),
196 ));
197 }
198
199 diffs
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
206pub enum WarningSeverity {
207 Info,
209 Warning,
211 Critical,
213}
214
215impl std::fmt::Display for WarningSeverity {
216 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 match self {
218 Self::Info => write!(f, "INFO"),
219 Self::Warning => write!(f, "WARNING"),
220 Self::Critical => write!(f, "CRITICAL"),
221 }
222 }
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct ConfigWarning {
230 pub field: String,
232 pub message: String,
234 pub severity: WarningSeverity,
236}
237
238impl ConfigWarning {
239 fn new(
240 field: impl Into<String>,
241 message: impl Into<String>,
242 severity: WarningSeverity,
243 ) -> Self {
244 Self {
245 field: field.into(),
246 message: message.into(),
247 severity,
248 }
249 }
250}
251
252pub struct ConfigValidator;
256
257impl ConfigValidator {
258 pub fn validate(
264 old: &IndexerConfig,
265 new: &IndexerConfig,
266 ) -> Result<Vec<ConfigWarning>, IndexerError> {
267 let mut warnings = Vec::new();
268
269 if old.chain != new.chain {
272 return Err(IndexerError::Other(format!(
273 "hot-reload: cannot change chain from '{}' to '{}' — stop and reconfigure the indexer",
274 old.chain, new.chain
275 )));
276 }
277
278 if old.from_block != new.from_block {
279 return Err(IndexerError::Other(format!(
280 "hot-reload: cannot change from_block from {} to {} — use a checkpoint to rewind instead",
281 old.from_block, new.from_block
282 )));
283 }
284
285 if new.confirmation_depth < old.confirmation_depth {
288 warnings.push(ConfigWarning::new(
289 "confirmation_depth",
290 format!(
291 "Decreasing confirmation_depth from {} to {} may cause premature finality and missed reorgs",
292 old.confirmation_depth, new.confirmation_depth
293 ),
294 WarningSeverity::Warning,
295 ));
296 }
297
298 if new.batch_size > old.batch_size * 10 {
299 warnings.push(ConfigWarning::new(
300 "batch_size",
301 format!(
302 "batch_size increased more than 10x (from {} to {}); RPC node may reject large eth_getLogs ranges",
303 old.batch_size, new.batch_size
304 ),
305 WarningSeverity::Warning,
306 ));
307 }
308
309 if new.poll_interval_ms < 500 {
310 warnings.push(ConfigWarning::new(
311 "poll_interval_ms",
312 format!(
313 "poll_interval_ms={} is very aggressive; may overwhelm the RPC endpoint",
314 new.poll_interval_ms
315 ),
316 WarningSeverity::Warning,
317 ));
318 }
319
320 if new.checkpoint_interval == 0 {
321 warnings.push(ConfigWarning::new(
322 "checkpoint_interval",
323 "checkpoint_interval=0 disables checkpointing; crash recovery will be impaired",
324 WarningSeverity::Critical,
325 ));
326 }
327
328 if old.id != new.id {
329 warnings.push(ConfigWarning::new(
330 "id",
331 format!(
332 "Changing indexer id from '{}' to '{}' will break checkpoint continuity",
333 old.id, new.id
334 ),
335 WarningSeverity::Critical,
336 ));
337 }
338
339 Ok(warnings)
340 }
341
342 pub fn is_safe_reload(old: &IndexerConfig, new: &IndexerConfig) -> bool {
346 match Self::validate(old, new) {
347 Err(_) => false,
348 Ok(warnings) => !warnings
349 .iter()
350 .any(|w| w.severity == WarningSeverity::Critical),
351 }
352 }
353}
354
355#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct ReloadResult {
360 pub version: u64,
362 pub diffs: Vec<ConfigDiff>,
364 pub warnings: Vec<ConfigWarning>,
366 pub applied_at: i64,
368}
369
370#[derive(Debug, Clone, Serialize, Deserialize)]
374pub struct ReloadRecord {
375 pub version: u64,
377 pub diffs: Vec<ConfigDiff>,
379 pub applied_at: i64,
381 pub source: ConfigSource,
383}
384
385struct ManagedConfig {
388 config: Arc<RwLock<ReloadableConfig<IndexerConfig>>>,
389 sender: watch::Sender<u64>,
390 history: Vec<ReloadRecord>,
391}
392
393pub struct HotReloadManager {
401 configs: RwLock<HashMap<String, ManagedConfig>>,
402}
403
404impl HotReloadManager {
405 pub fn new() -> Self {
407 Self {
408 configs: RwLock::new(HashMap::new()),
409 }
410 }
411
412 pub async fn register_config(
417 &self,
418 id: &str,
419 config: IndexerConfig,
420 ) -> Arc<RwLock<ReloadableConfig<IndexerConfig>>> {
421 let reloadable = ReloadableConfig::new(config);
422 let version = reloadable.version;
423 let arc = Arc::new(RwLock::new(reloadable));
424 let (tx, _rx) = watch::channel(version);
425
426 let managed = ManagedConfig {
427 config: Arc::clone(&arc),
428 sender: tx,
429 history: Vec::new(),
430 };
431
432 self.configs.write().await.insert(id.to_string(), managed);
433 info!("hot-reload: registered config '{id}' at version {version}");
434 arc
435 }
436
437 pub async fn update_config(
442 &self,
443 id: &str,
444 new_config: IndexerConfig,
445 ) -> Result<ReloadResult, IndexerError> {
446 let mut guard = self.configs.write().await;
447 let managed = guard.get_mut(id).ok_or_else(|| {
448 IndexerError::Other(format!("hot-reload: no config registered for id '{id}'"))
449 })?;
450
451 let old_config = {
452 let r = managed.config.read().await;
453 r.inner.clone()
454 };
455
456 let warnings = ConfigValidator::validate(&old_config, &new_config)?;
457 let diffs = diff_configs(&old_config, &new_config);
458
459 let new_version = {
460 let mut w = managed.config.write().await;
461 w.update_with_source(new_config, ConfigSource::Manual)
462 };
463
464 let applied_at = chrono::Utc::now().timestamp();
465
466 managed.history.push(ReloadRecord {
467 version: new_version,
468 diffs: diffs.clone(),
469 applied_at,
470 source: ConfigSource::Manual,
471 });
472
473 let _ = managed.sender.send(new_version);
475
476 for w in &warnings {
477 warn!(
478 "hot-reload[{id}] v{new_version} {} [{}]: {}",
479 w.field, w.severity, w.message
480 );
481 }
482 debug!(
483 "hot-reload[{id}] bumped to v{new_version} ({} diffs)",
484 diffs.len()
485 );
486
487 Ok(ReloadResult {
488 version: new_version,
489 diffs,
490 warnings,
491 applied_at,
492 })
493 }
494
495 pub async fn get_config(&self, id: &str) -> Option<IndexerConfig> {
498 let guard = self.configs.read().await;
499 let managed = guard.get(id)?;
500 let r = managed.config.read().await;
501 Some(r.inner.clone())
502 }
503
504 pub async fn get_version(&self, id: &str) -> Option<u64> {
506 let guard = self.configs.read().await;
507 let managed = guard.get(id)?;
508 let r = managed.config.read().await;
509 Some(r.version)
510 }
511
512 pub async fn subscribe(&self, id: &str) -> Option<watch::Receiver<u64>> {
516 let guard = self.configs.read().await;
517 let managed = guard.get(id)?;
518 Some(managed.sender.subscribe())
519 }
520
521 pub async fn configs(&self) -> Vec<String> {
523 let guard = self.configs.read().await;
524 guard.keys().cloned().collect()
525 }
526
527 pub async fn history(&self, id: &str) -> Vec<ReloadRecord> {
529 let guard = self.configs.read().await;
530 match guard.get(id) {
531 Some(m) => m.history.clone(),
532 None => Vec::new(),
533 }
534 }
535}
536
537impl Default for HotReloadManager {
538 fn default() -> Self {
539 Self::new()
540 }
541}
542
543type ChangeCallback = Box<dyn Fn(Vec<ConfigDiff>) + Send + Sync>;
546
547pub struct ConfigWatcher {
550 interval: Duration,
551 callbacks: Arc<RwLock<Vec<ChangeCallback>>>,
552 stop_tx: watch::Sender<bool>,
553}
554
555impl ConfigWatcher {
556 pub fn new(interval: Duration) -> Self {
558 let (stop_tx, _) = watch::channel(false);
559 Self {
560 interval,
561 callbacks: Arc::new(RwLock::new(Vec::new())),
562 stop_tx,
563 }
564 }
565
566 pub fn watch(
572 &self,
573 config: Arc<RwLock<ReloadableConfig<IndexerConfig>>>,
574 _source: ConfigSource,
575 ) {
576 let interval = self.interval;
577 let callbacks = Arc::clone(&self.callbacks);
578 let mut stop_rx = self.stop_tx.subscribe();
579
580 tokio::spawn(async move {
581 let mut last_version = {
582 let r = config.read().await;
583 r.version
584 };
585 let mut last_inner = {
586 let r = config.read().await;
587 r.inner.clone()
588 };
589
590 let mut ticker = tokio::time::interval(interval);
591 ticker.tick().await; loop {
594 tokio::select! {
595 _ = ticker.tick() => {
596 let (cur_version, cur_inner) = {
597 let r = config.read().await;
598 (r.version, r.inner.clone())
599 };
600
601 if cur_version != last_version {
602 let diffs = diff_configs(&last_inner, &cur_inner);
603 debug!("config-watcher: version {} → {}, {} diffs", last_version, cur_version, diffs.len());
604
605 let cbs = callbacks.read().await;
606 for cb in cbs.iter() {
607 cb(diffs.clone());
608 }
609
610 last_version = cur_version;
611 last_inner = cur_inner;
612 }
613 }
614 _ = stop_rx.changed() => {
615 if *stop_rx.borrow() {
616 debug!("config-watcher: stopped");
617 break;
618 }
619 }
620 }
621 }
622 });
623 }
624
625 pub async fn on_change(&self, callback: ChangeCallback) {
627 self.callbacks.write().await.push(callback);
628 }
629
630 pub fn stop(&self) {
632 let _ = self.stop_tx.send(true);
633 }
634}
635
636pub struct FilterReloader {
643 filter: Arc<RwLock<EventFilter>>,
644}
645
646impl FilterReloader {
647 pub fn new(filter: EventFilter) -> Self {
649 Self {
650 filter: Arc::new(RwLock::new(filter)),
651 }
652 }
653
654 pub async fn update(&self, new_filter: EventFilter) -> Vec<ConfigDiff> {
656 let old_filter = self.filter.read().await.clone();
657
658 let old_cfg = stub_config_with_filter(old_filter);
660 let new_cfg = stub_config_with_filter(new_filter.clone());
661 let diffs = diff_configs(&old_cfg, &new_cfg);
662
663 *self.filter.write().await = new_filter;
664 diffs
665 }
666
667 pub async fn current(&self) -> EventFilter {
669 self.filter.read().await.clone()
670 }
671
672 pub async fn add_address(&self, addr: &str) {
674 let mut f = self.filter.write().await;
675 let addr = addr.to_string();
676 if !f.addresses.contains(&addr) {
677 f.addresses.push(addr);
678 }
679 }
680
681 pub async fn remove_address(&self, addr: &str) {
683 let mut f = self.filter.write().await;
684 f.addresses.retain(|a| a != addr);
685 }
686
687 pub async fn add_topic0(&self, topic: &str) {
689 let mut f = self.filter.write().await;
690 let topic = topic.to_string();
691 if !f.topic0_values.contains(&topic) {
692 f.topic0_values.push(topic);
693 }
694 }
695
696 pub async fn remove_topic0(&self, topic: &str) {
698 let mut f = self.filter.write().await;
699 f.topic0_values.retain(|t| t != topic);
700 }
701}
702
703fn stub_config_with_filter(filter: EventFilter) -> IndexerConfig {
705 IndexerConfig {
706 id: "stub".into(),
707 chain: "ethereum".into(),
708 from_block: 0,
709 to_block: None,
710 confirmation_depth: 12,
711 batch_size: 1000,
712 checkpoint_interval: 100,
713 poll_interval_ms: 2000,
714 filter,
715 }
716}
717
#[cfg(test)]
mod tests {
    use super::*;
    use crate::indexer::IndexerConfig;
    use crate::types::EventFilter;

    // ── Helpers ────────────────────────────────────────────────────────────

    /// Baseline configuration every test starts from.
    fn base_config() -> IndexerConfig {
        IndexerConfig {
            id: "my-indexer".into(),
            chain: "ethereum".into(),
            from_block: 1_000_000,
            to_block: None,
            confirmation_depth: 12,
            batch_size: 500,
            checkpoint_interval: 100,
            poll_interval_ms: 2_000,
            filter: EventFilter::default(),
        }
    }

    /// Returns the baseline configuration after applying `edit` to it.
    fn tweaked(edit: impl FnOnce(&mut IndexerConfig)) -> IndexerConfig {
        let mut cfg = base_config();
        edit(&mut cfg);
        cfg
    }

    /// Returns a manager with the baseline configuration registered under `id`.
    async fn manager_with(id: &str) -> HotReloadManager {
        let mgr = HotReloadManager::new();
        mgr.register_config(id, base_config()).await;
        mgr
    }

    // ── ReloadableConfig ───────────────────────────────────────────────────

    #[test]
    fn reloadable_config_starts_at_version_1() {
        let wrapped: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
        assert_eq!(wrapped.version, 1);
    }

    #[test]
    fn reloadable_config_update_increments_version() {
        let mut wrapped: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
        for expected in [2u64, 3] {
            let returned = wrapped.update(base_config());
            assert_eq!(returned, expected);
            assert_eq!(wrapped.version, expected);
        }
    }

    #[test]
    fn reloadable_config_update_replaces_inner() {
        let mut wrapped: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
        wrapped.update(tweaked(|c| c.batch_size = 9_999));
        assert_eq!(wrapped.inner.batch_size, 9_999);
    }

    #[test]
    fn reloadable_config_updated_at_is_set() {
        let wrapped: ReloadableConfig<IndexerConfig> = ReloadableConfig::new(base_config());
        assert!(wrapped.updated_at > 0);
    }

    // ── ConfigSource ───────────────────────────────────────────────────────

    #[test]
    fn config_source_display() {
        assert_eq!(ConfigSource::Default.to_string(), "default");
        assert_eq!(ConfigSource::Environment.to_string(), "environment");
        assert_eq!(ConfigSource::Api.to_string(), "api");
        assert_eq!(ConfigSource::Manual.to_string(), "manual");

        let from_file = ConfigSource::File("/etc/chainindex.yaml".into());
        assert_eq!(from_file.to_string(), "file:/etc/chainindex.yaml");
    }

    #[test]
    fn config_source_equality() {
        assert_eq!(ConfigSource::Manual, ConfigSource::Manual);
        assert_ne!(ConfigSource::Api, ConfigSource::Manual);

        let file = |name: &str| ConfigSource::File(name.into());
        assert_eq!(file("a.yaml"), file("a.yaml"));
        assert_ne!(file("a.yaml"), file("b.yaml"));
    }

    // ── diff_configs ───────────────────────────────────────────────────────

    #[test]
    fn diff_configs_empty_when_identical() {
        let cfg = base_config();
        assert!(
            diff_configs(&cfg, &cfg).is_empty(),
            "identical configs should produce no diffs"
        );
    }

    #[test]
    fn diff_configs_detects_batch_size_change() {
        let diffs = diff_configs(&base_config(), &tweaked(|c| c.batch_size = 2_000));

        assert_eq!(diffs.len(), 1);
        let only = &diffs[0];
        assert_eq!(only.field, "batch_size");
        assert_eq!(only.old_value, serde_json::json!(500u64));
        assert_eq!(only.new_value, serde_json::json!(2_000u64));
    }

    #[test]
    fn diff_configs_detects_poll_interval_change() {
        let diffs = diff_configs(&base_config(), &tweaked(|c| c.poll_interval_ms = 500));

        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].field, "poll_interval_ms");
    }

    #[test]
    fn diff_configs_detects_multiple_changes() {
        let changed = tweaked(|c| {
            c.batch_size = 10;
            c.checkpoint_interval = 50;
            c.poll_interval_ms = 1_000;
        });

        let diffs = diff_configs(&base_config(), &changed);
        assert_eq!(diffs.len(), 3);

        let fields: Vec<_> = diffs.iter().map(|d| d.field.as_str()).collect();
        assert!(fields.contains(&"batch_size"));
        assert!(fields.contains(&"checkpoint_interval"));
        assert!(fields.contains(&"poll_interval_ms"));
    }

    #[test]
    fn diff_configs_detects_filter_address_change() {
        let changed = tweaked(|c| c.filter.addresses.push("0xDEAD".into()));

        let diffs = diff_configs(&base_config(), &changed);
        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].field, "filter.addresses");
    }

    // ── ConfigValidator ────────────────────────────────────────────────────

    #[test]
    fn validator_rejects_chain_change() {
        let result =
            ConfigValidator::validate(&base_config(), &tweaked(|c| c.chain = "polygon".into()));

        assert!(result.is_err(), "chain change must be rejected");
        let message = result.unwrap_err().to_string();
        assert!(
            message.contains("chain"),
            "error message should mention 'chain'"
        );
    }

    #[test]
    fn validator_rejects_from_block_change() {
        let result = ConfigValidator::validate(&base_config(), &tweaked(|c| c.from_block = 999_999));

        assert!(result.is_err(), "from_block change must be rejected");
        let message = result.unwrap_err().to_string();
        assert!(message.contains("from_block"));
    }

    #[test]
    fn validator_allows_safe_reload() {
        let changed = tweaked(|c| {
            c.batch_size = 1_000;
            c.poll_interval_ms = 3_000;
        });

        let result = ConfigValidator::validate(&base_config(), &changed);
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[test]
    fn validator_warns_on_confirmation_depth_decrease() {
        let result =
            ConfigValidator::validate(&base_config(), &tweaked(|c| c.confirmation_depth = 3));

        assert!(result.is_ok());
        let warnings = result.unwrap();
        assert_eq!(warnings.len(), 1);
        assert_eq!(warnings[0].field, "confirmation_depth");
        assert_eq!(warnings[0].severity, WarningSeverity::Warning);
    }

    #[test]
    fn validator_is_safe_reload_false_for_chain_change() {
        let changed = tweaked(|c| c.chain = "arbitrum".into());
        assert!(!ConfigValidator::is_safe_reload(&base_config(), &changed));
    }

    #[test]
    fn validator_is_safe_reload_true_for_batch_size_change() {
        let changed = tweaked(|c| c.batch_size = 250);
        assert!(ConfigValidator::is_safe_reload(&base_config(), &changed));
    }

    // ── Warnings ───────────────────────────────────────────────────────────

    #[test]
    fn warning_severity_display() {
        assert_eq!(WarningSeverity::Info.to_string(), "INFO");
        assert_eq!(WarningSeverity::Warning.to_string(), "WARNING");
        assert_eq!(WarningSeverity::Critical.to_string(), "CRITICAL");
    }

    #[test]
    fn config_warning_checkpoint_interval_zero_is_critical() {
        let warnings =
            ConfigValidator::validate(&base_config(), &tweaked(|c| c.checkpoint_interval = 0))
                .unwrap();

        let has_critical = warnings
            .iter()
            .any(|w| w.severity == WarningSeverity::Critical);
        assert!(has_critical, "checkpoint_interval=0 should raise Critical");
    }

    // ── HotReloadManager ───────────────────────────────────────────────────

    #[tokio::test]
    async fn manager_register_and_get() {
        let mgr = manager_with("idx-1").await;

        let fetched = mgr.get_config("idx-1").await;
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().id, "my-indexer");
    }

    #[tokio::test]
    async fn manager_update_config_bumps_version() {
        let mgr = manager_with("idx-1").await;

        let outcome = mgr
            .update_config("idx-1", tweaked(|c| c.batch_size = 777))
            .await
            .unwrap();

        assert_eq!(outcome.version, 2);
        assert_eq!(outcome.diffs.len(), 1);
        assert_eq!(outcome.diffs[0].field, "batch_size");
    }

    #[tokio::test]
    async fn manager_subscribe_receives_version_bump() {
        let mgr = manager_with("idx-1").await;

        let mut rx = mgr.subscribe("idx-1").await.unwrap();
        assert_eq!(*rx.borrow(), 1);

        mgr.update_config("idx-1", tweaked(|c| c.poll_interval_ms = 500))
            .await
            .unwrap();

        rx.changed().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn manager_history_tracks_reloads() {
        let mgr = manager_with("idx-1").await;

        for size in [100, 200] {
            mgr.update_config("idx-1", tweaked(|c| c.batch_size = size))
                .await
                .unwrap();
        }

        let history = mgr.history("idx-1").await;
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].version, 2);
        assert_eq!(history[1].version, 3);
    }

    #[tokio::test]
    async fn manager_unknown_config_returns_none() {
        let mgr = HotReloadManager::new();
        let missing = "does-not-exist";

        assert!(mgr.get_config(missing).await.is_none());
        assert!(mgr.get_version(missing).await.is_none());
        assert!(mgr.subscribe(missing).await.is_none());
    }

    #[tokio::test]
    async fn manager_update_rejects_chain_change() {
        let mgr = manager_with("idx-1").await;

        let result = mgr
            .update_config("idx-1", tweaked(|c| c.chain = "solana".into()))
            .await;

        assert!(result.is_err());
        // A rejected update must leave the version untouched.
        assert_eq!(mgr.get_version("idx-1").await.unwrap(), 1);
    }

    #[tokio::test]
    async fn manager_multiple_registrations() {
        let mgr = HotReloadManager::new();

        for id in ["a", "b", "c"] {
            mgr.register_config(id, tweaked(|c| c.id = id.into())).await;
        }

        let mut ids = mgr.configs().await;
        ids.sort();
        assert_eq!(ids, vec!["a", "b", "c"]);
    }

    #[tokio::test]
    async fn manager_get_version_initial() {
        let mgr = manager_with("v-test").await;
        assert_eq!(mgr.get_version("v-test").await.unwrap(), 1);
    }

    // ── ReloadResult ───────────────────────────────────────────────────────

    #[tokio::test]
    async fn reload_result_fields_populated() {
        let mgr = manager_with("r").await;

        let changed = tweaked(|c| {
            c.checkpoint_interval = 50;
            c.poll_interval_ms = 1_000;
        });
        let outcome = mgr.update_config("r", changed).await.unwrap();

        assert_eq!(outcome.version, 2);
        assert_eq!(outcome.diffs.len(), 2);
        assert!(outcome.applied_at > 0);
        assert!(outcome.warnings.is_empty());
    }

    // ── FilterReloader ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn filter_reloader_add_address() {
        let reloader = FilterReloader::new(EventFilter::default());
        reloader.add_address("0xABCD").await;
        reloader.add_address("0x1234").await;

        let filter = reloader.current().await;
        assert_eq!(filter.addresses.len(), 2);
        assert!(filter.addresses.contains(&"0xABCD".to_string()));
        assert!(filter.addresses.contains(&"0x1234".to_string()));
    }

    #[tokio::test]
    async fn filter_reloader_add_address_no_duplicates() {
        let reloader = FilterReloader::new(EventFilter::default());
        // Adding the same address twice must keep a single entry.
        reloader.add_address("0xABCD").await;
        reloader.add_address("0xABCD").await;

        let filter = reloader.current().await;
        assert_eq!(filter.addresses.len(), 1);
    }

    #[tokio::test]
    async fn filter_reloader_remove_address() {
        let initial = EventFilter {
            addresses: vec!["0xAAAA".into(), "0xBBBB".into()],
            ..Default::default()
        };
        let reloader = FilterReloader::new(initial);
        reloader.remove_address("0xAAAA").await;

        let filter = reloader.current().await;
        assert_eq!(filter.addresses, vec!["0xBBBB".to_string()]);
    }

    #[tokio::test]
    async fn filter_reloader_add_topic0() {
        let reloader = FilterReloader::new(EventFilter::default());
        reloader
            .add_topic0("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
            .await;

        let filter = reloader.current().await;
        assert_eq!(filter.topic0_values.len(), 1);
    }

    #[tokio::test]
    async fn filter_reloader_current_returns_latest() {
        let reloader = FilterReloader::new(EventFilter::default());

        let replacement = EventFilter {
            addresses: vec!["0xCafe".into()],
            topic0_values: vec!["0xdead".into()],
            from_block: Some(100),
            to_block: None,
        };
        reloader.update(replacement).await;

        let current = reloader.current().await;
        assert_eq!(current.addresses, vec!["0xCafe".to_string()]);
        assert_eq!(current.topic0_values, vec!["0xdead".to_string()]);
        assert_eq!(current.from_block, Some(100));
    }

    #[tokio::test]
    async fn filter_reloader_update_returns_diffs() {
        let reloader = FilterReloader::new(EventFilter::default());

        let replacement = EventFilter {
            addresses: vec!["0xFeed".into()],
            ..Default::default()
        };
        let diffs = reloader.update(replacement).await;

        assert!(
            diffs.iter().any(|d| d.field == "filter.addresses"),
            "expected filter.addresses diff"
        );
    }
}