1mod context;
11mod fallback;
12mod fallback_metrics;
13mod handlers;
14mod http_trait;
15mod model_routing;
16mod model_routing_metrics;
17
18pub use context::{FallbackReason, RequestContext};
19pub use fallback::{FallbackDecision, FallbackEvaluator};
20pub use fallback_metrics::{get_fallback_metrics, init_fallback_metrics, FallbackMetrics};
21pub use model_routing::{extract_model_from_headers, find_upstream_for_model, ModelRoutingResult};
22pub use model_routing_metrics::{
23 get_model_routing_metrics, init_model_routing_metrics, ModelRoutingMetrics,
24};
25
26use anyhow::{Context, Result};
27use parking_lot::RwLock;
28use pingora::http::ResponseHeader;
29use pingora::prelude::*;
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tracing::{debug, error, info, warn};
34use uuid::Uuid;
35
36use sentinel_common::ids::{QualifiedId, Scope};
37use sentinel_common::{Registry, ScopedMetrics, ScopedRegistry};
38
39use crate::agents::AgentManager;
40use crate::app::AppState;
41use crate::builtin_handlers::BuiltinHandlerState;
42use crate::cache::{CacheConfig, CacheManager};
43use crate::errors::ErrorHandler;
44use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
45use crate::health::PassiveHealthChecker;
46use crate::http_helpers;
47use crate::inference::InferenceRateLimitManager;
48use crate::logging::{LogManager, SharedLogManager};
49use crate::rate_limit::{RateLimitConfig, RateLimitManager};
50use crate::reload::{
51 ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
52};
53use crate::routing::RouteMatcher;
54use crate::scoped_routing::ScopedRouteMatcher;
55use crate::static_files::StaticFileServer;
56use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
57use crate::validation::SchemaValidator;
58
59use sentinel_common::TraceIdFormat;
60use sentinel_config::{Config, FlattenedConfig};
61
pub struct SentinelProxy {
    /// Loads, validates, and hot-reloads configuration; other components
    /// subscribe to its reload events.
    pub config_manager: Arc<ConfigManager>,
    /// Matcher for globally-defined routes; replaced wholesale on reload.
    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
    /// Matcher for scope-qualified routes (async lock: the reload path
    /// awaits while holding the write guard).
    pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
    /// Connection pools for globally-defined upstreams, keyed by upstream id.
    pub(super) upstream_pools: Registry<UpstreamPool>,
    /// Connection pools for scope-qualified upstreams.
    pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
    /// Manages configured agents; also backs the guardrail processor.
    pub(super) agent_manager: Arc<AgentManager>,
    /// Passive (in-band) upstream health tracking.
    pub(super) passive_health: Arc<PassiveHealthChecker>,
    /// Process-wide request metrics.
    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
    /// Per-scope request metrics.
    pub(super) scoped_metrics: Arc<ScopedMetrics>,
    /// Shared application state (instance id, readiness flag).
    pub(super) app_state: Arc<AppState>,
    /// Coordinates graceful draining during configuration reloads.
    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
    /// Per-route error handlers (custom error pages when configured).
    pub(super) error_handlers: Registry<ErrorHandler>,
    /// Per-route API schema validators (API routes only).
    pub(super) validators: Registry<SchemaValidator>,
    /// Per-route static file servers (static routes only).
    pub(super) static_servers: Registry<StaticFileServer>,
    /// State shared by built-in handlers (version, instance id).
    pub(super) builtin_state: Arc<BuiltinHandlerState>,
    /// Access / error / audit log sinks.
    pub(super) log_manager: SharedLogManager,
    /// Format used when generating trace ids for requests.
    pub(super) trace_id_format: TraceIdFormat,
    /// Runner driving active upstream health checks.
    pub(super) health_check_runner: Arc<HealthCheckRunner>,
    /// Global and per-route HTTP rate limiting.
    pub(super) rate_limit_manager: Arc<RateLimitManager>,
    /// Per-route HTTP response caching.
    pub(super) cache_manager: Arc<CacheManager>,
    /// GeoIP-based request filtering.
    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
    /// Rate limiting specific to inference routes.
    pub(super) inference_rate_limit_manager: Arc<InferenceRateLimitManager>,
    /// Tracks upstream warmth (see `crate::health::WarmthTracker`).
    pub(super) warmth_tracker: Arc<crate::health::WarmthTracker>,
    /// Applies guardrails to inference traffic via the agent manager.
    pub(super) guardrail_processor: Arc<crate::inference::GuardrailProcessor>,
    /// ACME challenge manager; `new()` leaves this `None` — wired up elsewhere.
    pub acme_challenges: Option<Arc<crate::acme::ChallengeManager>>,
    /// ACME client; `new()` leaves this `None` — wired up elsewhere.
    pub acme_client: Option<Arc<crate::acme::AcmeClient>>,
}
119
120impl SentinelProxy {
    /// Builds a fully-initialized `SentinelProxy`.
    ///
    /// Loads configuration from `config_path` (or the embedded default when
    /// `None`), validates it, then constructs every subsystem in dependency
    /// order: route matchers, upstream pools, health checking, metrics,
    /// reload handling, per-route components, logging, rate limiting,
    /// caching, and geo filtering. Marks the application ready just before
    /// returning.
    ///
    /// # Errors
    /// Fails if the configuration cannot be loaded or validated, or if any
    /// required subsystem constructor returns an error.
    pub async fn new(config_path: Option<&str>) -> Result<Self> {
        info!("Starting Sentinel Proxy");

        // Resolve the configuration source; "_embedded_" is the sentinel
        // path recorded when the embedded default config is used.
        let (config, effective_config_path) = match config_path {
            Some(path) => {
                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
                (cfg, path.to_string())
            }
            None => {
                let cfg = Config::default_embedded()
                    .context("Failed to load embedded default configuration")?;
                (cfg, "_embedded_".to_string())
            }
        };

        // Fail fast on invalid configuration before building anything.
        config
            .validate()
            .context("Initial configuration validation failed")?;

        // Configure process-wide cache storage before any CacheManager use.
        if let Some(ref cache_config) = config.cache {
            info!(
                max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
                backend = ?cache_config.backend,
                "Configuring HTTP cache storage"
            );
            crate::cache::configure_cache(cache_config.clone());
        }

        let config_manager =
            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);

        // Validators vet candidate configs before a reload is applied.
        config_manager.add_validator(Box::new(RouteValidator)).await;
        config_manager
            .add_validator(Box::new(UpstreamValidator))
            .await;

        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));

        // Flatten scoped config once; reused for scoped matchers and pools.
        let flattened = config.flatten();

        let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
            ScopedRouteMatcher::from_flattened(&flattened)
                .await
                .context("Failed to create scoped route matcher")?,
        ));

        // Build one pool per global upstream; any failure aborts startup.
        let mut pools = HashMap::new();
        let mut health_check_runner = HealthCheckRunner::new();

        for (upstream_id, upstream_config) in &config.upstreams {
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = upstream_id.clone();
            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
            pools.insert(upstream_id.clone(), pool);

            // Returns None when the upstream has no active health check configured.
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }
        }
        let upstream_pools = Registry::from_map(pools);

        let scoped_upstream_pools =
            Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;

        let health_check_runner = Arc::new(health_check_runner);

        // NOTE(review): argument semantics (0.5, 100, None) are not visible
        // here — presumably failure ratio / sample window / optional config.
        // TODO confirm against PassiveHealthChecker::new.
        let passive_health = Arc::new(PassiveHealthChecker::new(
            0.5, 100, None, ));

        let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
        agent_manager.initialize().await?;

        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
        let scoped_metrics =
            Arc::new(ScopedMetrics::new().context("Failed to create scoped metrics collector")?);

        // Fresh instance id on every process start.
        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));

        // 30s is presumably the drain window for in-flight requests during
        // reloads — TODO confirm against GracefulReloadCoordinator::new.
        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
            Duration::from_secs(30), ));

        // Spawn the background task that applies config reloads to the
        // matchers and pools created above.
        Self::setup_reload_handler(
            config_manager.clone(),
            route_matcher.clone(),
            upstream_pools.clone(),
            scoped_route_matcher.clone(),
            scoped_upstream_pools.clone(),
        )
        .await;

        let (error_handlers, validators, static_servers) =
            Self::initialize_route_components(&config).await?;

        let builtin_state = Arc::new(BuiltinHandlerState::new(
            env!("CARGO_PKG_VERSION").to_string(),
            app_state.instance_id.clone(),
        ));

        // File logging is best-effort: fall back to a disabled manager
        // rather than failing startup.
        let log_manager = match LogManager::new(&config.observability.logging) {
            Ok(manager) => {
                if manager.access_log_enabled() {
                    info!("Access logging enabled");
                }
                if manager.error_log_enabled() {
                    info!("Error logging enabled");
                }
                if manager.audit_log_enabled() {
                    info!("Audit logging enabled");
                }
                Arc::new(manager)
            }
            Err(e) => {
                warn!(
                    "Failed to initialize log manager, file logging disabled: {}",
                    e
                );
                Arc::new(LogManager::disabled())
            }
        };

        // Audit-log every applied configuration reload.
        {
            use crate::reload::AuditReloadHook;
            let audit_hook = AuditReloadHook::new(log_manager.clone());
            config_manager.add_hook(Box::new(audit_hook)).await;
            debug!("Registered audit reload hook");
        }

        // Only spawn the health-check loop when at least one checker exists.
        if health_check_runner.checker_count() > 0 {
            let runner = health_check_runner.clone();
            tokio::spawn(async move {
                runner.run().await;
            });
            info!(
                "Started active health checking for {} upstreams",
                health_check_runner.checker_count()
            );
        }

        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));

        let inference_rate_limit_manager =
            Arc::new(Self::initialize_inference_rate_limiters(&config));

        let warmth_tracker = Arc::new(crate::health::WarmthTracker::with_defaults());

        let guardrail_processor = Arc::new(crate::inference::GuardrailProcessor::new(
            agent_manager.clone(),
        ));

        let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));

        // Background maintenance: periodic cleanup and geo DB hot-reload.
        Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());

        Self::spawn_geo_database_watcher(geo_filter_manager.clone());

        // Everything constructed; flip the readiness flag.
        app_state.set_ready(true);

        let trace_id_format = config.server.trace_id_format;

        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));

        // Metrics registration failures are non-fatal.
        if let Err(e) = init_fallback_metrics() {
            warn!("Failed to initialize fallback metrics: {}", e);
        }

        if let Err(e) = init_model_routing_metrics() {
            warn!("Failed to initialize model routing metrics: {}", e);
        }

        Ok(Self {
            config_manager,
            route_matcher,
            scoped_route_matcher,
            upstream_pools,
            scoped_upstream_pools,
            agent_manager,
            passive_health,
            metrics,
            scoped_metrics,
            app_state,
            reload_coordinator,
            error_handlers,
            validators,
            static_servers,
            builtin_state,
            log_manager,
            trace_id_format,
            health_check_runner,
            rate_limit_manager,
            cache_manager,
            geo_filter_manager,
            inference_rate_limit_manager,
            warmth_tracker,
            guardrail_processor,
            // ACME wiring happens outside this constructor.
            acme_challenges: None,
            acme_client: None,
        })
    }
