sentinel_proxy/proxy/
mod.rs

//! Sentinel Proxy Core Implementation
//!
//! This module contains the main SentinelProxy struct and its implementation,
//! split across several submodules for maintainability:
//!
//! - `context`: Request context maintained throughout the request lifecycle
//! - `fallback` / `fallback_metrics`: Upstream fallback decisions and metrics
//! - `handlers`: Helper methods for handling different route types
//! - `http_trait`: ProxyHttp trait implementation for Pingora
//! - `model_routing` / `model_routing_metrics`: Model-aware upstream routing
10mod context;
11mod fallback;
12mod fallback_metrics;
13mod handlers;
14mod http_trait;
15mod model_routing;
16mod model_routing_metrics;
17
18pub use context::{FallbackReason, RequestContext};
19pub use fallback::{FallbackDecision, FallbackEvaluator};
20pub use fallback_metrics::{get_fallback_metrics, init_fallback_metrics, FallbackMetrics};
21pub use model_routing::{extract_model_from_headers, find_upstream_for_model, ModelRoutingResult};
22pub use model_routing_metrics::{
23    get_model_routing_metrics, init_model_routing_metrics, ModelRoutingMetrics,
24};
25
26use anyhow::{Context, Result};
27use parking_lot::RwLock;
28use pingora::http::ResponseHeader;
29use pingora::prelude::*;
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tracing::{debug, error, info, warn};
34use uuid::Uuid;
35
36use sentinel_common::ids::{QualifiedId, Scope};
37use sentinel_common::{Registry, ScopedMetrics, ScopedRegistry};
38
39use crate::agents::AgentManager;
40use crate::app::AppState;
41use crate::builtin_handlers::BuiltinHandlerState;
42use crate::cache::{CacheConfig, CacheManager};
43use crate::errors::ErrorHandler;
44use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
45use crate::health::PassiveHealthChecker;
46use crate::http_helpers;
47use crate::inference::InferenceRateLimitManager;
48use crate::logging::{LogManager, SharedLogManager};
49use crate::rate_limit::{RateLimitConfig, RateLimitManager};
50use crate::reload::{
51    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
52};
53use crate::routing::RouteMatcher;
54use crate::scoped_routing::ScopedRouteMatcher;
55use crate::static_files::StaticFileServer;
56use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
57use crate::validation::SchemaValidator;
58
59use sentinel_common::TraceIdFormat;
60use sentinel_config::{Config, FlattenedConfig};
61
/// Main proxy service implementing Pingora's ProxyHttp trait
///
/// Owns every long-lived subsystem the proxy needs (routing, upstream pools,
/// health checking, agents, metrics, logging, rate limiting, caching, geo
/// filtering, inference components). Constructed once via [`SentinelProxy::new`].
/// Fields are `pub(super)` so the sibling submodules (`handlers`, `http_trait`,
/// etc.) can access them directly.
pub struct SentinelProxy {
    /// Configuration manager with hot reload
    pub config_manager: Arc<ConfigManager>,
    /// Route matcher (global routes only, for backward compatibility)
    /// Uses a sync `parking_lot::RwLock`; swapped wholesale on reload.
    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
    /// Scoped route matcher (namespace/service aware)
    /// Uses tokio's async RwLock since reload updates it across await points.
    pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
    /// Upstream pools (keyed by upstream ID, global only)
    pub(super) upstream_pools: Registry<UpstreamPool>,
    /// Scoped upstream pools (namespace/service aware)
    pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
    /// Agent manager for external processing
    pub(super) agent_manager: Arc<AgentManager>,
    /// Passive health checker
    pub(super) passive_health: Arc<PassiveHealthChecker>,
    /// Metrics collector
    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
    /// Scoped metrics collector (with namespace/service labels)
    pub(super) scoped_metrics: Arc<ScopedMetrics>,
    /// Application state
    pub(super) app_state: Arc<AppState>,
    /// Graceful reload coordinator
    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
    /// Error handlers per route (keyed by route ID)
    pub(super) error_handlers: Registry<ErrorHandler>,
    /// API schema validators per route (keyed by route ID)
    pub(super) validators: Registry<SchemaValidator>,
    /// Static file servers per route (keyed by route ID)
    pub(super) static_servers: Registry<StaticFileServer>,
    /// Builtin handler state
    pub(super) builtin_state: Arc<BuiltinHandlerState>,
    /// Log manager for file-based logging
    pub(super) log_manager: SharedLogManager,
    /// Trace ID format for request tracing
    pub(super) trace_id_format: TraceIdFormat,
    /// Active health check runner
    pub(super) health_check_runner: Arc<HealthCheckRunner>,
    /// Rate limit manager
    pub(super) rate_limit_manager: Arc<RateLimitManager>,
    /// HTTP cache manager
    pub(super) cache_manager: Arc<CacheManager>,
    /// GeoIP filter manager
    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
    /// Inference rate limit manager (token-based rate limiting for LLM/AI routes)
    pub(super) inference_rate_limit_manager: Arc<InferenceRateLimitManager>,
    /// Warmth tracker for cold model detection on inference routes
    pub(super) warmth_tracker: Arc<crate::health::WarmthTracker>,
    /// Guardrail processor for semantic inspection (prompt injection, PII detection)
    pub(super) guardrail_processor: Arc<crate::inference::GuardrailProcessor>,
}
113
114impl SentinelProxy {
    /// Create new proxy instance
    ///
    /// Loads and validates configuration, then constructs every subsystem in
    /// dependency order: cache storage, config manager (+ validators), route
    /// matchers, upstream pools and health checkers, agents, metrics, logging,
    /// rate limiting, geo filtering and inference components. Finally wires up
    /// the hot-reload subscription and marks the instance ready.
    ///
    /// If config_path is None, uses the embedded default configuration.
    /// Note: Tracing must be initialized by the caller before calling this function.
    pub async fn new(config_path: Option<&str>) -> Result<Self> {
        info!("Starting Sentinel Proxy");

        // Load initial configuration
        let (config, effective_config_path) = match config_path {
            Some(path) => {
                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
                (cfg, path.to_string())
            }
            None => {
                let cfg = Config::default_embedded()
                    .context("Failed to load embedded default configuration")?;
                // Use a sentinel path to indicate embedded config
                (cfg, "_embedded_".to_string())
            }
        };

        config
            .validate()
            .context("Initial configuration validation failed")?;

        // Configure global cache storage (must be done before cache is accessed)
        if let Some(ref cache_config) = config.cache {
            info!(
                max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
                backend = ?cache_config.backend,
                "Configuring HTTP cache storage"
            );
            crate::cache::configure_cache(cache_config.clone());
        }

        // Create configuration manager (owns the hot-reload watch on the file)
        let config_manager =
            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);

        // Add validators that gate every reload attempt
        config_manager.add_validator(Box::new(RouteValidator)).await;
        config_manager
            .add_validator(Box::new(UpstreamValidator))
            .await;

        // Create route matcher (global routes only)
        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));

        // Flatten config for namespace/service resources
        let flattened = config.flatten();

        // Create scoped route matcher
        let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
            ScopedRouteMatcher::from_flattened(&flattened)
                .await
                .context("Failed to create scoped route matcher")?,
        ));

        // Create upstream pools and active health checkers (global only)
        let mut pools = HashMap::new();
        let mut health_check_runner = HealthCheckRunner::new();

        for (upstream_id, upstream_config) in &config.upstreams {
            // Stamp the map key into the config so the pool knows its own id
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = upstream_id.clone();
            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
            pools.insert(upstream_id.clone(), pool);

            // Create active health checker if health check is configured
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }
        }
        let upstream_pools = Registry::from_map(pools);

        // Create scoped upstream pools from flattened config
        let scoped_upstream_pools =
            Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;

        let health_check_runner = Arc::new(health_check_runner);

        // Create passive health checker
        let passive_health = Arc::new(PassiveHealthChecker::new(
            0.5,  // 50% failure rate threshold
            100,  // Window size
            None, // Will be linked to active health checkers
        ));

        // Create agent manager (per-agent queue isolation)
        let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
        agent_manager.initialize().await?;

        // Create metrics collectors
        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
        let scoped_metrics = Arc::new(
            ScopedMetrics::new().context("Failed to create scoped metrics collector")?,
        );

        // Create application state (instance id is a fresh UUID per process)
        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));

        // Create reload coordinator
        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
            Duration::from_secs(30), // Max drain time
        ));

        // Setup configuration reload subscription (background task that swaps
        // matchers and pools whenever a reload is applied)
        Self::setup_reload_handler(
            config_manager.clone(),
            route_matcher.clone(),
            upstream_pools.clone(),
            scoped_route_matcher.clone(),
            scoped_upstream_pools.clone(),
        )
        .await;

        // Initialize service type components
        let (error_handlers, validators, static_servers) =
            Self::initialize_route_components(&config).await?;

        // Create builtin handler state
        let builtin_state = Arc::new(BuiltinHandlerState::new(
            env!("CARGO_PKG_VERSION").to_string(),
            app_state.instance_id.clone(),
        ));

        // Create log manager for file-based logging; failure here degrades to
        // a disabled manager rather than aborting startup
        let log_manager = match LogManager::new(&config.observability.logging) {
            Ok(manager) => {
                if manager.access_log_enabled() {
                    info!("Access logging enabled");
                }
                if manager.error_log_enabled() {
                    info!("Error logging enabled");
                }
                if manager.audit_log_enabled() {
                    info!("Audit logging enabled");
                }
                Arc::new(manager)
            }
            Err(e) => {
                warn!(
                    "Failed to initialize log manager, file logging disabled: {}",
                    e
                );
                Arc::new(LogManager::disabled())
            }
        };

        // Register audit reload hook to log configuration changes
        {
            use crate::reload::AuditReloadHook;
            let audit_hook = AuditReloadHook::new(log_manager.clone());
            config_manager.add_hook(Box::new(audit_hook)).await;
            debug!("Registered audit reload hook");
        }

        // Start active health check runner in background
        if health_check_runner.checker_count() > 0 {
            let runner = health_check_runner.clone();
            tokio::spawn(async move {
                runner.run().await;
            });
            info!(
                "Started active health checking for {} upstreams",
                health_check_runner.checker_count()
            );
        }

        // Initialize rate limit manager
        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));

        // Initialize inference rate limit manager (for token-based LLM rate limiting)
        let inference_rate_limit_manager =
            Arc::new(Self::initialize_inference_rate_limiters(&config));

        // Initialize warmth tracker for cold model detection
        let warmth_tracker = Arc::new(crate::health::WarmthTracker::with_defaults());

        // Initialize guardrail processor for semantic inspection
        let guardrail_processor =
            Arc::new(crate::inference::GuardrailProcessor::new(agent_manager.clone()));

        // Initialize geo filter manager
        let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));

        // Start periodic cleanup task for rate limiters and geo caches
        Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());

        // Start geo database file watcher for hot reload
        Self::spawn_geo_database_watcher(geo_filter_manager.clone());

        // Mark as ready
        app_state.set_ready(true);

        // Get trace ID format from config
        let trace_id_format = config.server.trace_id_format;

        // Initialize cache manager
        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));

        // Initialize fallback metrics (best-effort, log warning if fails)
        if let Err(e) = init_fallback_metrics() {
            warn!("Failed to initialize fallback metrics: {}", e);
        }

        // Initialize model routing metrics (best-effort, log warning if fails)
        if let Err(e) = init_model_routing_metrics() {
            warn!("Failed to initialize model routing metrics: {}", e);
        }

        Ok(Self {
            config_manager,
            route_matcher,
            scoped_route_matcher,
            upstream_pools,
            scoped_upstream_pools,
            agent_manager,
            passive_health,
            metrics,
            scoped_metrics,
            app_state,
            reload_coordinator,
            error_handlers,
            validators,
            static_servers,
            builtin_state,
            log_manager,
            trace_id_format,
            health_check_runner,
            rate_limit_manager,
            cache_manager,
            geo_filter_manager,
            inference_rate_limit_manager,
            warmth_tracker,
            guardrail_processor,
        })
    }
