sentinel_proxy/proxy/
mod.rs

1//! Sentinel Proxy Core Implementation
2//!
3//! This module contains the main SentinelProxy struct and its implementation,
4//! split across several submodules for maintainability:
5//!
6//! - `context`: Request context maintained throughout the request lifecycle
7//! - `handlers`: Helper methods for handling different route types
8//! - `http_trait`: ProxyHttp trait implementation for Pingora
9
10mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use parking_lot::RwLock;
18use pingora::http::ResponseHeader;
19use pingora::prelude::*;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::ids::{QualifiedId, Scope};
27use sentinel_common::{Registry, ScopedMetrics, ScopedRegistry};
28
29use crate::agents::AgentManager;
30use crate::app::AppState;
31use crate::builtin_handlers::BuiltinHandlerState;
32use crate::cache::{CacheConfig, CacheManager};
33use crate::errors::ErrorHandler;
34use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
35use crate::health::PassiveHealthChecker;
36use crate::http_helpers;
37use crate::logging::{LogManager, SharedLogManager};
38use crate::rate_limit::{RateLimitConfig, RateLimitManager};
39use crate::reload::{
40    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
41};
42use crate::routing::RouteMatcher;
43use crate::scoped_routing::ScopedRouteMatcher;
44use crate::static_files::StaticFileServer;
45use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
46use crate::validation::SchemaValidator;
47
48use sentinel_common::TraceIdFormat;
49use sentinel_config::{Config, FlattenedConfig};
50
51/// Main proxy service implementing Pingora's ProxyHttp trait
52pub struct SentinelProxy {
53    /// Configuration manager with hot reload
54    pub config_manager: Arc<ConfigManager>,
55    /// Route matcher (global routes only, for backward compatibility)
56    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
57    /// Scoped route matcher (namespace/service aware)
58    pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
59    /// Upstream pools (keyed by upstream ID, global only)
60    pub(super) upstream_pools: Registry<UpstreamPool>,
61    /// Scoped upstream pools (namespace/service aware)
62    pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
63    /// Agent manager for external processing
64    pub(super) agent_manager: Arc<AgentManager>,
65    /// Passive health checker
66    pub(super) passive_health: Arc<PassiveHealthChecker>,
67    /// Metrics collector
68    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
69    /// Scoped metrics collector (with namespace/service labels)
70    pub(super) scoped_metrics: Arc<ScopedMetrics>,
71    /// Application state
72    pub(super) app_state: Arc<AppState>,
73    /// Graceful reload coordinator
74    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
75    /// Error handlers per route (keyed by route ID)
76    pub(super) error_handlers: Registry<ErrorHandler>,
77    /// API schema validators per route (keyed by route ID)
78    pub(super) validators: Registry<SchemaValidator>,
79    /// Static file servers per route (keyed by route ID)
80    pub(super) static_servers: Registry<StaticFileServer>,
81    /// Builtin handler state
82    pub(super) builtin_state: Arc<BuiltinHandlerState>,
83    /// Log manager for file-based logging
84    pub(super) log_manager: SharedLogManager,
85    /// Trace ID format for request tracing
86    pub(super) trace_id_format: TraceIdFormat,
87    /// Active health check runner
88    pub(super) health_check_runner: Arc<HealthCheckRunner>,
89    /// Rate limit manager
90    pub(super) rate_limit_manager: Arc<RateLimitManager>,
91    /// HTTP cache manager
92    pub(super) cache_manager: Arc<CacheManager>,
93    /// GeoIP filter manager
94    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
95}
96
97impl SentinelProxy {
98    /// Create new proxy instance
99    ///
100    /// If config_path is None, uses the embedded default configuration.
101    /// Note: Tracing must be initialized by the caller before calling this function.
102    pub async fn new(config_path: Option<&str>) -> Result<Self> {
103        info!("Starting Sentinel Proxy");
104
105        // Load initial configuration
106        let (config, effective_config_path) = match config_path {
107            Some(path) => {
108                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
109                (cfg, path.to_string())
110            }
111            None => {
112                let cfg = Config::default_embedded()
113                    .context("Failed to load embedded default configuration")?;
114                // Use a sentinel path to indicate embedded config
115                (cfg, "_embedded_".to_string())
116            }
117        };
118
119        config
120            .validate()
121            .context("Initial configuration validation failed")?;
122
123        // Configure global cache storage (must be done before cache is accessed)
124        if let Some(ref cache_config) = config.cache {
125            info!(
126                max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
127                backend = ?cache_config.backend,
128                "Configuring HTTP cache storage"
129            );
130            crate::cache::configure_cache(cache_config.clone());
131        }
132
133        // Create configuration manager
134        let config_manager =
135            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
136
137        // Add validators
138        config_manager.add_validator(Box::new(RouteValidator)).await;
139        config_manager
140            .add_validator(Box::new(UpstreamValidator))
141            .await;
142
143        // Create route matcher (global routes only)
144        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
145
146        // Flatten config for namespace/service resources
147        let flattened = config.flatten();
148
149        // Create scoped route matcher
150        let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
151            ScopedRouteMatcher::from_flattened(&flattened)
152                .await
153                .context("Failed to create scoped route matcher")?,
154        ));
155
156        // Create upstream pools and active health checkers (global only)
157        let mut pools = HashMap::new();
158        let mut health_check_runner = HealthCheckRunner::new();
159
160        for (upstream_id, upstream_config) in &config.upstreams {
161            let mut config_with_id = upstream_config.clone();
162            config_with_id.id = upstream_id.clone();
163            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
164            pools.insert(upstream_id.clone(), pool);
165
166            // Create active health checker if health check is configured
167            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
168                health_check_runner.add_checker(checker);
169            }
170        }
171        let upstream_pools = Registry::from_map(pools);
172
173        // Create scoped upstream pools from flattened config
174        let scoped_upstream_pools =
175            Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;
176
177        let health_check_runner = Arc::new(health_check_runner);
178
179        // Create passive health checker
180        let passive_health = Arc::new(PassiveHealthChecker::new(
181            0.5,  // 50% failure rate threshold
182            100,  // Window size
183            None, // Will be linked to active health checkers
184        ));
185
186        // Create agent manager (per-agent queue isolation)
187        let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
188        agent_manager.initialize().await?;
189
190        // Create metrics collectors
191        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
192        let scoped_metrics = Arc::new(
193            ScopedMetrics::new().context("Failed to create scoped metrics collector")?,
194        );
195
196        // Create application state
197        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
198
199        // Create reload coordinator
200        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
201            Duration::from_secs(30), // Max drain time
202        ));
203
204        // Setup configuration reload subscription
205        Self::setup_reload_handler(
206            config_manager.clone(),
207            route_matcher.clone(),
208            upstream_pools.clone(),
209            scoped_route_matcher.clone(),
210            scoped_upstream_pools.clone(),
211        )
212        .await;
213
214        // Initialize service type components
215        let (error_handlers, validators, static_servers) =
216            Self::initialize_route_components(&config).await?;
217
218        // Create builtin handler state
219        let builtin_state = Arc::new(BuiltinHandlerState::new(
220            env!("CARGO_PKG_VERSION").to_string(),
221            app_state.instance_id.clone(),
222        ));
223
224        // Create log manager for file-based logging
225        let log_manager = match LogManager::new(&config.observability.logging) {
226            Ok(manager) => {
227                if manager.access_log_enabled() {
228                    info!("Access logging enabled");
229                }
230                if manager.error_log_enabled() {
231                    info!("Error logging enabled");
232                }
233                if manager.audit_log_enabled() {
234                    info!("Audit logging enabled");
235                }
236                Arc::new(manager)
237            }
238            Err(e) => {
239                warn!(
240                    "Failed to initialize log manager, file logging disabled: {}",
241                    e
242                );
243                Arc::new(LogManager::disabled())
244            }
245        };
246
247        // Register audit reload hook to log configuration changes
248        {
249            use crate::reload::AuditReloadHook;
250            let audit_hook = AuditReloadHook::new(log_manager.clone());
251            config_manager.add_hook(Box::new(audit_hook)).await;
252            debug!("Registered audit reload hook");
253        }
254
255        // Start active health check runner in background
256        if health_check_runner.checker_count() > 0 {
257            let runner = health_check_runner.clone();
258            tokio::spawn(async move {
259                runner.run().await;
260            });
261            info!(
262                "Started active health checking for {} upstreams",
263                health_check_runner.checker_count()
264            );
265        }
266
267        // Initialize rate limit manager
268        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
269
270        // Initialize geo filter manager
271        let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));
272
273        // Start periodic cleanup task for rate limiters and geo caches
274        Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());
275
276        // Start geo database file watcher for hot reload
277        Self::spawn_geo_database_watcher(geo_filter_manager.clone());
278
279        // Mark as ready
280        app_state.set_ready(true);
281
282        // Get trace ID format from config
283        let trace_id_format = config.server.trace_id_format;
284
285        // Initialize cache manager
286        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
287
288        Ok(Self {
289            config_manager,
290            route_matcher,
291            scoped_route_matcher,
292            upstream_pools,
293            scoped_upstream_pools,
294            agent_manager,
295            passive_health,
296            metrics,
297            scoped_metrics,
298            app_state,
299            reload_coordinator,
300            error_handlers,
301            validators,
302            static_servers,
303            builtin_state,
304            log_manager,
305            trace_id_format,
306            health_check_runner,
307            rate_limit_manager,
308            cache_manager,
309            geo_filter_manager,
310        })
311    }
312
313    /// Setup the configuration reload handler
314    async fn setup_reload_handler(
315        config_manager: Arc<ConfigManager>,
316        route_matcher: Arc<RwLock<RouteMatcher>>,
317        upstream_pools: Registry<UpstreamPool>,
318        scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
319        scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
320    ) {
321        let mut reload_rx = config_manager.subscribe();
322        let config_manager_clone = config_manager.clone();
323
324        tokio::spawn(async move {
325            while let Ok(event) = reload_rx.recv().await {
326                if let ReloadEvent::Applied { .. } = event {
327                    // Reload routes and upstreams
328                    let new_config = config_manager_clone.current();
329                    let flattened = new_config.flatten();
330
331                    // Update route matcher (sync parking_lot::RwLock)
332                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
333                        *route_matcher.write() = new_matcher;
334                        info!("Global routes reloaded successfully");
335                    }
336
337                    // Update scoped route matcher
338                    if let Err(e) = scoped_route_matcher
339                        .write()
340                        .await
341                        .load_from_flattened(&flattened)
342                        .await
343                    {
344                        error!("Failed to reload scoped routes: {}", e);
345                    } else {
346                        info!(
347                            "Scoped routes reloaded ({} scopes)",
348                            scoped_route_matcher.read().await.scope_count().await
349                        );
350                    }
351
352                    // Update global upstream pools
353                    let mut new_pools = HashMap::new();
354                    for (upstream_id, upstream_config) in &new_config.upstreams {
355                        let mut config_with_id = upstream_config.clone();
356                        config_with_id.id = upstream_id.clone();
357                        match UpstreamPool::new(config_with_id).await {
358                            Ok(pool) => {
359                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
360                            }
361                            Err(e) => {
362                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
363                            }
364                        }
365                    }
366
367                    // Gracefully swap global pools
368                    let old_pools = upstream_pools.replace(new_pools).await;
369
370                    // Update scoped upstream pools
371                    let new_scoped_pools =
372                        Self::build_scoped_pools_list(&flattened).await;
373                    let old_scoped_pools = scoped_upstream_pools.replace_all(new_scoped_pools).await;
374
375                    info!(
376                        "Scoped upstream pools reloaded ({} pools)",
377                        scoped_upstream_pools.len().await
378                    );
379
380                    // Shutdown old pools after delay
381                    tokio::spawn(async move {
382                        tokio::time::sleep(Duration::from_secs(60)).await;
383
384                        // Shutdown old global pools
385                        for (name, pool) in old_pools {
386                            info!("Shutting down old global pool: {}", name);
387                            pool.shutdown().await;
388                        }
389
390                        // Shutdown old scoped pools
391                        for (name, pool) in old_scoped_pools {
392                            info!("Shutting down old scoped pool: {}", name);
393                            pool.shutdown().await;
394                        }
395                    });
396                }
397            }
398        });
399    }
400
401    /// Create scoped upstream pools from flattened config
402    async fn create_scoped_upstream_pools(
403        flattened: &FlattenedConfig,
404        health_check_runner: &mut HealthCheckRunner,
405    ) -> Result<ScopedRegistry<UpstreamPool>> {
406        let registry = ScopedRegistry::new();
407
408        for (qid, upstream_config) in &flattened.upstreams {
409            let mut config_with_id = upstream_config.clone();
410            config_with_id.id = qid.canonical();
411
412            let pool = Arc::new(
413                UpstreamPool::new(config_with_id.clone())
414                    .await
415                    .with_context(|| format!("Failed to create upstream pool '{}'", qid.canonical()))?,
416            );
417
418            // Track exports
419            let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
420
421            if is_exported {
422                registry.insert_exported(qid.clone(), pool).await;
423            } else {
424                registry.insert(qid.clone(), pool).await;
425            }
426
427            // Create active health checker if configured
428            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
429                health_check_runner.add_checker(checker);
430            }
431
432            debug!(
433                upstream_id = %qid.canonical(),
434                scope = ?qid.scope,
435                exported = is_exported,
436                "Created scoped upstream pool"
437            );
438        }
439
440        info!(
441            "Created {} scoped upstream pools",
442            registry.len().await
443        );
444
445        Ok(registry)
446    }
447
448    /// Build list of scoped pools for atomic replacement
449    async fn build_scoped_pools_list(
450        flattened: &FlattenedConfig,
451    ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
452        let mut result = Vec::new();
453
454        for (qid, upstream_config) in &flattened.upstreams {
455            let mut config_with_id = upstream_config.clone();
456            config_with_id.id = qid.canonical();
457
458            match UpstreamPool::new(config_with_id).await {
459                Ok(pool) => {
460                    let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
461                    result.push((qid.clone(), Arc::new(pool), is_exported));
462                }
463                Err(e) => {
464                    error!(
465                        "Failed to create scoped upstream pool {}: {}",
466                        qid.canonical(),
467                        e
468                    );
469                }
470            }
471        }
472
473        result
474    }
475
476    /// Initialize route-specific components (error handlers, validators, static servers)
477    async fn initialize_route_components(
478        config: &Config,
479    ) -> Result<(
480        Registry<ErrorHandler>,
481        Registry<SchemaValidator>,
482        Registry<StaticFileServer>,
483    )> {
484        let mut error_handlers_map = HashMap::new();
485        let mut validators_map = HashMap::new();
486        let mut static_servers_map = HashMap::new();
487
488        for route in &config.routes {
489            info!(
490                "Initializing components for route: {} with service type: {:?}",
491                route.id, route.service_type
492            );
493
494            // Initialize error handler for each route
495            if let Some(ref error_config) = route.error_pages {
496                let handler =
497                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
498                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
499                debug!("Initialized error handler for route: {}", route.id);
500            } else {
501                // Use default error handler for the service type
502                let handler = ErrorHandler::new(route.service_type.clone(), None);
503                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
504            }
505
506            // Initialize schema validator for API routes
507            if route.service_type == sentinel_config::ServiceType::Api {
508                if let Some(ref api_schema) = route.api_schema {
509                    match SchemaValidator::new(api_schema.clone()) {
510                        Ok(validator) => {
511                            validators_map.insert(route.id.clone(), Arc::new(validator));
512                            info!("Initialized schema validator for route: {}", route.id);
513                        }
514                        Err(e) => {
515                            warn!(
516                                "Failed to initialize schema validator for route {}: {}",
517                                route.id, e
518                            );
519                        }
520                    }
521                }
522            }
523
524            // Initialize static file server for static routes
525            if route.service_type == sentinel_config::ServiceType::Static {
526                if let Some(ref static_config) = route.static_files {
527                    let server = StaticFileServer::new(static_config.clone());
528                    static_servers_map.insert(route.id.clone(), Arc::new(server));
529                    info!("Initialized static file server for route: {}", route.id);
530                } else {
531                    warn!(
532                        "Static route {} has no static_files configuration",
533                        route.id
534                    );
535                }
536            }
537        }
538
539        Ok((
540            Registry::from_map(error_handlers_map),
541            Registry::from_map(validators_map),
542            Registry::from_map(static_servers_map),
543        ))
544    }
545
546    /// Get or generate trace ID from session
547    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
548        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
549    }
550
551    /// Initialize rate limiters from configuration
552    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
553        use sentinel_config::RateLimitAction;
554
555        // Create manager with global rate limit if configured
556        let manager = if let Some(ref global) = config.rate_limits.global {
557            info!(
558                max_rps = global.max_rps,
559                burst = global.burst,
560                key = ?global.key,
561                "Initializing global rate limiter"
562            );
563            RateLimitManager::with_global_limit(global.max_rps, global.burst)
564        } else {
565            RateLimitManager::new()
566        };
567
568        for route in &config.routes {
569            // Check for rate limit in route policies
570            if let Some(ref rate_limit) = route.policies.rate_limit {
571                let rl_config = RateLimitConfig {
572                    max_rps: rate_limit.requests_per_second,
573                    burst: rate_limit.burst,
574                    key: rate_limit.key.clone(),
575                    action: RateLimitAction::Reject,
576                    status_code: 429,
577                    message: None,
578                    backend: sentinel_config::RateLimitBackend::Local,
579                    max_delay_ms: 5000, // Default for policy-based rate limits
580                };
581                manager.register_route(&route.id, rl_config);
582                info!(
583                    route_id = %route.id,
584                    max_rps = rate_limit.requests_per_second,
585                    burst = rate_limit.burst,
586                    key = ?rate_limit.key,
587                    "Registered rate limiter for route"
588                );
589            }
590
591            // Also check for rate limit filters in the filter chain
592            for filter_id in &route.filters {
593                if let Some(filter_config) = config.filters.get(filter_id) {
594                    if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
595                    {
596                        let rl_config = RateLimitConfig {
597                            max_rps: rl_filter.max_rps,
598                            burst: rl_filter.burst,
599                            key: rl_filter.key.clone(),
600                            action: rl_filter.on_limit.clone(),
601                            status_code: rl_filter.status_code,
602                            message: rl_filter.limit_message.clone(),
603                            backend: rl_filter.backend.clone(),
604                            max_delay_ms: rl_filter.max_delay_ms,
605                        };
606                        manager.register_route(&route.id, rl_config);
607                        info!(
608                            route_id = %route.id,
609                            filter_id = %filter_id,
610                            max_rps = rl_filter.max_rps,
611                            backend = ?rl_filter.backend,
612                            "Registered rate limiter from filter for route"
613                        );
614                    }
615                }
616            }
617        }
618
619        if manager.route_count() > 0 {
620            info!(
621                route_count = manager.route_count(),
622                "Rate limiting initialized"
623            );
624        }
625
626        manager
627    }
628
629    /// Initialize cache manager from configuration
630    fn initialize_cache_manager(config: &Config) -> CacheManager {
631        let manager = CacheManager::new();
632
633        let mut enabled_count = 0;
634
635        for route in &config.routes {
636            // API routes: caching disabled by default (responses often dynamic)
637            if route.service_type == sentinel_config::ServiceType::Api {
638                let cache_config = CacheConfig {
639                    enabled: false, // Disabled until explicitly configured via KDL
640                    default_ttl_secs: 60,
641                    ..Default::default()
642                };
643                manager.register_route(&route.id, cache_config);
644            }
645
646            // Static routes: enable caching by default (assets are typically cacheable)
647            if route.service_type == sentinel_config::ServiceType::Static {
648                let cache_config = CacheConfig {
649                    enabled: true, // Enable by default for static routes
650                    default_ttl_secs: 3600,
651                    max_size_bytes: 50 * 1024 * 1024, // 50MB for static
652                    stale_while_revalidate_secs: 60,
653                    stale_if_error_secs: 300,
654                    ..Default::default()
655                };
656                manager.register_route(&route.id, cache_config);
657                enabled_count += 1;
658                info!(
659                    route_id = %route.id,
660                    default_ttl_secs = 3600,
661                    "HTTP caching enabled for static route"
662                );
663            }
664
665            // Web routes: disable by default (HTML often personalized)
666            if route.service_type == sentinel_config::ServiceType::Web {
667                let cache_config = CacheConfig {
668                    enabled: false, // Disabled until explicitly configured
669                    default_ttl_secs: 300,
670                    ..Default::default()
671                };
672                manager.register_route(&route.id, cache_config);
673            }
674        }
675
676        if enabled_count > 0 {
677            info!(enabled_routes = enabled_count, "HTTP caching initialized");
678        } else {
679            debug!("HTTP cache manager initialized (no routes with caching enabled)");
680        }
681
682        manager
683    }
684
685    /// Initialize geo filters from configuration
686    fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
687        let manager = GeoFilterManager::new();
688
689        for (filter_id, filter_config) in &config.filters {
690            if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
691                match manager.register_filter(filter_id, geo_filter.clone()) {
692                    Ok(_) => {
693                        info!(
694                            filter_id = %filter_id,
695                            database_path = %geo_filter.database_path,
696                            action = ?geo_filter.action,
697                            countries_count = geo_filter.countries.len(),
698                            "Registered geo filter"
699                        );
700                    }
701                    Err(e) => {
702                        error!(
703                            filter_id = %filter_id,
704                            error = %e,
705                            "Failed to register geo filter"
706                        );
707                    }
708                }
709            }
710        }
711
712        let filter_ids = manager.filter_ids();
713        if !filter_ids.is_empty() {
714            info!(
715                filter_count = filter_ids.len(),
716                filter_ids = ?filter_ids,
717                "GeoIP filtering initialized"
718            );
719        }
720
721        manager
722    }
723
724    /// Apply security headers to response
725    pub(super) fn apply_security_headers(
726        &self,
727        header: &mut ResponseHeader,
728    ) -> Result<(), Box<Error>> {
729        header.insert_header("X-Content-Type-Options", "nosniff")?;
730        header.insert_header("X-Frame-Options", "DENY")?;
731        header.insert_header("X-XSS-Protection", "1; mode=block")?;
732        header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
733        header.remove_header("Server");
734        header.remove_header("X-Powered-By");
735        Ok(())
736    }
737
738    /// Spawn background task to periodically clean up idle rate limiters and expired geo caches
739    fn spawn_cleanup_task(
740        rate_limit_manager: Arc<RateLimitManager>,
741        geo_filter_manager: Arc<GeoFilterManager>,
742    ) {
743        // Cleanup interval: 5 minutes
744        const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
745
746        tokio::spawn(async move {
747            let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
748            // First tick completes immediately; skip it
749            interval.tick().await;
750
751            loop {
752                interval.tick().await;
753
754                // Clean up rate limiters (removes entries when pool exceeds max size)
755                rate_limit_manager.cleanup();
756
757                // Clean up expired geo filter caches
758                geo_filter_manager.clear_expired_caches();
759
760                debug!("Periodic cleanup completed");
761            }
762        });
763
764        info!(
765            interval_secs = CLEANUP_INTERVAL.as_secs(),
766            "Started periodic cleanup task"
767        );
768    }
769
770    /// Spawn background task to watch geo database files for changes
771    fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
772        let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
773
774        // Try to start watching
775        match watcher.start_watching() {
776            Ok(mut rx) => {
777                let watcher_clone = watcher.clone();
778                tokio::spawn(async move {
779                    // Debounce interval
780                    const DEBOUNCE_MS: u64 = 500;
781
782                    while let Some(path) = rx.recv().await {
783                        // Debounce rapid changes (e.g., temp file then rename)
784                        tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
785
786                        // Drain any additional events for the same path during debounce
787                        while rx.try_recv().is_ok() {}
788
789                        // Handle the change
790                        watcher_clone.handle_change(&path);
791                    }
792                });
793
794                info!("Started geo database file watcher");
795            }
796            Err(e) => {
797                warn!(
798                    error = %e,
799                    "Failed to start geo database file watcher, auto-reload disabled"
800                );
801            }
802        }
803    }
804}