362
    /// Spawns a background task that reacts to applied configuration
    /// reloads by rebuilding route matchers and upstream pools.
    ///
    /// Replaced pools are kept alive for 60 seconds after the swap so
    /// requests already routed to them can finish before shutdown.
    async fn setup_reload_handler(
        config_manager: Arc<ConfigManager>,
        route_matcher: Arc<RwLock<RouteMatcher>>,
        upstream_pools: Registry<UpstreamPool>,
        scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
        scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
    ) {
        let mut reload_rx = config_manager.subscribe();
        let config_manager_clone = config_manager.clone();

        tokio::spawn(async move {
            // The loop ends when the reload channel closes.
            while let Ok(event) = reload_rx.recv().await {
                if let ReloadEvent::Applied { .. } = event {
                    let new_config = config_manager_clone.current();
                    let flattened = new_config.flatten();

                    // Swap in the new global matcher; on build failure the
                    // previous matcher silently stays active.
                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
                        *route_matcher.write() = new_matcher;
                        info!("Global routes reloaded successfully");
                    }

                    if let Err(e) = scoped_route_matcher
                        .write()
                        .await
                        .load_from_flattened(&flattened)
                        .await
                    {
                        error!("Failed to reload scoped routes: {}", e);
                    } else {
                        info!(
                            "Scoped routes reloaded ({} scopes)",
                            scoped_route_matcher.read().await.scope_count().await
                        );
                    }

                    // Rebuild global pools; an individual pool failure is
                    // logged and that upstream is dropped from the new set.
                    let mut new_pools = HashMap::new();
                    for (upstream_id, upstream_config) in &new_config.upstreams {
                        let mut config_with_id = upstream_config.clone();
                        config_with_id.id = upstream_id.clone();
                        match UpstreamPool::new(config_with_id).await {
                            Ok(pool) => {
                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
                            }
                            Err(e) => {
                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
                            }
                        }
                    }

                    // replace() returns the displaced pools for deferred shutdown.
                    let old_pools = upstream_pools.replace(new_pools).await;

                    let new_scoped_pools = Self::build_scoped_pools_list(&flattened).await;
                    let old_scoped_pools =
                        scoped_upstream_pools.replace_all(new_scoped_pools).await;

                    info!(
                        "Scoped upstream pools reloaded ({} pools)",
                        scoped_upstream_pools.len().await
                    );

                    // Grace period before shutting down the replaced pools.
                    tokio::spawn(async move {
                        tokio::time::sleep(Duration::from_secs(60)).await;

                        for (name, pool) in old_pools {
                            info!("Shutting down old global pool: {}", name);
                            pool.shutdown().await;
                        }

                        for (name, pool) in old_scoped_pools {
                            info!("Shutting down old scoped pool: {}", name);
                            pool.shutdown().await;
                        }
                    });
                }
            }
        });
    }