353
354    /// Setup the configuration reload handler
355    async fn setup_reload_handler(
356        config_manager: Arc<ConfigManager>,
357        route_matcher: Arc<RwLock<RouteMatcher>>,
358        upstream_pools: Registry<UpstreamPool>,
359        scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
360        scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
361    ) {
362        let mut reload_rx = config_manager.subscribe();
363        let config_manager_clone = config_manager.clone();
364
365        tokio::spawn(async move {
366            while let Ok(event) = reload_rx.recv().await {
367                if let ReloadEvent::Applied { .. } = event {
368                    // Reload routes and upstreams
369                    let new_config = config_manager_clone.current();
370                    let flattened = new_config.flatten();
371
372                    // Update route matcher (sync parking_lot::RwLock)
373                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
374                        *route_matcher.write() = new_matcher;
375                        info!("Global routes reloaded successfully");
376                    }
377
378                    // Update scoped route matcher
379                    if let Err(e) = scoped_route_matcher
380                        .write()
381                        .await
382                        .load_from_flattened(&flattened)
383                        .await
384                    {
385                        error!("Failed to reload scoped routes: {}", e);
386                    } else {
387                        info!(
388                            "Scoped routes reloaded ({} scopes)",
389                            scoped_route_matcher.read().await.scope_count().await
390                        );
391                    }
392
393                    // Update global upstream pools
394                    let mut new_pools = HashMap::new();
395                    for (upstream_id, upstream_config) in &new_config.upstreams {
396                        let mut config_with_id = upstream_config.clone();
397                        config_with_id.id = upstream_id.clone();
398                        match UpstreamPool::new(config_with_id).await {
399                            Ok(pool) => {
400                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
401                            }
402                            Err(e) => {
403                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
404                            }
405                        }
406                    }
407
408                    // Gracefully swap global pools
409                    let old_pools = upstream_pools.replace(new_pools).await;
410
411                    // Update scoped upstream pools
412                    let new_scoped_pools =
413                        Self::build_scoped_pools_list(&flattened).await;
414                    let old_scoped_pools = scoped_upstream_pools.replace_all(new_scoped_pools).await;
415
416                    info!(
417                        "Scoped upstream pools reloaded ({} pools)",
418                        scoped_upstream_pools.len().await
419                    );
420
421                    // Shutdown old pools after delay
422                    tokio::spawn(async move {
423                        tokio::time::sleep(Duration::from_secs(60)).await;
424
425                        // Shutdown old global pools
426                        for (name, pool) in old_pools {
427                            info!("Shutting down old global pool: {}", name);
428                            pool.shutdown().await;
429                        }
430
431                        // Shutdown old scoped pools
432                        for (name, pool) in old_scoped_pools {
433                            info!("Shutting down old scoped pool: {}", name);
434                            pool.shutdown().await;
435                        }
436                    });
437                }
438            }
439        });
440    }
441
    /// Create scoped upstream pools from flattened config
    ///
    /// For each namespace/service-scoped upstream: stamps the qualified id
    /// into the pool config, builds the pool (hard failure aborts startup),
    /// registers it as exported or private, and adds an active health checker
    /// when one is configured.
    ///
    /// NOTE(review): the export check keys on `upstream_config.id` (the
    /// pre-stamp id from the flattened map) rather than the qualified id —
    /// confirm `exported_upstreams` is keyed the same way.
    async fn create_scoped_upstream_pools(
        flattened: &FlattenedConfig,
        health_check_runner: &mut HealthCheckRunner,
    ) -> Result<ScopedRegistry<UpstreamPool>> {
        let registry = ScopedRegistry::new();

        for (qid, upstream_config) in &flattened.upstreams {
            // Pool must know its own canonical (scope-qualified) id
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = qid.canonical();

            let pool = Arc::new(
                UpstreamPool::new(config_with_id.clone())
                    .await
                    .with_context(|| format!("Failed to create upstream pool '{}'", qid.canonical()))?,
            );

            // Track exports
            let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);

            if is_exported {
                registry.insert_exported(qid.clone(), pool).await;
            } else {
                registry.insert(qid.clone(), pool).await;
            }

            // Create active health checker if configured
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }

            debug!(
                upstream_id = %qid.canonical(),
                scope = ?qid.scope,
                exported = is_exported,
                "Created scoped upstream pool"
            );
        }

        info!(
            "Created {} scoped upstream pools",
            registry.len().await
        );

        Ok(registry)
    }
