1mod context;
11mod fallback;
12mod fallback_metrics;
13pub(crate) mod filters;
14mod handlers;
15mod http_trait;
16mod model_routing;
17mod model_routing_metrics;
18
19pub use context::{FallbackReason, RequestContext};
20pub use fallback::{FallbackDecision, FallbackEvaluator};
21pub use fallback_metrics::{get_fallback_metrics, init_fallback_metrics, FallbackMetrics};
22pub use model_routing::{extract_model_from_headers, find_upstream_for_model, ModelRoutingResult};
23pub use model_routing_metrics::{
24 get_model_routing_metrics, init_model_routing_metrics, ModelRoutingMetrics,
25};
26
27use anyhow::{Context, Result};
28use parking_lot::RwLock;
29use pingora::http::ResponseHeader;
30use pingora::prelude::*;
31use std::collections::HashMap;
32use std::sync::Arc;
33use std::time::Duration;
34use tracing::{debug, error, info, warn};
35use uuid::Uuid;
36
37use grapsus_common::ids::{QualifiedId, Scope};
38use grapsus_common::{Registry, ScopedMetrics, ScopedRegistry};
39
40use crate::agents::AgentManager;
41use crate::app::AppState;
42use crate::builtin_handlers::BuiltinHandlerState;
43use crate::cache::{CacheConfig, CacheManager};
44use crate::errors::ErrorHandler;
45use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
46use crate::health::PassiveHealthChecker;
47use crate::http_helpers;
48use crate::inference::InferenceRateLimitManager;
49use crate::logging::{LogManager, SharedLogManager};
50use crate::rate_limit::{RateLimitConfig, RateLimitManager};
51use crate::reload::{
52 ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
53};
54use crate::routing::RouteMatcher;
55use crate::scoped_routing::ScopedRouteMatcher;
56use crate::static_files::StaticFileServer;
57use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
58use crate::validation::SchemaValidator;
59
60use grapsus_common::TraceIdFormat;
61use grapsus_config::{Config, FlattenedConfig};
62
/// Shared state of the proxy, assembled once by `GrapsusProxy::new`.
///
/// Most members are reference-counted handles or registries so they can be
/// cloned into background tasks (reload handler, health checks, cleanup).
pub struct GrapsusProxy {
    /// Configuration manager created in `new`; the reload handler reads the
    /// current configuration from it whenever a reload is applied.
    pub config_manager: Arc<ConfigManager>,
    /// Matcher built from `config.routes`; replaced by the reload handler.
    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
    /// Matcher built from the flattened configuration; reloaded in place by
    /// the reload handler.
    pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
    /// Upstream pools keyed by the upstream ids found in `config.upstreams`.
    pub(super) upstream_pools: Registry<UpstreamPool>,
    /// Upstream pools keyed by the qualified ids of the flattened configuration.
    pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
    /// Agent manager built from `config.agents` and initialised in `new`.
    pub(super) agent_manager: Arc<AgentManager>,
    /// Passive health checker constructed in `new`.
    pub(super) passive_health: Arc<PassiveHealthChecker>,
    /// Request metrics collector.
    pub(super) metrics: Arc<grapsus_common::observability::RequestMetrics>,
    /// Scoped metrics collector.
    pub(super) scoped_metrics: Arc<ScopedMetrics>,
    /// Application state created with a freshly generated UUID; `new` calls
    /// `set_ready(true)` on it during start-up.
    pub(super) app_state: Arc<AppState>,
    /// Graceful reload coordinator, constructed in `new` with a 30-second duration.
    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
    /// One error handler per configured route, keyed by route id.
    pub(super) error_handlers: Registry<ErrorHandler>,
    /// Schema validators for API routes that declare an `api_schema`, keyed by route id.
    pub(super) validators: Registry<SchemaValidator>,
    /// Static file servers for static routes that declare `static_files`, keyed by route id.
    pub(super) static_servers: Registry<StaticFileServer>,
    /// Built-in handler state, created from the crate version and the instance id.
    pub(super) builtin_state: Arc<BuiltinHandlerState>,
    /// Log manager; falls back to `LogManager::disabled()` when initialisation fails.
    pub(super) log_manager: SharedLogManager,
    /// Trace-id format taken from `config.server.trace_id_format`; used by `get_trace_id`.
    pub(super) trace_id_format: TraceIdFormat,
    /// Runner holding the active health checkers registered for the upstreams.
    pub(super) health_check_runner: Arc<HealthCheckRunner>,
    /// Rate-limit manager populated from the global limit, route policies and rate-limit filters.
    pub(super) rate_limit_manager: Arc<RateLimitManager>,
    /// Per-route HTTP cache configuration registry.
    pub(super) cache_manager: Arc<CacheManager>,
    /// Manager for the geo filters declared in `config.filters`.
    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
    /// Rate-limit manager for routes whose service type is `Inference`.
    pub(super) inference_rate_limit_manager: Arc<InferenceRateLimitManager>,
    /// Warmth tracker created with its default settings.
    pub(super) warmth_tracker: Arc<crate::health::WarmthTracker>,
    /// Guardrail processor sharing the agent manager.
    pub(super) guardrail_processor: Arc<crate::inference::GuardrailProcessor>,
    /// ACME challenge manager; `new` leaves this as `None`.
    pub acme_challenges: Option<Arc<crate::acme::ChallengeManager>>,
    /// ACME client; `new` leaves this as `None`.
    pub acme_client: Option<Arc<crate::acme::AcmeClient>>,
}
120
121impl GrapsusProxy {
/// Builds a fully initialised proxy from a configuration file or the embedded default.
///
/// Besides constructing every member of [`GrapsusProxy`], this function has
/// side effects: it configures the global cache storage (when `config.cache`
/// is set), spawns the reload handler, the active health-check runner (when
/// at least one checker exists), the periodic cleanup task and the geo
/// database watcher, and marks the application state as ready.
///
/// # Arguments
/// * `config_path` - configuration file to load; `None` selects
///   `Config::default_embedded()` and records the path as `"_embedded_"`.
///
/// # Errors
/// Fails when the configuration cannot be loaded or validated, or when the
/// config manager, route matchers, upstream pools, agent manager, metrics
/// collectors or per-route components cannot be created. Failures of the
/// log manager and of the fallback / model-routing metrics are only logged.
pub async fn new(config_path: Option<&str>) -> Result<Self> {
    info!("Starting Grapsus Proxy");

    // Resolve the configuration and remember where it came from.
    let (config, effective_config_path) = match config_path {
        Some(path) => {
            let cfg = Config::from_file(path).context("Failed to load configuration file")?;
            (cfg, path.to_string())
        }
        None => {
            let cfg = Config::default_embedded()
                .context("Failed to load embedded default configuration")?;
            (cfg, "_embedded_".to_string())
        }
    };

    // Refuse to start on an invalid configuration.
    config
        .validate()
        .context("Initial configuration validation failed")?;

    // Cache storage is configured before anything else is built from the config.
    if let Some(ref cache_config) = config.cache {
        info!(
            max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
            backend = ?cache_config.backend,
            "Configuring HTTP cache storage"
        );
        crate::cache::configure_cache(cache_config.clone());
        crate::cache::init_disk_cache_state().await;
    }

    let config_manager =
        Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);

    // Validators registered on the config manager.
    config_manager.add_validator(Box::new(RouteValidator)).await;
    config_manager
        .add_validator(Box::new(UpstreamValidator))
        .await;

    // Global route matcher.
    let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));

    // The flattened view feeds the scoped matcher and scoped pools.
    let flattened = config.flatten();

    let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
        ScopedRouteMatcher::from_flattened(&flattened)
            .await
            .context("Failed to create scoped route matcher")?,
    ));

    // Global upstream pools plus their active health checkers.
    let mut pools = HashMap::new();
    let mut health_check_runner = HealthCheckRunner::new();

    for (upstream_id, upstream_config) in &config.upstreams {
        // The map key is authoritative for the upstream id.
        let mut config_with_id = upstream_config.clone();
        config_with_id.id = upstream_id.clone();
        let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
        pools.insert(upstream_id.clone(), pool);

        // `ActiveHealthChecker::new` returns `None` when no checker applies.
        if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
            health_check_runner.add_checker(checker);
        }
    }
    let upstream_pools = Registry::from_map(pools);

    // Scoped pools register their checkers on the same runner.
    let scoped_upstream_pools =
        Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;

    // The runner is frozen behind an `Arc` once all checkers are registered.
    let health_check_runner = Arc::new(health_check_runner);

    // NOTE(review): the meaning of these three arguments is not visible here
    // (presumably a failure-rate threshold, a window size and an optional
    // setting) - confirm against `PassiveHealthChecker::new`.
    let passive_health = Arc::new(PassiveHealthChecker::new(
        0.5,
        100,
        None,
    ));

    let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
    agent_manager.initialize().await?;

    let metrics = Arc::new(grapsus_common::observability::RequestMetrics::new()?);
    let scoped_metrics =
        Arc::new(ScopedMetrics::new().context("Failed to create scoped metrics collector")?);

    // Each process instance gets a random identifier.
    let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));

    // NOTE(review): 30 seconds is presumably the drain/grace timeout - confirm.
    let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
        Duration::from_secs(30),
    ));

    // Background task that re-applies routes and pools after a reload.
    Self::setup_reload_handler(
        config_manager.clone(),
        route_matcher.clone(),
        upstream_pools.clone(),
        scoped_route_matcher.clone(),
        scoped_upstream_pools.clone(),
    )
    .await;

    let (error_handlers, validators, static_servers) =
        Self::initialize_route_components(&config).await?;

    let builtin_state = Arc::new(BuiltinHandlerState::new(
        env!("CARGO_PKG_VERSION").to_string(),
        app_state.instance_id.clone(),
    ));

    // A broken logging configuration degrades to disabled file logging
    // instead of aborting start-up.
    let log_manager = match LogManager::new(&config.observability.logging) {
        Ok(manager) => {
            if manager.access_log_enabled() {
                info!("Access logging enabled");
            }
            if manager.error_log_enabled() {
                info!("Error logging enabled");
            }
            if manager.audit_log_enabled() {
                info!("Audit logging enabled");
            }
            Arc::new(manager)
        }
        Err(e) => {
            warn!(
                "Failed to initialize log manager, file logging disabled: {}",
                e
            );
            Arc::new(LogManager::disabled())
        }
    };

    // Reload hook that is handed the log manager.
    {
        use crate::reload::AuditReloadHook;
        let audit_hook = AuditReloadHook::new(log_manager.clone());
        config_manager.add_hook(Box::new(audit_hook)).await;
        debug!("Registered audit reload hook");
    }

    // Only spawn the health-check task when there is something to check.
    if health_check_runner.checker_count() > 0 {
        let runner = health_check_runner.clone();
        tokio::spawn(async move {
            runner.run().await;
        });
        info!(
            "Started active health checking for {} upstreams",
            health_check_runner.checker_count()
        );
    }

    let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));

    let inference_rate_limit_manager =
        Arc::new(Self::initialize_inference_rate_limiters(&config));

    let warmth_tracker = Arc::new(crate::health::WarmthTracker::with_defaults());

    let guardrail_processor = Arc::new(crate::inference::GuardrailProcessor::new(
        agent_manager.clone(),
    ));

    let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));

    // Periodic cleanup of rate-limit state and geo-filter caches.
    Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());

    // File watcher for the geo databases.
    Self::spawn_geo_database_watcher(geo_filter_manager.clone());

    // NOTE(review): readiness is flagged before the cache manager and the
    // fallback / model-routing metrics below are initialised - confirm this
    // ordering is intentional.
    app_state.set_ready(true);

    let trace_id_format = config.server.trace_id_format;

    let cache_manager = Arc::new(Self::initialize_cache_manager(&config));

    // Metrics registration failures are not fatal.
    if let Err(e) = init_fallback_metrics() {
        warn!("Failed to initialize fallback metrics: {}", e);
    }

    if let Err(e) = init_model_routing_metrics() {
        warn!("Failed to initialize model routing metrics: {}", e);
    }

    Ok(Self {
        config_manager,
        route_matcher,
        scoped_route_matcher,
        upstream_pools,
        scoped_upstream_pools,
        agent_manager,
        passive_health,
        metrics,
        scoped_metrics,
        app_state,
        reload_coordinator,
        error_handlers,
        validators,
        static_servers,
        builtin_state,
        log_manager,
        trace_id_format,
        health_check_runner,
        rate_limit_manager,
        cache_manager,
        geo_filter_manager,
        inference_rate_limit_manager,
        warmth_tracker,
        guardrail_processor,
        // ACME support is not wired up by this constructor.
        acme_challenges: None,
        acme_client: None,
    })
}
364
365 async fn setup_reload_handler(
367 config_manager: Arc<ConfigManager>,
368 route_matcher: Arc<RwLock<RouteMatcher>>,
369 upstream_pools: Registry<UpstreamPool>,
370 scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
371 scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
372 ) {
373 let mut reload_rx = config_manager.subscribe();
374 let config_manager_clone = config_manager.clone();
375
376 tokio::spawn(async move {
377 while let Ok(event) = reload_rx.recv().await {
378 if let ReloadEvent::Applied { .. } = event {
379 let new_config = config_manager_clone.current();
381 let flattened = new_config.flatten();
382
383 if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
385 *route_matcher.write() = new_matcher;
386 info!("Global routes reloaded successfully");
387 }
388
389 if let Err(e) = scoped_route_matcher
391 .write()
392 .await
393 .load_from_flattened(&flattened)
394 .await
395 {
396 error!("Failed to reload scoped routes: {}", e);
397 } else {
398 info!(
399 "Scoped routes reloaded ({} scopes)",
400 scoped_route_matcher.read().await.scope_count().await
401 );
402 }
403
404 let mut new_pools = HashMap::new();
406 for (upstream_id, upstream_config) in &new_config.upstreams {
407 let mut config_with_id = upstream_config.clone();
408 config_with_id.id = upstream_id.clone();
409 match UpstreamPool::new(config_with_id).await {
410 Ok(pool) => {
411 new_pools.insert(upstream_id.clone(), Arc::new(pool));
412 }
413 Err(e) => {
414 error!("Failed to create upstream pool {}: {}", upstream_id, e);
415 }
416 }
417 }
418
419 let old_pools = upstream_pools.replace(new_pools).await;
421
422 let new_scoped_pools = Self::build_scoped_pools_list(&flattened).await;
424 let old_scoped_pools =
425 scoped_upstream_pools.replace_all(new_scoped_pools).await;
426
427 info!(
428 "Scoped upstream pools reloaded ({} pools)",
429 scoped_upstream_pools.len().await
430 );
431
432 tokio::spawn(async move {
434 tokio::time::sleep(Duration::from_secs(60)).await;
435
436 for (name, pool) in old_pools {
438 info!("Shutting down old global pool: {}", name);
439 pool.shutdown().await;
440 }
441
442 for (name, pool) in old_scoped_pools {
444 info!("Shutting down old scoped pool: {}", name);
445 pool.shutdown().await;
446 }
447 });
448 }
449 }
450 });
451 }
452
453 async fn create_scoped_upstream_pools(
455 flattened: &FlattenedConfig,
456 health_check_runner: &mut HealthCheckRunner,
457 ) -> Result<ScopedRegistry<UpstreamPool>> {
458 let registry = ScopedRegistry::new();
459
460 for (qid, upstream_config) in &flattened.upstreams {
461 let mut config_with_id = upstream_config.clone();
462 config_with_id.id = qid.canonical();
463
464 let pool = Arc::new(
465 UpstreamPool::new(config_with_id.clone())
466 .await
467 .with_context(|| {
468 format!("Failed to create upstream pool '{}'", qid.canonical())
469 })?,
470 );
471
472 let is_exported = flattened
474 .exported_upstreams
475 .contains_key(&upstream_config.id);
476
477 if is_exported {
478 registry.insert_exported(qid.clone(), pool).await;
479 } else {
480 registry.insert(qid.clone(), pool).await;
481 }
482
483 if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
485 health_check_runner.add_checker(checker);
486 }
487
488 debug!(
489 upstream_id = %qid.canonical(),
490 scope = ?qid.scope,
491 exported = is_exported,
492 "Created scoped upstream pool"
493 );
494 }
495
496 info!("Created {} scoped upstream pools", registry.len().await);
497
498 Ok(registry)
499 }
500
501 async fn build_scoped_pools_list(
503 flattened: &FlattenedConfig,
504 ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
505 let mut result = Vec::new();
506
507 for (qid, upstream_config) in &flattened.upstreams {
508 let mut config_with_id = upstream_config.clone();
509 config_with_id.id = qid.canonical();
510
511 match UpstreamPool::new(config_with_id).await {
512 Ok(pool) => {
513 let is_exported = flattened
514 .exported_upstreams
515 .contains_key(&upstream_config.id);
516 result.push((qid.clone(), Arc::new(pool), is_exported));
517 }
518 Err(e) => {
519 error!(
520 "Failed to create scoped upstream pool {}: {}",
521 qid.canonical(),
522 e
523 );
524 }
525 }
526 }
527
528 result
529 }
530
531 async fn initialize_route_components(
533 config: &Config,
534 ) -> Result<(
535 Registry<ErrorHandler>,
536 Registry<SchemaValidator>,
537 Registry<StaticFileServer>,
538 )> {
539 let mut error_handlers_map = HashMap::new();
540 let mut validators_map = HashMap::new();
541 let mut static_servers_map = HashMap::new();
542
543 for route in &config.routes {
544 info!(
545 "Initializing components for route: {} with service type: {:?}",
546 route.id, route.service_type
547 );
548
549 if let Some(ref error_config) = route.error_pages {
551 let handler =
552 ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
553 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
554 debug!("Initialized error handler for route: {}", route.id);
555 } else {
556 let handler = ErrorHandler::new(route.service_type.clone(), None);
558 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
559 }
560
561 if route.service_type == grapsus_config::ServiceType::Api {
563 if let Some(ref api_schema) = route.api_schema {
564 match SchemaValidator::new(api_schema.clone()) {
565 Ok(validator) => {
566 validators_map.insert(route.id.clone(), Arc::new(validator));
567 info!("Initialized schema validator for route: {}", route.id);
568 }
569 Err(e) => {
570 warn!(
571 "Failed to initialize schema validator for route {}: {}",
572 route.id, e
573 );
574 }
575 }
576 }
577 }
578
579 if route.service_type == grapsus_config::ServiceType::Static {
581 if let Some(ref static_config) = route.static_files {
582 let server = StaticFileServer::new(static_config.clone());
583 static_servers_map.insert(route.id.clone(), Arc::new(server));
584 info!("Initialized static file server for route: {}", route.id);
585 } else {
586 warn!(
587 "Static route {} has no static_files configuration",
588 route.id
589 );
590 }
591 }
592 }
593
594 Ok((
595 Registry::from_map(error_handlers_map),
596 Registry::from_map(validators_map),
597 Registry::from_map(static_servers_map),
598 ))
599 }
600
601 pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
603 http_helpers::get_or_create_trace_id(session, self.trace_id_format)
604 }
605
606 fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
608 use grapsus_config::RateLimitAction;
609
610 let manager = if let Some(ref global) = config.rate_limits.global {
612 info!(
613 max_rps = global.max_rps,
614 burst = global.burst,
615 key = ?global.key,
616 "Initializing global rate limiter"
617 );
618 RateLimitManager::with_global_limit(global.max_rps, global.burst)
619 } else {
620 RateLimitManager::new()
621 };
622
623 for route in &config.routes {
624 if let Some(ref rate_limit) = route.policies.rate_limit {
626 let rl_config = RateLimitConfig {
627 max_rps: rate_limit.requests_per_second,
628 burst: rate_limit.burst,
629 key: rate_limit.key.clone(),
630 action: RateLimitAction::Reject,
631 status_code: 429,
632 message: None,
633 backend: grapsus_config::RateLimitBackend::Local,
634 max_delay_ms: 5000, };
636 manager.register_route(&route.id, rl_config);
637 info!(
638 route_id = %route.id,
639 max_rps = rate_limit.requests_per_second,
640 burst = rate_limit.burst,
641 key = ?rate_limit.key,
642 "Registered rate limiter for route"
643 );
644 }
645
646 for filter_id in &route.filters {
648 if let Some(filter_config) = config.filters.get(filter_id) {
649 if let grapsus_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
650 {
651 let rl_config = RateLimitConfig {
652 max_rps: rl_filter.max_rps,
653 burst: rl_filter.burst,
654 key: rl_filter.key.clone(),
655 action: rl_filter.on_limit.clone(),
656 status_code: rl_filter.status_code,
657 message: rl_filter.limit_message.clone(),
658 backend: rl_filter.backend.clone(),
659 max_delay_ms: rl_filter.max_delay_ms,
660 };
661 manager.register_route(&route.id, rl_config);
662 info!(
663 route_id = %route.id,
664 filter_id = %filter_id,
665 max_rps = rl_filter.max_rps,
666 backend = ?rl_filter.backend,
667 "Registered rate limiter from filter for route"
668 );
669 }
670 }
671 }
672 }
673
674 if manager.route_count() > 0 {
675 info!(
676 route_count = manager.route_count(),
677 "Rate limiting initialized"
678 );
679 }
680
681 manager
682 }
683
684 fn initialize_inference_rate_limiters(config: &Config) -> InferenceRateLimitManager {
689 let manager = InferenceRateLimitManager::new();
690
691 for route in &config.routes {
692 if route.service_type == grapsus_config::ServiceType::Inference {
694 if let Some(ref inference_config) = route.inference {
695 manager.register_route(&route.id, inference_config);
696 }
697 }
698 }
699
700 if manager.route_count() > 0 {
701 info!(
702 route_count = manager.route_count(),
703 "Inference rate limiting initialized"
704 );
705 }
706
707 manager
708 }
709
710 fn initialize_cache_manager(config: &Config) -> CacheManager {
712 let manager = CacheManager::new();
713
714 let mut enabled_count = 0;
715
716 for route in &config.routes {
717 let cache_config = if let Some(ref rc) = route.policies.cache {
719 let exclude_paths = rc
721 .exclude_paths
722 .iter()
723 .filter_map(|pattern| {
724 let regex_str = crate::cache::compile_glob_to_regex(pattern);
725 match regex::Regex::new(®ex_str) {
726 Ok(re) => Some(re),
727 Err(e) => {
728 warn!(
729 route_id = %route.id,
730 pattern = %pattern,
731 error = %e,
732 "Failed to compile cache exclude-path pattern"
733 );
734 None
735 }
736 }
737 })
738 .collect();
739
740 CacheConfig {
741 enabled: rc.enabled,
742 default_ttl_secs: rc.default_ttl_secs,
743 max_size_bytes: rc.max_size_bytes,
744 cache_private: rc.cache_private,
745 stale_while_revalidate_secs: rc.stale_while_revalidate_secs,
746 stale_if_error_secs: rc.stale_if_error_secs,
747 cacheable_methods: rc.cacheable_methods.clone(),
748 cacheable_status_codes: rc.cacheable_status_codes.clone(),
749 exclude_extensions: rc.exclude_extensions.clone(),
750 exclude_paths,
751 }
752 } else {
753 match route.service_type {
754 grapsus_config::ServiceType::Static => CacheConfig {
755 enabled: true,
756 default_ttl_secs: 3600,
757 max_size_bytes: 50 * 1024 * 1024, stale_while_revalidate_secs: 60,
759 stale_if_error_secs: 300,
760 ..Default::default()
761 },
762 grapsus_config::ServiceType::Api => CacheConfig {
763 enabled: false,
764 default_ttl_secs: 60,
765 ..Default::default()
766 },
767 grapsus_config::ServiceType::Web => CacheConfig {
768 enabled: false,
769 default_ttl_secs: 300,
770 ..Default::default()
771 },
772 _ => CacheConfig::default(),
773 }
774 };
775
776 if cache_config.enabled {
777 enabled_count += 1;
778 info!(
779 route_id = %route.id,
780 default_ttl_secs = cache_config.default_ttl_secs,
781 from_config = route.policies.cache.is_some(),
782 "HTTP caching enabled for route"
783 );
784 }
785 manager.register_route(&route.id, cache_config);
786 }
787
788 if enabled_count > 0 {
789 info!(enabled_routes = enabled_count, "HTTP caching initialized");
790 } else {
791 debug!("HTTP cache manager initialized (no routes with caching enabled)");
792 }
793
794 manager
795 }
796
797 fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
799 let manager = GeoFilterManager::new();
800
801 for (filter_id, filter_config) in &config.filters {
802 if let grapsus_config::Filter::Geo(ref geo_filter) = filter_config.filter {
803 match manager.register_filter(filter_id, geo_filter.clone()) {
804 Ok(_) => {
805 info!(
806 filter_id = %filter_id,
807 database_path = %geo_filter.database_path,
808 action = ?geo_filter.action,
809 countries_count = geo_filter.countries.len(),
810 "Registered geo filter"
811 );
812 }
813 Err(e) => {
814 error!(
815 filter_id = %filter_id,
816 error = %e,
817 "Failed to register geo filter"
818 );
819 }
820 }
821 }
822 }
823
824 let filter_ids = manager.filter_ids();
825 if !filter_ids.is_empty() {
826 info!(
827 filter_count = filter_ids.len(),
828 filter_ids = ?filter_ids,
829 "GeoIP filtering initialized"
830 );
831 }
832
833 manager
834 }
835
836 fn spawn_cleanup_task(
838 rate_limit_manager: Arc<RateLimitManager>,
839 geo_filter_manager: Arc<GeoFilterManager>,
840 ) {
841 const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
843
844 tokio::spawn(async move {
845 let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
846 interval.tick().await;
848
849 loop {
850 interval.tick().await;
851
852 rate_limit_manager.cleanup();
854
855 geo_filter_manager.clear_expired_caches();
857
858 debug!("Periodic cleanup completed");
859 }
860 });
861
862 info!(
863 interval_secs = CLEANUP_INTERVAL.as_secs(),
864 "Started periodic cleanup task"
865 );
866 }
867
868 fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
870 let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
871
872 match watcher.start_watching() {
874 Ok(mut rx) => {
875 let watcher_clone = watcher.clone();
876 tokio::spawn(async move {
877 const DEBOUNCE_MS: u64 = 500;
879
880 while let Some(path) = rx.recv().await {
881 tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
883
884 while rx.try_recv().is_ok() {}
886
887 watcher_clone.handle_change(&path);
889 }
890 });
891
892 info!("Started geo database file watcher");
893 }
894 Err(e) => {
895 warn!(
896 error = %e,
897 "Failed to start geo database file watcher, auto-reload disabled"
898 );
899 }
900 }
901 }
902}