450
    /// Builds the scoped upstream pool registry from the flattened config
    /// and registers any configured active health checkers.
    ///
    /// # Errors
    /// Fails if any single pool cannot be constructed — startup is
    /// all-or-nothing, unlike the reload path in `build_scoped_pools_list`,
    /// which logs and skips failures.
    async fn create_scoped_upstream_pools(
        flattened: &FlattenedConfig,
        health_check_runner: &mut HealthCheckRunner,
    ) -> Result<ScopedRegistry<UpstreamPool>> {
        let registry = ScopedRegistry::new();

        for (qid, upstream_config) in &flattened.upstreams {
            // Each pool carries its fully-qualified canonical id.
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = qid.canonical();

            let pool = Arc::new(
                UpstreamPool::new(config_with_id.clone())
                    .await
                    .with_context(|| {
                        format!("Failed to create upstream pool '{}'", qid.canonical())
                    })?,
            );

            // NOTE(review): the export lookup keys on the pre-override
            // `upstream_config.id`, not the canonical qid — verify this is
            // the intended key for `exported_upstreams`.
            let is_exported = flattened
                .exported_upstreams
                .contains_key(&upstream_config.id);

            if is_exported {
                registry.insert_exported(qid.clone(), pool).await;
            } else {
                registry.insert(qid.clone(), pool).await;
            }

            // Returns None when no active health check is configured.
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }

            debug!(
                upstream_id = %qid.canonical(),
                scope = ?qid.scope,
                exported = is_exported,
                "Created scoped upstream pool"
            );
        }

        info!("Created {} scoped upstream pools", registry.len().await);

        Ok(registry)
    }