488
489    /// Build list of scoped pools for atomic replacement
490    async fn build_scoped_pools_list(
491        flattened: &FlattenedConfig,
492    ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
493        let mut result = Vec::new();
494
495        for (qid, upstream_config) in &flattened.upstreams {
496            let mut config_with_id = upstream_config.clone();
497            config_with_id.id = qid.canonical();
498
499            match UpstreamPool::new(config_with_id).await {
500                Ok(pool) => {
501                    let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
502                    result.push((qid.clone(), Arc::new(pool), is_exported));
503                }
504                Err(e) => {
505                    error!(
506                        "Failed to create scoped upstream pool {}: {}",
507                        qid.canonical(),
508                        e
509                    );
510                }
511            }
512        }
513
514        result
515    }
516
517    /// Initialize route-specific components (error handlers, validators, static servers)
518    async fn initialize_route_components(
519        config: &Config,
520    ) -> Result<(
521        Registry<ErrorHandler>,
522        Registry<SchemaValidator>,
523        Registry<StaticFileServer>,
524    )> {
525        let mut error_handlers_map = HashMap::new();
526        let mut validators_map = HashMap::new();
527        let mut static_servers_map = HashMap::new();
528
529        for route in &config.routes {
530            info!(
531                "Initializing components for route: {} with service type: {:?}",
532                route.id, route.service_type
533            );
534
535            // Initialize error handler for each route
536            if let Some(ref error_config) = route.error_pages {
537                let handler =
538                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
539                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
540                debug!("Initialized error handler for route: {}", route.id);
541            } else {
542                // Use default error handler for the service type
543                let handler = ErrorHandler::new(route.service_type.clone(), None);
544                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
545            }
546
547            // Initialize schema validator for API routes
548            if route.service_type == sentinel_config::ServiceType::Api {
549                if let Some(ref api_schema) = route.api_schema {
550                    match SchemaValidator::new(api_schema.clone()) {
551                        Ok(validator) => {
552                            validators_map.insert(route.id.clone(), Arc::new(validator));
553                            info!("Initialized schema validator for route: {}", route.id);
554                        }
555                        Err(e) => {
556                            warn!(
557                                "Failed to initialize schema validator for route {}: {}",
558                                route.id, e
559                            );
560                        }
561                    }
562                }
563            }
564
565            // Initialize static file server for static routes
566            if route.service_type == sentinel_config::ServiceType::Static {
567                if let Some(ref static_config) = route.static_files {
568                    let server = StaticFileServer::new(static_config.clone());
569                    static_servers_map.insert(route.id.clone(), Arc::new(server));
570                    info!("Initialized static file server for route: {}", route.id);
571                } else {
572                    warn!(
573                        "Static route {} has no static_files configuration",
574                        route.id
575                    );
576                }
577            }
578        }
579
580        Ok((
581            Registry::from_map(error_handlers_map),
582            Registry::from_map(validators_map),
583            Registry::from_map(static_servers_map),
584        ))
585    }
586
    /// Get or generate trace ID from session
    ///
    /// Delegates to `http_helpers::get_or_create_trace_id`, honoring the
    /// trace ID format configured at startup (`server.trace_id_format`).
    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
    }
