1mod context;
11mod fallback;
12mod fallback_metrics;
13mod handlers;
14mod http_trait;
15mod model_routing;
16mod model_routing_metrics;
17
18pub use context::{FallbackReason, RequestContext};
19pub use fallback::{FallbackDecision, FallbackEvaluator};
20pub use fallback_metrics::{get_fallback_metrics, init_fallback_metrics, FallbackMetrics};
21pub use model_routing::{extract_model_from_headers, find_upstream_for_model, ModelRoutingResult};
22pub use model_routing_metrics::{
23 get_model_routing_metrics, init_model_routing_metrics, ModelRoutingMetrics,
24};
25
26use anyhow::{Context, Result};
27use parking_lot::RwLock;
28use pingora::http::ResponseHeader;
29use pingora::prelude::*;
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tracing::{debug, error, info, warn};
34use uuid::Uuid;
35
36use sentinel_common::ids::{QualifiedId, Scope};
37use sentinel_common::{Registry, ScopedMetrics, ScopedRegistry};
38
39use crate::agents::AgentManager;
40use crate::app::AppState;
41use crate::builtin_handlers::BuiltinHandlerState;
42use crate::cache::{CacheConfig, CacheManager};
43use crate::errors::ErrorHandler;
44use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
45use crate::health::PassiveHealthChecker;
46use crate::http_helpers;
47use crate::inference::InferenceRateLimitManager;
48use crate::logging::{LogManager, SharedLogManager};
49use crate::rate_limit::{RateLimitConfig, RateLimitManager};
50use crate::reload::{
51 ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
52};
53use crate::routing::RouteMatcher;
54use crate::scoped_routing::ScopedRouteMatcher;
55use crate::static_files::StaticFileServer;
56use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
57use crate::validation::SchemaValidator;
58
59use sentinel_common::TraceIdFormat;
60use sentinel_config::{Config, FlattenedConfig};
61
62pub struct SentinelProxy {
64 pub config_manager: Arc<ConfigManager>,
66 pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
68 pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
70 pub(super) upstream_pools: Registry<UpstreamPool>,
72 pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
74 pub(super) agent_manager: Arc<AgentManager>,
76 pub(super) passive_health: Arc<PassiveHealthChecker>,
78 pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
80 pub(super) scoped_metrics: Arc<ScopedMetrics>,
82 pub(super) app_state: Arc<AppState>,
84 pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
86 pub(super) error_handlers: Registry<ErrorHandler>,
88 pub(super) validators: Registry<SchemaValidator>,
90 pub(super) static_servers: Registry<StaticFileServer>,
92 pub(super) builtin_state: Arc<BuiltinHandlerState>,
94 pub(super) log_manager: SharedLogManager,
96 pub(super) trace_id_format: TraceIdFormat,
98 pub(super) health_check_runner: Arc<HealthCheckRunner>,
100 pub(super) rate_limit_manager: Arc<RateLimitManager>,
102 pub(super) cache_manager: Arc<CacheManager>,
104 pub(super) geo_filter_manager: Arc<GeoFilterManager>,
106 pub(super) inference_rate_limit_manager: Arc<InferenceRateLimitManager>,
108 pub(super) warmth_tracker: Arc<crate::health::WarmthTracker>,
110 pub(super) guardrail_processor: Arc<crate::inference::GuardrailProcessor>,
112 pub acme_challenges: Option<Arc<crate::acme::ChallengeManager>>,
115 pub acme_client: Option<Arc<crate::acme::AcmeClient>>,
118}
119
120impl SentinelProxy {
121 pub async fn new(config_path: Option<&str>) -> Result<Self> {
126 info!("Starting Sentinel Proxy");
127
128 let (config, effective_config_path) = match config_path {
130 Some(path) => {
131 let cfg = Config::from_file(path).context("Failed to load configuration file")?;
132 (cfg, path.to_string())
133 }
134 None => {
135 let cfg = Config::default_embedded()
136 .context("Failed to load embedded default configuration")?;
137 (cfg, "_embedded_".to_string())
139 }
140 };
141
142 config
143 .validate()
144 .context("Initial configuration validation failed")?;
145
146 if let Some(ref cache_config) = config.cache {
148 info!(
149 max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
150 backend = ?cache_config.backend,
151 "Configuring HTTP cache storage"
152 );
153 crate::cache::configure_cache(cache_config.clone());
154 }
155
156 let config_manager =
158 Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
159
160 config_manager.add_validator(Box::new(RouteValidator)).await;
162 config_manager
163 .add_validator(Box::new(UpstreamValidator))
164 .await;
165
166 let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
168
169 let flattened = config.flatten();
171
172 let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
174 ScopedRouteMatcher::from_flattened(&flattened)
175 .await
176 .context("Failed to create scoped route matcher")?,
177 ));
178
179 let mut pools = HashMap::new();
181 let mut health_check_runner = HealthCheckRunner::new();
182
183 for (upstream_id, upstream_config) in &config.upstreams {
184 let mut config_with_id = upstream_config.clone();
185 config_with_id.id = upstream_id.clone();
186 let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
187 pools.insert(upstream_id.clone(), pool);
188
189 if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
191 health_check_runner.add_checker(checker);
192 }
193 }
194 let upstream_pools = Registry::from_map(pools);
195
196 let scoped_upstream_pools =
198 Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;
199
200 let health_check_runner = Arc::new(health_check_runner);
201
202 let passive_health = Arc::new(PassiveHealthChecker::new(
204 0.5, 100, None, ));
208
209 let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
211 agent_manager.initialize().await?;
212
213 let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
215 let scoped_metrics = Arc::new(
216 ScopedMetrics::new().context("Failed to create scoped metrics collector")?,
217 );
218
219 let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
221
222 let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
224 Duration::from_secs(30), ));
226
227 Self::setup_reload_handler(
229 config_manager.clone(),
230 route_matcher.clone(),
231 upstream_pools.clone(),
232 scoped_route_matcher.clone(),
233 scoped_upstream_pools.clone(),
234 )
235 .await;
236
237 let (error_handlers, validators, static_servers) =
239 Self::initialize_route_components(&config).await?;
240
241 let builtin_state = Arc::new(BuiltinHandlerState::new(
243 env!("CARGO_PKG_VERSION").to_string(),
244 app_state.instance_id.clone(),
245 ));
246
247 let log_manager = match LogManager::new(&config.observability.logging) {
249 Ok(manager) => {
250 if manager.access_log_enabled() {
251 info!("Access logging enabled");
252 }
253 if manager.error_log_enabled() {
254 info!("Error logging enabled");
255 }
256 if manager.audit_log_enabled() {
257 info!("Audit logging enabled");
258 }
259 Arc::new(manager)
260 }
261 Err(e) => {
262 warn!(
263 "Failed to initialize log manager, file logging disabled: {}",
264 e
265 );
266 Arc::new(LogManager::disabled())
267 }
268 };
269
270 {
272 use crate::reload::AuditReloadHook;
273 let audit_hook = AuditReloadHook::new(log_manager.clone());
274 config_manager.add_hook(Box::new(audit_hook)).await;
275 debug!("Registered audit reload hook");
276 }
277
278 if health_check_runner.checker_count() > 0 {
280 let runner = health_check_runner.clone();
281 tokio::spawn(async move {
282 runner.run().await;
283 });
284 info!(
285 "Started active health checking for {} upstreams",
286 health_check_runner.checker_count()
287 );
288 }
289
290 let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
292
293 let inference_rate_limit_manager =
295 Arc::new(Self::initialize_inference_rate_limiters(&config));
296
297 let warmth_tracker = Arc::new(crate::health::WarmthTracker::with_defaults());
299
300 let guardrail_processor =
302 Arc::new(crate::inference::GuardrailProcessor::new(agent_manager.clone()));
303
304 let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));
306
307 Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());
309
310 Self::spawn_geo_database_watcher(geo_filter_manager.clone());
312
313 app_state.set_ready(true);
315
316 let trace_id_format = config.server.trace_id_format;
318
319 let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
321
322 if let Err(e) = init_fallback_metrics() {
324 warn!("Failed to initialize fallback metrics: {}", e);
325 }
326
327 if let Err(e) = init_model_routing_metrics() {
329 warn!("Failed to initialize model routing metrics: {}", e);
330 }
331
332 Ok(Self {
333 config_manager,
334 route_matcher,
335 scoped_route_matcher,
336 upstream_pools,
337 scoped_upstream_pools,
338 agent_manager,
339 passive_health,
340 metrics,
341 scoped_metrics,
342 app_state,
343 reload_coordinator,
344 error_handlers,
345 validators,
346 static_servers,
347 builtin_state,
348 log_manager,
349 trace_id_format,
350 health_check_runner,
351 rate_limit_manager,
352 cache_manager,
353 geo_filter_manager,
354 inference_rate_limit_manager,
355 warmth_tracker,
356 guardrail_processor,
357 acme_challenges: None,
359 acme_client: None,
360 })
361 }
362
363 async fn setup_reload_handler(
365 config_manager: Arc<ConfigManager>,
366 route_matcher: Arc<RwLock<RouteMatcher>>,
367 upstream_pools: Registry<UpstreamPool>,
368 scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
369 scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
370 ) {
371 let mut reload_rx = config_manager.subscribe();
372 let config_manager_clone = config_manager.clone();
373
374 tokio::spawn(async move {
375 while let Ok(event) = reload_rx.recv().await {
376 if let ReloadEvent::Applied { .. } = event {
377 let new_config = config_manager_clone.current();
379 let flattened = new_config.flatten();
380
381 if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
383 *route_matcher.write() = new_matcher;
384 info!("Global routes reloaded successfully");
385 }
386
387 if let Err(e) = scoped_route_matcher
389 .write()
390 .await
391 .load_from_flattened(&flattened)
392 .await
393 {
394 error!("Failed to reload scoped routes: {}", e);
395 } else {
396 info!(
397 "Scoped routes reloaded ({} scopes)",
398 scoped_route_matcher.read().await.scope_count().await
399 );
400 }
401
402 let mut new_pools = HashMap::new();
404 for (upstream_id, upstream_config) in &new_config.upstreams {
405 let mut config_with_id = upstream_config.clone();
406 config_with_id.id = upstream_id.clone();
407 match UpstreamPool::new(config_with_id).await {
408 Ok(pool) => {
409 new_pools.insert(upstream_id.clone(), Arc::new(pool));
410 }
411 Err(e) => {
412 error!("Failed to create upstream pool {}: {}", upstream_id, e);
413 }
414 }
415 }
416
417 let old_pools = upstream_pools.replace(new_pools).await;
419
420 let new_scoped_pools =
422 Self::build_scoped_pools_list(&flattened).await;
423 let old_scoped_pools = scoped_upstream_pools.replace_all(new_scoped_pools).await;
424
425 info!(
426 "Scoped upstream pools reloaded ({} pools)",
427 scoped_upstream_pools.len().await
428 );
429
430 tokio::spawn(async move {
432 tokio::time::sleep(Duration::from_secs(60)).await;
433
434 for (name, pool) in old_pools {
436 info!("Shutting down old global pool: {}", name);
437 pool.shutdown().await;
438 }
439
440 for (name, pool) in old_scoped_pools {
442 info!("Shutting down old scoped pool: {}", name);
443 pool.shutdown().await;
444 }
445 });
446 }
447 }
448 });
449 }
450
451 async fn create_scoped_upstream_pools(
453 flattened: &FlattenedConfig,
454 health_check_runner: &mut HealthCheckRunner,
455 ) -> Result<ScopedRegistry<UpstreamPool>> {
456 let registry = ScopedRegistry::new();
457
458 for (qid, upstream_config) in &flattened.upstreams {
459 let mut config_with_id = upstream_config.clone();
460 config_with_id.id = qid.canonical();
461
462 let pool = Arc::new(
463 UpstreamPool::new(config_with_id.clone())
464 .await
465 .with_context(|| format!("Failed to create upstream pool '{}'", qid.canonical()))?,
466 );
467
468 let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
470
471 if is_exported {
472 registry.insert_exported(qid.clone(), pool).await;
473 } else {
474 registry.insert(qid.clone(), pool).await;
475 }
476
477 if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
479 health_check_runner.add_checker(checker);
480 }
481
482 debug!(
483 upstream_id = %qid.canonical(),
484 scope = ?qid.scope,
485 exported = is_exported,
486 "Created scoped upstream pool"
487 );
488 }
489
490 info!(
491 "Created {} scoped upstream pools",
492 registry.len().await
493 );
494
495 Ok(registry)
496 }
497
498 async fn build_scoped_pools_list(
500 flattened: &FlattenedConfig,
501 ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
502 let mut result = Vec::new();
503
504 for (qid, upstream_config) in &flattened.upstreams {
505 let mut config_with_id = upstream_config.clone();
506 config_with_id.id = qid.canonical();
507
508 match UpstreamPool::new(config_with_id).await {
509 Ok(pool) => {
510 let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
511 result.push((qid.clone(), Arc::new(pool), is_exported));
512 }
513 Err(e) => {
514 error!(
515 "Failed to create scoped upstream pool {}: {}",
516 qid.canonical(),
517 e
518 );
519 }
520 }
521 }
522
523 result
524 }
525
526 async fn initialize_route_components(
528 config: &Config,
529 ) -> Result<(
530 Registry<ErrorHandler>,
531 Registry<SchemaValidator>,
532 Registry<StaticFileServer>,
533 )> {
534 let mut error_handlers_map = HashMap::new();
535 let mut validators_map = HashMap::new();
536 let mut static_servers_map = HashMap::new();
537
538 for route in &config.routes {
539 info!(
540 "Initializing components for route: {} with service type: {:?}",
541 route.id, route.service_type
542 );
543
544 if let Some(ref error_config) = route.error_pages {
546 let handler =
547 ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
548 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
549 debug!("Initialized error handler for route: {}", route.id);
550 } else {
551 let handler = ErrorHandler::new(route.service_type.clone(), None);
553 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
554 }
555
556 if route.service_type == sentinel_config::ServiceType::Api {
558 if let Some(ref api_schema) = route.api_schema {
559 match SchemaValidator::new(api_schema.clone()) {
560 Ok(validator) => {
561 validators_map.insert(route.id.clone(), Arc::new(validator));
562 info!("Initialized schema validator for route: {}", route.id);
563 }
564 Err(e) => {
565 warn!(
566 "Failed to initialize schema validator for route {}: {}",
567 route.id, e
568 );
569 }
570 }
571 }
572 }
573
574 if route.service_type == sentinel_config::ServiceType::Static {
576 if let Some(ref static_config) = route.static_files {
577 let server = StaticFileServer::new(static_config.clone());
578 static_servers_map.insert(route.id.clone(), Arc::new(server));
579 info!("Initialized static file server for route: {}", route.id);
580 } else {
581 warn!(
582 "Static route {} has no static_files configuration",
583 route.id
584 );
585 }
586 }
587 }
588
589 Ok((
590 Registry::from_map(error_handlers_map),
591 Registry::from_map(validators_map),
592 Registry::from_map(static_servers_map),
593 ))
594 }
595
596 pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
598 http_helpers::get_or_create_trace_id(session, self.trace_id_format)
599 }
600
601 fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
603 use sentinel_config::RateLimitAction;
604
605 let manager = if let Some(ref global) = config.rate_limits.global {
607 info!(
608 max_rps = global.max_rps,
609 burst = global.burst,
610 key = ?global.key,
611 "Initializing global rate limiter"
612 );
613 RateLimitManager::with_global_limit(global.max_rps, global.burst)
614 } else {
615 RateLimitManager::new()
616 };
617
618 for route in &config.routes {
619 if let Some(ref rate_limit) = route.policies.rate_limit {
621 let rl_config = RateLimitConfig {
622 max_rps: rate_limit.requests_per_second,
623 burst: rate_limit.burst,
624 key: rate_limit.key.clone(),
625 action: RateLimitAction::Reject,
626 status_code: 429,
627 message: None,
628 backend: sentinel_config::RateLimitBackend::Local,
629 max_delay_ms: 5000, };
631 manager.register_route(&route.id, rl_config);
632 info!(
633 route_id = %route.id,
634 max_rps = rate_limit.requests_per_second,
635 burst = rate_limit.burst,
636 key = ?rate_limit.key,
637 "Registered rate limiter for route"
638 );
639 }
640
641 for filter_id in &route.filters {
643 if let Some(filter_config) = config.filters.get(filter_id) {
644 if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
645 {
646 let rl_config = RateLimitConfig {
647 max_rps: rl_filter.max_rps,
648 burst: rl_filter.burst,
649 key: rl_filter.key.clone(),
650 action: rl_filter.on_limit.clone(),
651 status_code: rl_filter.status_code,
652 message: rl_filter.limit_message.clone(),
653 backend: rl_filter.backend.clone(),
654 max_delay_ms: rl_filter.max_delay_ms,
655 };
656 manager.register_route(&route.id, rl_config);
657 info!(
658 route_id = %route.id,
659 filter_id = %filter_id,
660 max_rps = rl_filter.max_rps,
661 backend = ?rl_filter.backend,
662 "Registered rate limiter from filter for route"
663 );
664 }
665 }
666 }
667 }
668
669 if manager.route_count() > 0 {
670 info!(
671 route_count = manager.route_count(),
672 "Rate limiting initialized"
673 );
674 }
675
676 manager
677 }
678
679 fn initialize_inference_rate_limiters(config: &Config) -> InferenceRateLimitManager {
684 let manager = InferenceRateLimitManager::new();
685
686 for route in &config.routes {
687 if route.service_type == sentinel_config::ServiceType::Inference {
689 if let Some(ref inference_config) = route.inference {
690 manager.register_route(&route.id, inference_config);
691 }
692 }
693 }
694
695 if manager.route_count() > 0 {
696 info!(
697 route_count = manager.route_count(),
698 "Inference rate limiting initialized"
699 );
700 }
701
702 manager
703 }
704
705 fn initialize_cache_manager(config: &Config) -> CacheManager {
707 let manager = CacheManager::new();
708
709 let mut enabled_count = 0;
710
711 for route in &config.routes {
712 if route.service_type == sentinel_config::ServiceType::Api {
714 let cache_config = CacheConfig {
715 enabled: false, default_ttl_secs: 60,
717 ..Default::default()
718 };
719 manager.register_route(&route.id, cache_config);
720 }
721
722 if route.service_type == sentinel_config::ServiceType::Static {
724 let cache_config = CacheConfig {
725 enabled: true, default_ttl_secs: 3600,
727 max_size_bytes: 50 * 1024 * 1024, stale_while_revalidate_secs: 60,
729 stale_if_error_secs: 300,
730 ..Default::default()
731 };
732 manager.register_route(&route.id, cache_config);
733 enabled_count += 1;
734 info!(
735 route_id = %route.id,
736 default_ttl_secs = 3600,
737 "HTTP caching enabled for static route"
738 );
739 }
740
741 if route.service_type == sentinel_config::ServiceType::Web {
743 let cache_config = CacheConfig {
744 enabled: false, default_ttl_secs: 300,
746 ..Default::default()
747 };
748 manager.register_route(&route.id, cache_config);
749 }
750 }
751
752 if enabled_count > 0 {
753 info!(enabled_routes = enabled_count, "HTTP caching initialized");
754 } else {
755 debug!("HTTP cache manager initialized (no routes with caching enabled)");
756 }
757
758 manager
759 }
760
761 fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
763 let manager = GeoFilterManager::new();
764
765 for (filter_id, filter_config) in &config.filters {
766 if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
767 match manager.register_filter(filter_id, geo_filter.clone()) {
768 Ok(_) => {
769 info!(
770 filter_id = %filter_id,
771 database_path = %geo_filter.database_path,
772 action = ?geo_filter.action,
773 countries_count = geo_filter.countries.len(),
774 "Registered geo filter"
775 );
776 }
777 Err(e) => {
778 error!(
779 filter_id = %filter_id,
780 error = %e,
781 "Failed to register geo filter"
782 );
783 }
784 }
785 }
786 }
787
788 let filter_ids = manager.filter_ids();
789 if !filter_ids.is_empty() {
790 info!(
791 filter_count = filter_ids.len(),
792 filter_ids = ?filter_ids,
793 "GeoIP filtering initialized"
794 );
795 }
796
797 manager
798 }
799
800 pub(super) fn apply_security_headers(
802 &self,
803 header: &mut ResponseHeader,
804 ) -> Result<(), Box<Error>> {
805 header.insert_header("X-Content-Type-Options", "nosniff")?;
806 header.insert_header("X-Frame-Options", "DENY")?;
807 header.insert_header("X-XSS-Protection", "1; mode=block")?;
808 header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
809 header.remove_header("Server");
810 header.remove_header("X-Powered-By");
811 Ok(())
812 }
813
814 fn spawn_cleanup_task(
816 rate_limit_manager: Arc<RateLimitManager>,
817 geo_filter_manager: Arc<GeoFilterManager>,
818 ) {
819 const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
821
822 tokio::spawn(async move {
823 let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
824 interval.tick().await;
826
827 loop {
828 interval.tick().await;
829
830 rate_limit_manager.cleanup();
832
833 geo_filter_manager.clear_expired_caches();
835
836 debug!("Periodic cleanup completed");
837 }
838 });
839
840 info!(
841 interval_secs = CLEANUP_INTERVAL.as_secs(),
842 "Started periodic cleanup task"
843 );
844 }
845
846 fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
848 let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
849
850 match watcher.start_watching() {
852 Ok(mut rx) => {
853 let watcher_clone = watcher.clone();
854 tokio::spawn(async move {
855 const DEBOUNCE_MS: u64 = 500;
857
858 while let Some(path) = rx.recv().await {
859 tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
861
862 while rx.try_recv().is_ok() {}
864
865 watcher_clone.handle_change(&path);
867 }
868 });
869
870 info!("Started geo database file watcher");
871 }
872 Err(e) => {
873 warn!(
874 error = %e,
875 "Failed to start geo database file watcher, auto-reload disabled"
876 );
877 }
878 }
879 }
880}