498
499 async fn build_scoped_pools_list(
501 flattened: &FlattenedConfig,
502 ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
503 let mut result = Vec::new();
504
505 for (qid, upstream_config) in &flattened.upstreams {
506 let mut config_with_id = upstream_config.clone();
507 config_with_id.id = qid.canonical();
508
509 match UpstreamPool::new(config_with_id).await {
510 Ok(pool) => {
511 let is_exported = flattened
512 .exported_upstreams
513 .contains_key(&upstream_config.id);
514 result.push((qid.clone(), Arc::new(pool), is_exported));
515 }
516 Err(e) => {
517 error!(
518 "Failed to create scoped upstream pool {}: {}",
519 qid.canonical(),
520 e
521 );
522 }
523 }
524 }
525
526 result
527 }
528
    /// Builds per-route error handlers, schema validators, and static file
    /// servers from the route configuration.
    ///
    /// Every route gets an error handler (with or without custom pages);
    /// validators are created only for API routes with a schema, and static
    /// servers only for static routes with a files config.
    ///
    /// # Errors
    /// Currently always returns `Ok` — component failures are logged and
    /// skipped — but the signature allows future fallible initialization.
    async fn initialize_route_components(
        config: &Config,
    ) -> Result<(
        Registry<ErrorHandler>,
        Registry<SchemaValidator>,
        Registry<StaticFileServer>,
    )> {
        let mut error_handlers_map = HashMap::new();
        let mut validators_map = HashMap::new();
        let mut static_servers_map = HashMap::new();

        for route in &config.routes {
            info!(
                "Initializing components for route: {} with service type: {:?}",
                route.id, route.service_type
            );

            // Every route gets an error handler; custom pages are optional.
            if let Some(ref error_config) = route.error_pages {
                let handler =
                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
                debug!("Initialized error handler for route: {}", route.id);
            } else {
                let handler = ErrorHandler::new(route.service_type.clone(), None);
                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
            }

            // Schema validation applies only to API routes; a bad schema is
            // logged and skipped rather than failing startup.
            if route.service_type == sentinel_config::ServiceType::Api {
                if let Some(ref api_schema) = route.api_schema {
                    match SchemaValidator::new(api_schema.clone()) {
                        Ok(validator) => {
                            validators_map.insert(route.id.clone(), Arc::new(validator));
                            info!("Initialized schema validator for route: {}", route.id);
                        }
                        Err(e) => {
                            warn!(
                                "Failed to initialize schema validator for route {}: {}",
                                route.id, e
                            );
                        }
                    }
                }
            }

            // Static routes need a static_files config; warn when missing.
            if route.service_type == sentinel_config::ServiceType::Static {
                if let Some(ref static_config) = route.static_files {
                    let server = StaticFileServer::new(static_config.clone());
                    static_servers_map.insert(route.id.clone(), Arc::new(server));
                    info!("Initialized static file server for route: {}", route.id);
                } else {
                    warn!(
                        "Static route {} has no static_files configuration",
                        route.id
                    );
                }
            }
        }

        Ok((
            Registry::from_map(error_handlers_map),
            Registry::from_map(validators_map),
            Registry::from_map(static_servers_map),
        ))
    }