591
    /// Initialize rate limiters from configuration
    ///
    /// Builds a [`RateLimitManager`] seeded with the optional global limit,
    /// then registers one limiter per route from two sources: the route's
    /// `policies.rate_limit` block (always Reject/429 with local backend) and
    /// any rate-limit filters referenced in the route's filter chain (which
    /// carry their own action/status/backend settings).
    ///
    /// NOTE(review): a route with both a policy limit and a rate-limit filter
    /// calls `register_route` twice with the same route id — presumably the
    /// later (filter) registration wins; confirm intended precedence.
    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
        use sentinel_config::RateLimitAction;

        // Create manager with global rate limit if configured
        let manager = if let Some(ref global) = config.rate_limits.global {
            info!(
                max_rps = global.max_rps,
                burst = global.burst,
                key = ?global.key,
                "Initializing global rate limiter"
            );
            RateLimitManager::with_global_limit(global.max_rps, global.burst)
        } else {
            RateLimitManager::new()
        };

        for route in &config.routes {
            // Check for rate limit in route policies
            if let Some(ref rate_limit) = route.policies.rate_limit {
                let rl_config = RateLimitConfig {
                    max_rps: rate_limit.requests_per_second,
                    burst: rate_limit.burst,
                    key: rate_limit.key.clone(),
                    action: RateLimitAction::Reject,
                    status_code: 429,
                    message: None,
                    backend: sentinel_config::RateLimitBackend::Local,
                    max_delay_ms: 5000, // Default for policy-based rate limits
                };
                manager.register_route(&route.id, rl_config);
                info!(
                    route_id = %route.id,
                    max_rps = rate_limit.requests_per_second,
                    burst = rate_limit.burst,
                    key = ?rate_limit.key,
                    "Registered rate limiter for route"
                );
            }

            // Also check for rate limit filters in the filter chain
            for filter_id in &route.filters {
                if let Some(filter_config) = config.filters.get(filter_id) {
                    if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
                    {
                        let rl_config = RateLimitConfig {
                            max_rps: rl_filter.max_rps,
                            burst: rl_filter.burst,
                            key: rl_filter.key.clone(),
                            action: rl_filter.on_limit.clone(),
                            status_code: rl_filter.status_code,
                            message: rl_filter.limit_message.clone(),
                            backend: rl_filter.backend.clone(),
                            max_delay_ms: rl_filter.max_delay_ms,
                        };
                        manager.register_route(&route.id, rl_config);
                        info!(
                            route_id = %route.id,
                            filter_id = %filter_id,
                            max_rps = rl_filter.max_rps,
                            backend = ?rl_filter.backend,
                            "Registered rate limiter from filter for route"
                        );
                    }
                }
            }
        }

        if manager.route_count() > 0 {
            info!(
                route_count = manager.route_count(),
                "Rate limiting initialized"
            );
        }

        manager
    }