598
599 pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
601 http_helpers::get_or_create_trace_id(session, self.trace_id_format)
602 }
603
    /// Builds the rate-limit manager from three config sources: the global
    /// limit, per-route policies, and rate-limit filters attached to routes.
    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
        use sentinel_config::RateLimitAction;

        // Global limiter applies across all routes when configured.
        let manager = if let Some(ref global) = config.rate_limits.global {
            info!(
                max_rps = global.max_rps,
                burst = global.burst,
                key = ?global.key,
                "Initializing global rate limiter"
            );
            RateLimitManager::with_global_limit(global.max_rps, global.burst)
        } else {
            RateLimitManager::new()
        };

        for route in &config.routes {
            // Route-level policy: always a local, rejecting limiter (429).
            if let Some(ref rate_limit) = route.policies.rate_limit {
                let rl_config = RateLimitConfig {
                    max_rps: rate_limit.requests_per_second,
                    burst: rate_limit.burst,
                    key: rate_limit.key.clone(),
                    action: RateLimitAction::Reject,
                    status_code: 429,
                    message: None,
                    backend: sentinel_config::RateLimitBackend::Local,
                    // Presumably unused for the Reject action — TODO confirm.
                    max_delay_ms: 5000, };
                manager.register_route(&route.id, rl_config);
                info!(
                    route_id = %route.id,
                    max_rps = rate_limit.requests_per_second,
                    burst = rate_limit.burst,
                    key = ?rate_limit.key,
                    "Registered rate limiter for route"
                );
            }

            // Filter-based limiters: fully configurable action/backend.
            // NOTE(review): these register under the same route id as the
            // policy limiter above — if both exist, the later registration
            // may overwrite the earlier; confirm register_route semantics.
            for filter_id in &route.filters {
                if let Some(filter_config) = config.filters.get(filter_id) {
                    if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
                    {
                        let rl_config = RateLimitConfig {
                            max_rps: rl_filter.max_rps,
                            burst: rl_filter.burst,
                            key: rl_filter.key.clone(),
                            action: rl_filter.on_limit.clone(),
                            status_code: rl_filter.status_code,
                            message: rl_filter.limit_message.clone(),
                            backend: rl_filter.backend.clone(),
                            max_delay_ms: rl_filter.max_delay_ms,
                        };
                        manager.register_route(&route.id, rl_config);
                        info!(
                            route_id = %route.id,
                            filter_id = %filter_id,
                            max_rps = rl_filter.max_rps,
                            backend = ?rl_filter.backend,
                            "Registered rate limiter from filter for route"
                        );
                    }
                }
            }
        }

        if manager.route_count() > 0 {
            info!(
                route_count = manager.route_count(),
                "Rate limiting initialized"
            );
        }

        manager
    }
681
682 fn initialize_inference_rate_limiters(config: &Config) -> InferenceRateLimitManager {
687 let manager = InferenceRateLimitManager::new();
688
689 for route in &config.routes {
690 if route.service_type == sentinel_config::ServiceType::Inference {
692 if let Some(ref inference_config) = route.inference {
693 manager.register_route(&route.id, inference_config);
694 }
695 }
696 }
697
698 if manager.route_count() > 0 {
699 info!(
700 route_count = manager.route_count(),
701 "Inference rate limiting initialized"
702 );
703 }
704
705 manager
706 }
707
708 fn initialize_cache_manager(config: &Config) -> CacheManager {
710 let manager = CacheManager::new();
711
712 let mut enabled_count = 0;
713
714 for route in &config.routes {
715 if route.service_type == sentinel_config::ServiceType::Api {
717 let cache_config = CacheConfig {
718 enabled: false, default_ttl_secs: 60,
720 ..Default::default()
721 };
722 manager.register_route(&route.id, cache_config);
723 }
724
725 if route.service_type == sentinel_config::ServiceType::Static {
727 let cache_config = CacheConfig {
728 enabled: true, default_ttl_secs: 3600,
730 max_size_bytes: 50 * 1024 * 1024, stale_while_revalidate_secs: 60,
732 stale_if_error_secs: 300,
733 ..Default::default()
734 };
735 manager.register_route(&route.id, cache_config);
736 enabled_count += 1;
737 info!(
738 route_id = %route.id,
739 default_ttl_secs = 3600,
740 "HTTP caching enabled for static route"
741 );
742 }
743
744 if route.service_type == sentinel_config::ServiceType::Web {
746 let cache_config = CacheConfig {
747 enabled: false, default_ttl_secs: 300,
749 ..Default::default()
750 };
751 manager.register_route(&route.id, cache_config);
752 }
753 }
754
755 if enabled_count > 0 {
756 info!(enabled_routes = enabled_count, "HTTP caching initialized");
757 } else {
758 debug!("HTTP cache manager initialized (no routes with caching enabled)");
759 }
760
761 manager
762 }
763
764 fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
766 let manager = GeoFilterManager::new();
767
768 for (filter_id, filter_config) in &config.filters {
769 if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
770 match manager.register_filter(filter_id, geo_filter.clone()) {
771 Ok(_) => {
772 info!(
773 filter_id = %filter_id,
774 database_path = %geo_filter.database_path,
775 action = ?geo_filter.action,
776 countries_count = geo_filter.countries.len(),
777 "Registered geo filter"
778 );
779 }
780 Err(e) => {
781 error!(
782 filter_id = %filter_id,
783 error = %e,
784 "Failed to register geo filter"
785 );
786 }
787 }
788 }
789 }
790
791 let filter_ids = manager.filter_ids();
792 if !filter_ids.is_empty() {
793 info!(
794 filter_count = filter_ids.len(),
795 filter_ids = ?filter_ids,
796 "GeoIP filtering initialized"
797 );
798 }
799
800 manager
801 }
802
803 pub(super) fn apply_security_headers(
805 &self,
806 header: &mut ResponseHeader,
807 ) -> Result<(), Box<Error>> {
808 header.insert_header("X-Content-Type-Options", "nosniff")?;
809 header.insert_header("X-Frame-Options", "DENY")?;
810 header.insert_header("X-XSS-Protection", "1; mode=block")?;
811 header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
812 header.remove_header("Server");
813 header.remove_header("X-Powered-By");
814 Ok(())
815 }
816
817 fn spawn_cleanup_task(
819 rate_limit_manager: Arc<RateLimitManager>,
820 geo_filter_manager: Arc<GeoFilterManager>,
821 ) {
822 const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
824
825 tokio::spawn(async move {
826 let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
827 interval.tick().await;
829
830 loop {
831 interval.tick().await;
832
833 rate_limit_manager.cleanup();
835
836 geo_filter_manager.clear_expired_caches();
838
839 debug!("Periodic cleanup completed");
840 }
841 });
842
843 info!(
844 interval_secs = CLEANUP_INTERVAL.as_secs(),
845 "Started periodic cleanup task"
846 );
847 }
848
    /// Starts a file watcher that reloads geo databases when their files
    /// change on disk. Failure to start the watcher is non-fatal: the proxy
    /// runs without auto-reload.
    fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
        let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));

        match watcher.start_watching() {
            Ok(mut rx) => {
                let watcher_clone = watcher.clone();
                tokio::spawn(async move {
                    // Editors and atomic replaces fire several filesystem
                    // events per update; debounce so each burst triggers a
                    // single reload.
                    const DEBOUNCE_MS: u64 = 500;

                    while let Some(path) = rx.recv().await {
                        tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;

                        // Drain events accumulated during the debounce window.
                        // NOTE(review): only the first path of a burst is
                        // reloaded — events for other paths in the same burst
                        // are discarded; confirm this coalescing is intended.
                        while rx.try_recv().is_ok() {}

                        watcher_clone.handle_change(&path);
                    }
                });

                info!("Started geo database file watcher");
            }
            Err(e) => {
                warn!(
                    error = %e,
                    "Failed to start geo database file watcher, auto-reload disabled"
                );
            }
        }
    }
883}