669
670    /// Initialize inference rate limiters from configuration
671    ///
672    /// This creates token-based rate limiters for routes with `service-type "inference"`
673    /// and inference config blocks.
674    fn initialize_inference_rate_limiters(config: &Config) -> InferenceRateLimitManager {
675        let manager = InferenceRateLimitManager::new();
676
677        for route in &config.routes {
678            // Only initialize for inference service type routes with inference config
679            if route.service_type == sentinel_config::ServiceType::Inference {
680                if let Some(ref inference_config) = route.inference {
681                    manager.register_route(&route.id, inference_config);
682                }
683            }
684        }
685
686        if manager.route_count() > 0 {
687            info!(
688                route_count = manager.route_count(),
689                "Inference rate limiting initialized"
690            );
691        }
692
693        manager
694    }
695
696    /// Initialize cache manager from configuration
697    fn initialize_cache_manager(config: &Config) -> CacheManager {
698        let manager = CacheManager::new();
699
700        let mut enabled_count = 0;
701
702        for route in &config.routes {
703            // API routes: caching disabled by default (responses often dynamic)
704            if route.service_type == sentinel_config::ServiceType::Api {
705                let cache_config = CacheConfig {
706                    enabled: false, // Disabled until explicitly configured via KDL
707                    default_ttl_secs: 60,
708                    ..Default::default()
709                };
710                manager.register_route(&route.id, cache_config);
711            }
712
713            // Static routes: enable caching by default (assets are typically cacheable)
714            if route.service_type == sentinel_config::ServiceType::Static {
715                let cache_config = CacheConfig {
716                    enabled: true, // Enable by default for static routes
717                    default_ttl_secs: 3600,
718                    max_size_bytes: 50 * 1024 * 1024, // 50MB for static
719                    stale_while_revalidate_secs: 60,
720                    stale_if_error_secs: 300,
721                    ..Default::default()
722                };
723                manager.register_route(&route.id, cache_config);
724                enabled_count += 1;
725                info!(
726                    route_id = %route.id,
727                    default_ttl_secs = 3600,
728                    "HTTP caching enabled for static route"
729                );
730            }
731
732            // Web routes: disable by default (HTML often personalized)
733            if route.service_type == sentinel_config::ServiceType::Web {
734                let cache_config = CacheConfig {
735                    enabled: false, // Disabled until explicitly configured
736                    default_ttl_secs: 300,
737                    ..Default::default()
738                };
739                manager.register_route(&route.id, cache_config);
740            }
741        }
742
743        if enabled_count > 0 {
744            info!(enabled_routes = enabled_count, "HTTP caching initialized");
745        } else {
746            debug!("HTTP cache manager initialized (no routes with caching enabled)");
747        }
748
749        manager
750    }
751
752    /// Initialize geo filters from configuration
753    fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
754        let manager = GeoFilterManager::new();
755
756        for (filter_id, filter_config) in &config.filters {
757            if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
758                match manager.register_filter(filter_id, geo_filter.clone()) {
759                    Ok(_) => {
760                        info!(
761                            filter_id = %filter_id,
762                            database_path = %geo_filter.database_path,
763                            action = ?geo_filter.action,
764                            countries_count = geo_filter.countries.len(),
765                            "Registered geo filter"
766                        );
767                    }
768                    Err(e) => {
769                        error!(
770                            filter_id = %filter_id,
771                            error = %e,
772                            "Failed to register geo filter"
773                        );
774                    }
775                }
776            }
777        }
778
779        let filter_ids = manager.filter_ids();
780        if !filter_ids.is_empty() {
781            info!(
782                filter_count = filter_ids.len(),
783                filter_ids = ?filter_ids,
784                "GeoIP filtering initialized"
785            );
786        }
787
788        manager
789    }
790
791    /// Apply security headers to response
792    pub(super) fn apply_security_headers(
793        &self,
794        header: &mut ResponseHeader,
795    ) -> Result<(), Box<Error>> {
796        header.insert_header("X-Content-Type-Options", "nosniff")?;
797        header.insert_header("X-Frame-Options", "DENY")?;
798        header.insert_header("X-XSS-Protection", "1; mode=block")?;
799        header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
800        header.remove_header("Server");
801        header.remove_header("X-Powered-By");
802        Ok(())
803    }
804
805    /// Spawn background task to periodically clean up idle rate limiters and expired geo caches
806    fn spawn_cleanup_task(
807        rate_limit_manager: Arc<RateLimitManager>,
808        geo_filter_manager: Arc<GeoFilterManager>,
809    ) {
810        // Cleanup interval: 5 minutes
811        const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
812
813        tokio::spawn(async move {
814            let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
815            // First tick completes immediately; skip it
816            interval.tick().await;
817
818            loop {
819                interval.tick().await;
820
821                // Clean up rate limiters (removes entries when pool exceeds max size)
822                rate_limit_manager.cleanup();
823
824                // Clean up expired geo filter caches
825                geo_filter_manager.clear_expired_caches();
826
827                debug!("Periodic cleanup completed");
828            }
829        });
830
831        info!(
832            interval_secs = CLEANUP_INTERVAL.as_secs(),
833            "Started periodic cleanup task"
834        );
835    }
836
837    /// Spawn background task to watch geo database files for changes
838    fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
839        let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
840
841        // Try to start watching
842        match watcher.start_watching() {
843            Ok(mut rx) => {
844                let watcher_clone = watcher.clone();
845                tokio::spawn(async move {
846                    // Debounce interval
847                    const DEBOUNCE_MS: u64 = 500;
848
849                    while let Some(path) = rx.recv().await {
850                        // Debounce rapid changes (e.g., temp file then rename)
851                        tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
852
853                        // Drain any additional events for the same path during debounce
854                        while rx.try_recv().is_ok() {}
855
856                        // Handle the change
857                        watcher_clone.handle_change(&path);
858                    }
859                });
860
861                info!("Started geo database file watcher");
862            }
863            Err(e) => {
864                warn!(
865                    error = %e,
866                    "Failed to start geo database file watcher, auto-reload disabled"
867                );
868            }
869        }
870    }
871}