Skip to main content

sentinel_proxy/proxy/
mod.rs

1//! Sentinel Proxy Core Implementation
2//!
3//! This module contains the main SentinelProxy struct and its implementation,
4//! split across several submodules for maintainability:
5//!
6//! - `context`: Request context maintained throughout the request lifecycle
7//! - `handlers`: Helper methods for handling different route types
8//! - `http_trait`: ProxyHttp trait implementation for Pingora
9
10mod context;
11mod fallback;
12mod fallback_metrics;
13mod handlers;
14mod http_trait;
15mod model_routing;
16mod model_routing_metrics;
17
18pub use context::{FallbackReason, RequestContext};
19pub use fallback::{FallbackDecision, FallbackEvaluator};
20pub use fallback_metrics::{get_fallback_metrics, init_fallback_metrics, FallbackMetrics};
21pub use model_routing::{extract_model_from_headers, find_upstream_for_model, ModelRoutingResult};
22pub use model_routing_metrics::{
23    get_model_routing_metrics, init_model_routing_metrics, ModelRoutingMetrics,
24};
25
26use anyhow::{Context, Result};
27use parking_lot::RwLock;
28use pingora::http::ResponseHeader;
29use pingora::prelude::*;
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tracing::{debug, error, info, warn};
34use uuid::Uuid;
35
36use sentinel_common::ids::{QualifiedId, Scope};
37use sentinel_common::{Registry, ScopedMetrics, ScopedRegistry};
38
39use crate::agents::AgentManager;
40use crate::app::AppState;
41use crate::builtin_handlers::BuiltinHandlerState;
42use crate::cache::{CacheConfig, CacheManager};
43use crate::errors::ErrorHandler;
44use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
45use crate::health::PassiveHealthChecker;
46use crate::http_helpers;
47use crate::inference::InferenceRateLimitManager;
48use crate::logging::{LogManager, SharedLogManager};
49use crate::rate_limit::{RateLimitConfig, RateLimitManager};
50use crate::reload::{
51    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
52};
53use crate::routing::RouteMatcher;
54use crate::scoped_routing::ScopedRouteMatcher;
55use crate::static_files::StaticFileServer;
56use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
57use crate::validation::SchemaValidator;
58
59use sentinel_common::TraceIdFormat;
60use sentinel_config::{Config, FlattenedConfig};
61
/// Main proxy service implementing Pingora's ProxyHttp trait
///
/// Constructed once at startup by [`SentinelProxy::new`]. Most fields are
/// `Arc`-shared with detached background tasks (config reload handler,
/// active health-check runner, periodic cleanup tasks), so the struct itself
/// is cheap to reference from the request path.
pub struct SentinelProxy {
    /// Configuration manager with hot reload
    pub config_manager: Arc<ConfigManager>,
    /// Route matcher (global routes only, for backward compatibility)
    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
    /// Scoped route matcher (namespace/service aware)
    ///
    /// Uses `tokio::sync::RwLock` (not `parking_lot`) because the reload
    /// handler holds the write guard across `.await` points.
    pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
    /// Upstream pools (keyed by upstream ID, global only)
    pub(super) upstream_pools: Registry<UpstreamPool>,
    /// Scoped upstream pools (namespace/service aware)
    pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
    /// Agent manager for external processing
    pub(super) agent_manager: Arc<AgentManager>,
    /// Passive health checker
    pub(super) passive_health: Arc<PassiveHealthChecker>,
    /// Metrics collector
    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
    /// Scoped metrics collector (with namespace/service labels)
    pub(super) scoped_metrics: Arc<ScopedMetrics>,
    /// Application state
    pub(super) app_state: Arc<AppState>,
    /// Graceful reload coordinator
    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
    /// Error handlers per route (keyed by route ID)
    pub(super) error_handlers: Registry<ErrorHandler>,
    /// API schema validators per route (keyed by route ID)
    pub(super) validators: Registry<SchemaValidator>,
    /// Static file servers per route (keyed by route ID)
    pub(super) static_servers: Registry<StaticFileServer>,
    /// Builtin handler state
    pub(super) builtin_state: Arc<BuiltinHandlerState>,
    /// Log manager for file-based logging
    pub(super) log_manager: SharedLogManager,
    /// Trace ID format for request tracing
    pub(super) trace_id_format: TraceIdFormat,
    /// Active health check runner
    pub(super) health_check_runner: Arc<HealthCheckRunner>,
    /// Rate limit manager
    pub(super) rate_limit_manager: Arc<RateLimitManager>,
    /// HTTP cache manager
    pub(super) cache_manager: Arc<CacheManager>,
    /// GeoIP filter manager
    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
    /// Inference rate limit manager (token-based rate limiting for LLM/AI routes)
    pub(super) inference_rate_limit_manager: Arc<InferenceRateLimitManager>,
    /// Warmth tracker for cold model detection on inference routes
    pub(super) warmth_tracker: Arc<crate::health::WarmthTracker>,
    /// Guardrail processor for semantic inspection (prompt injection, PII detection)
    pub(super) guardrail_processor: Arc<crate::inference::GuardrailProcessor>,
    /// ACME challenge manager for HTTP-01 challenge handling
    /// Present only when ACME is configured for at least one listener
    pub acme_challenges: Option<Arc<crate::acme::ChallengeManager>>,
    /// ACME client for certificate management
    /// Present only when ACME is configured
    pub acme_client: Option<Arc<crate::acme::AcmeClient>>,
}
119
120impl SentinelProxy {
    /// Create new proxy instance
    ///
    /// If config_path is None, uses the embedded default configuration.
    /// Note: Tracing must be initialized by the caller before calling this function.
    ///
    /// Initialization order matters: global cache storage is configured
    /// before anything touches the cache, and the reload handler is wired up
    /// before route components are built so later config changes are observed.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration cannot be loaded or fails
    /// validation, or if any mandatory subsystem (route matchers, upstream
    /// pools, agent manager, metrics collectors) fails to initialize. Log
    /// manager and metrics-registration failures are non-fatal and only
    /// logged.
    pub async fn new(config_path: Option<&str>) -> Result<Self> {
        info!("Starting Sentinel Proxy");

        // Load initial configuration
        let (config, effective_config_path) = match config_path {
            Some(path) => {
                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
                (cfg, path.to_string())
            }
            None => {
                let cfg = Config::default_embedded()
                    .context("Failed to load embedded default configuration")?;
                // Use a sentinel path to indicate embedded config
                (cfg, "_embedded_".to_string())
            }
        };

        config
            .validate()
            .context("Initial configuration validation failed")?;

        // Configure global cache storage (must be done before cache is accessed)
        if let Some(ref cache_config) = config.cache {
            info!(
                max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
                backend = ?cache_config.backend,
                "Configuring HTTP cache storage"
            );
            crate::cache::configure_cache(cache_config.clone());
        }

        // Create configuration manager
        let config_manager =
            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);

        // Add validators
        config_manager.add_validator(Box::new(RouteValidator)).await;
        config_manager
            .add_validator(Box::new(UpstreamValidator))
            .await;

        // Create route matcher (global routes only)
        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));

        // Flatten config for namespace/service resources
        let flattened = config.flatten();

        // Create scoped route matcher
        let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
            ScopedRouteMatcher::from_flattened(&flattened)
                .await
                .context("Failed to create scoped route matcher")?,
        ));

        // Create upstream pools and active health checkers (global only)
        let mut pools = HashMap::new();
        let mut health_check_runner = HealthCheckRunner::new();

        for (upstream_id, upstream_config) in &config.upstreams {
            // Pools need their own id stamped in; the map key isn't visible
            // to UpstreamPool itself.
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = upstream_id.clone();
            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
            pools.insert(upstream_id.clone(), pool);

            // Create active health checker if health check is configured
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }
        }
        let upstream_pools = Registry::from_map(pools);

        // Create scoped upstream pools from flattened config
        let scoped_upstream_pools =
            Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;

        let health_check_runner = Arc::new(health_check_runner);

        // Create passive health checker
        let passive_health = Arc::new(PassiveHealthChecker::new(
            0.5,  // 50% failure rate threshold
            100,  // Window size
            None, // Will be linked to active health checkers
        ));

        // Create agent manager (per-agent queue isolation)
        let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
        agent_manager.initialize().await?;

        // Create metrics collectors
        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
        let scoped_metrics =
            Arc::new(ScopedMetrics::new().context("Failed to create scoped metrics collector")?);

        // Create application state; instance id is a fresh UUID per process
        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));

        // Create reload coordinator
        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
            Duration::from_secs(30), // Max drain time
        ));

        // Setup configuration reload subscription (spawns a detached task)
        Self::setup_reload_handler(
            config_manager.clone(),
            route_matcher.clone(),
            upstream_pools.clone(),
            scoped_route_matcher.clone(),
            scoped_upstream_pools.clone(),
        )
        .await;

        // Initialize service type components
        let (error_handlers, validators, static_servers) =
            Self::initialize_route_components(&config).await?;

        // Create builtin handler state
        let builtin_state = Arc::new(BuiltinHandlerState::new(
            env!("CARGO_PKG_VERSION").to_string(),
            app_state.instance_id.clone(),
        ));

        // Create log manager for file-based logging
        let log_manager = match LogManager::new(&config.observability.logging) {
            Ok(manager) => {
                if manager.access_log_enabled() {
                    info!("Access logging enabled");
                }
                if manager.error_log_enabled() {
                    info!("Error logging enabled");
                }
                if manager.audit_log_enabled() {
                    info!("Audit logging enabled");
                }
                Arc::new(manager)
            }
            Err(e) => {
                // Non-fatal: fall back to a disabled log manager so the
                // proxy still starts without file logging.
                warn!(
                    "Failed to initialize log manager, file logging disabled: {}",
                    e
                );
                Arc::new(LogManager::disabled())
            }
        };

        // Register audit reload hook to log configuration changes
        {
            use crate::reload::AuditReloadHook;
            let audit_hook = AuditReloadHook::new(log_manager.clone());
            config_manager.add_hook(Box::new(audit_hook)).await;
            debug!("Registered audit reload hook");
        }

        // Start active health check runner in background
        // (detached task; runs for the life of the process)
        if health_check_runner.checker_count() > 0 {
            let runner = health_check_runner.clone();
            tokio::spawn(async move {
                runner.run().await;
            });
            info!(
                "Started active health checking for {} upstreams",
                health_check_runner.checker_count()
            );
        }

        // Initialize rate limit manager
        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));

        // Initialize inference rate limit manager (for token-based LLM rate limiting)
        let inference_rate_limit_manager =
            Arc::new(Self::initialize_inference_rate_limiters(&config));

        // Initialize warmth tracker for cold model detection
        let warmth_tracker = Arc::new(crate::health::WarmthTracker::with_defaults());

        // Initialize guardrail processor for semantic inspection
        let guardrail_processor = Arc::new(crate::inference::GuardrailProcessor::new(
            agent_manager.clone(),
        ));

        // Initialize geo filter manager
        let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));

        // Start periodic cleanup task for rate limiters and geo caches
        Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());

        // Start geo database file watcher for hot reload
        Self::spawn_geo_database_watcher(geo_filter_manager.clone());

        // Mark as ready
        app_state.set_ready(true);

        // Get trace ID format from config
        let trace_id_format = config.server.trace_id_format;

        // Initialize cache manager
        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));

        // Initialize fallback metrics (best-effort, log warning if fails)
        if let Err(e) = init_fallback_metrics() {
            warn!("Failed to initialize fallback metrics: {}", e);
        }

        // Initialize model routing metrics (best-effort, log warning if fails)
        if let Err(e) = init_model_routing_metrics() {
            warn!("Failed to initialize model routing metrics: {}", e);
        }

        Ok(Self {
            config_manager,
            route_matcher,
            scoped_route_matcher,
            upstream_pools,
            scoped_upstream_pools,
            agent_manager,
            passive_health,
            metrics,
            scoped_metrics,
            app_state,
            reload_coordinator,
            error_handlers,
            validators,
            static_servers,
            builtin_state,
            log_manager,
            trace_id_format,
            health_check_runner,
            rate_limit_manager,
            cache_manager,
            geo_filter_manager,
            inference_rate_limit_manager,
            warmth_tracker,
            guardrail_processor,
            // ACME challenge manager - initialized later if ACME is configured
            acme_challenges: None,
            acme_client: None,
        })
    }
362
363    /// Setup the configuration reload handler
364    async fn setup_reload_handler(
365        config_manager: Arc<ConfigManager>,
366        route_matcher: Arc<RwLock<RouteMatcher>>,
367        upstream_pools: Registry<UpstreamPool>,
368        scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
369        scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
370    ) {
371        let mut reload_rx = config_manager.subscribe();
372        let config_manager_clone = config_manager.clone();
373
374        tokio::spawn(async move {
375            while let Ok(event) = reload_rx.recv().await {
376                if let ReloadEvent::Applied { .. } = event {
377                    // Reload routes and upstreams
378                    let new_config = config_manager_clone.current();
379                    let flattened = new_config.flatten();
380
381                    // Update route matcher (sync parking_lot::RwLock)
382                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
383                        *route_matcher.write() = new_matcher;
384                        info!("Global routes reloaded successfully");
385                    }
386
387                    // Update scoped route matcher
388                    if let Err(e) = scoped_route_matcher
389                        .write()
390                        .await
391                        .load_from_flattened(&flattened)
392                        .await
393                    {
394                        error!("Failed to reload scoped routes: {}", e);
395                    } else {
396                        info!(
397                            "Scoped routes reloaded ({} scopes)",
398                            scoped_route_matcher.read().await.scope_count().await
399                        );
400                    }
401
402                    // Update global upstream pools
403                    let mut new_pools = HashMap::new();
404                    for (upstream_id, upstream_config) in &new_config.upstreams {
405                        let mut config_with_id = upstream_config.clone();
406                        config_with_id.id = upstream_id.clone();
407                        match UpstreamPool::new(config_with_id).await {
408                            Ok(pool) => {
409                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
410                            }
411                            Err(e) => {
412                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
413                            }
414                        }
415                    }
416
417                    // Gracefully swap global pools
418                    let old_pools = upstream_pools.replace(new_pools).await;
419
420                    // Update scoped upstream pools
421                    let new_scoped_pools = Self::build_scoped_pools_list(&flattened).await;
422                    let old_scoped_pools =
423                        scoped_upstream_pools.replace_all(new_scoped_pools).await;
424
425                    info!(
426                        "Scoped upstream pools reloaded ({} pools)",
427                        scoped_upstream_pools.len().await
428                    );
429
430                    // Shutdown old pools after delay
431                    tokio::spawn(async move {
432                        tokio::time::sleep(Duration::from_secs(60)).await;
433
434                        // Shutdown old global pools
435                        for (name, pool) in old_pools {
436                            info!("Shutting down old global pool: {}", name);
437                            pool.shutdown().await;
438                        }
439
440                        // Shutdown old scoped pools
441                        for (name, pool) in old_scoped_pools {
442                            info!("Shutting down old scoped pool: {}", name);
443                            pool.shutdown().await;
444                        }
445                    });
446                }
447            }
448        });
449    }
450
    /// Create scoped upstream pools from flattened config
    ///
    /// Builds one [`UpstreamPool`] per namespaced upstream, registers it as
    /// exported or private in the returned registry, and adds an active
    /// health checker for any pool that configures one.
    ///
    /// # Errors
    ///
    /// Fails if any pool cannot be constructed — startup is all-or-nothing,
    /// unlike the reload path which skips pools that fail to build.
    async fn create_scoped_upstream_pools(
        flattened: &FlattenedConfig,
        health_check_runner: &mut HealthCheckRunner,
    ) -> Result<ScopedRegistry<UpstreamPool>> {
        let registry = ScopedRegistry::new();

        for (qid, upstream_config) in &flattened.upstreams {
            // Stamp the fully-qualified id into the pool's own config
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = qid.canonical();

            let pool = Arc::new(
                UpstreamPool::new(config_with_id.clone())
                    .await
                    .with_context(|| {
                        format!("Failed to create upstream pool '{}'", qid.canonical())
                    })?,
            );

            // Track exports
            // NOTE(review): this lookup uses the original (unqualified)
            // `upstream_config.id`, while the pool itself is keyed by
            // `qid.canonical()` — confirm `exported_upstreams` is keyed by
            // the unqualified id.
            let is_exported = flattened
                .exported_upstreams
                .contains_key(&upstream_config.id);

            if is_exported {
                registry.insert_exported(qid.clone(), pool).await;
            } else {
                registry.insert(qid.clone(), pool).await;
            }

            // Create active health checker if configured
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }

            debug!(
                upstream_id = %qid.canonical(),
                scope = ?qid.scope,
                exported = is_exported,
                "Created scoped upstream pool"
            );
        }

        info!("Created {} scoped upstream pools", registry.len().await);

        Ok(registry)
    }
498
499    /// Build list of scoped pools for atomic replacement
500    async fn build_scoped_pools_list(
501        flattened: &FlattenedConfig,
502    ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
503        let mut result = Vec::new();
504
505        for (qid, upstream_config) in &flattened.upstreams {
506            let mut config_with_id = upstream_config.clone();
507            config_with_id.id = qid.canonical();
508
509            match UpstreamPool::new(config_with_id).await {
510                Ok(pool) => {
511                    let is_exported = flattened
512                        .exported_upstreams
513                        .contains_key(&upstream_config.id);
514                    result.push((qid.clone(), Arc::new(pool), is_exported));
515                }
516                Err(e) => {
517                    error!(
518                        "Failed to create scoped upstream pool {}: {}",
519                        qid.canonical(),
520                        e
521                    );
522                }
523            }
524        }
525
526        result
527    }
528
529    /// Initialize route-specific components (error handlers, validators, static servers)
530    async fn initialize_route_components(
531        config: &Config,
532    ) -> Result<(
533        Registry<ErrorHandler>,
534        Registry<SchemaValidator>,
535        Registry<StaticFileServer>,
536    )> {
537        let mut error_handlers_map = HashMap::new();
538        let mut validators_map = HashMap::new();
539        let mut static_servers_map = HashMap::new();
540
541        for route in &config.routes {
542            info!(
543                "Initializing components for route: {} with service type: {:?}",
544                route.id, route.service_type
545            );
546
547            // Initialize error handler for each route
548            if let Some(ref error_config) = route.error_pages {
549                let handler =
550                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
551                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
552                debug!("Initialized error handler for route: {}", route.id);
553            } else {
554                // Use default error handler for the service type
555                let handler = ErrorHandler::new(route.service_type.clone(), None);
556                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
557            }
558
559            // Initialize schema validator for API routes
560            if route.service_type == sentinel_config::ServiceType::Api {
561                if let Some(ref api_schema) = route.api_schema {
562                    match SchemaValidator::new(api_schema.clone()) {
563                        Ok(validator) => {
564                            validators_map.insert(route.id.clone(), Arc::new(validator));
565                            info!("Initialized schema validator for route: {}", route.id);
566                        }
567                        Err(e) => {
568                            warn!(
569                                "Failed to initialize schema validator for route {}: {}",
570                                route.id, e
571                            );
572                        }
573                    }
574                }
575            }
576
577            // Initialize static file server for static routes
578            if route.service_type == sentinel_config::ServiceType::Static {
579                if let Some(ref static_config) = route.static_files {
580                    let server = StaticFileServer::new(static_config.clone());
581                    static_servers_map.insert(route.id.clone(), Arc::new(server));
582                    info!("Initialized static file server for route: {}", route.id);
583                } else {
584                    warn!(
585                        "Static route {} has no static_files configuration",
586                        route.id
587                    );
588                }
589            }
590        }
591
592        Ok((
593            Registry::from_map(error_handlers_map),
594            Registry::from_map(validators_map),
595            Registry::from_map(static_servers_map),
596        ))
597    }
598
599    /// Get or generate trace ID from session
600    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
601        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
602    }
603
    /// Initialize rate limiters from configuration
    ///
    /// Builds a [`RateLimitManager`] seeded with the optional global limit,
    /// then registers a per-route limiter for each route that declares one —
    /// either via the route's `policies.rate_limit` block or via a
    /// `RateLimit` filter in its filter chain.
    ///
    /// NOTE(review): a rate-limit filter registers under the same route id
    /// as a policy-based limiter, so for a route with both, the later
    /// `register_route` call may overwrite the earlier one — confirm
    /// `register_route` semantics.
    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
        use sentinel_config::RateLimitAction;

        // Create manager with global rate limit if configured
        let manager = if let Some(ref global) = config.rate_limits.global {
            info!(
                max_rps = global.max_rps,
                burst = global.burst,
                key = ?global.key,
                "Initializing global rate limiter"
            );
            RateLimitManager::with_global_limit(global.max_rps, global.burst)
        } else {
            RateLimitManager::new()
        };

        for route in &config.routes {
            // Check for rate limit in route policies. Policy-based limits
            // always reject with 429 and use the local backend.
            if let Some(ref rate_limit) = route.policies.rate_limit {
                let rl_config = RateLimitConfig {
                    max_rps: rate_limit.requests_per_second,
                    burst: rate_limit.burst,
                    key: rate_limit.key.clone(),
                    action: RateLimitAction::Reject,
                    status_code: 429,
                    message: None,
                    backend: sentinel_config::RateLimitBackend::Local,
                    max_delay_ms: 5000, // Default for policy-based rate limits
                };
                manager.register_route(&route.id, rl_config);
                info!(
                    route_id = %route.id,
                    max_rps = rate_limit.requests_per_second,
                    burst = rate_limit.burst,
                    key = ?rate_limit.key,
                    "Registered rate limiter for route"
                );
            }

            // Also check for rate limit filters in the filter chain; filters
            // carry their own action, status, message, backend and delay.
            for filter_id in &route.filters {
                if let Some(filter_config) = config.filters.get(filter_id) {
                    if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
                    {
                        let rl_config = RateLimitConfig {
                            max_rps: rl_filter.max_rps,
                            burst: rl_filter.burst,
                            key: rl_filter.key.clone(),
                            action: rl_filter.on_limit.clone(),
                            status_code: rl_filter.status_code,
                            message: rl_filter.limit_message.clone(),
                            backend: rl_filter.backend.clone(),
                            max_delay_ms: rl_filter.max_delay_ms,
                        };
                        manager.register_route(&route.id, rl_config);
                        info!(
                            route_id = %route.id,
                            filter_id = %filter_id,
                            max_rps = rl_filter.max_rps,
                            backend = ?rl_filter.backend,
                            "Registered rate limiter from filter for route"
                        );
                    }
                }
            }
        }

        if manager.route_count() > 0 {
            info!(
                route_count = manager.route_count(),
                "Rate limiting initialized"
            );
        }

        manager
    }
681
682    /// Initialize inference rate limiters from configuration
683    ///
684    /// This creates token-based rate limiters for routes with `service-type "inference"`
685    /// and inference config blocks.
686    fn initialize_inference_rate_limiters(config: &Config) -> InferenceRateLimitManager {
687        let manager = InferenceRateLimitManager::new();
688
689        for route in &config.routes {
690            // Only initialize for inference service type routes with inference config
691            if route.service_type == sentinel_config::ServiceType::Inference {
692                if let Some(ref inference_config) = route.inference {
693                    manager.register_route(&route.id, inference_config);
694                }
695            }
696        }
697
698        if manager.route_count() > 0 {
699            info!(
700                route_count = manager.route_count(),
701                "Inference rate limiting initialized"
702            );
703        }
704
705        manager
706    }
707
708    /// Initialize cache manager from configuration
709    fn initialize_cache_manager(config: &Config) -> CacheManager {
710        let manager = CacheManager::new();
711
712        let mut enabled_count = 0;
713
714        for route in &config.routes {
715            // API routes: caching disabled by default (responses often dynamic)
716            if route.service_type == sentinel_config::ServiceType::Api {
717                let cache_config = CacheConfig {
718                    enabled: false, // Disabled until explicitly configured via KDL
719                    default_ttl_secs: 60,
720                    ..Default::default()
721                };
722                manager.register_route(&route.id, cache_config);
723            }
724
725            // Static routes: enable caching by default (assets are typically cacheable)
726            if route.service_type == sentinel_config::ServiceType::Static {
727                let cache_config = CacheConfig {
728                    enabled: true, // Enable by default for static routes
729                    default_ttl_secs: 3600,
730                    max_size_bytes: 50 * 1024 * 1024, // 50MB for static
731                    stale_while_revalidate_secs: 60,
732                    stale_if_error_secs: 300,
733                    ..Default::default()
734                };
735                manager.register_route(&route.id, cache_config);
736                enabled_count += 1;
737                info!(
738                    route_id = %route.id,
739                    default_ttl_secs = 3600,
740                    "HTTP caching enabled for static route"
741                );
742            }
743
744            // Web routes: disable by default (HTML often personalized)
745            if route.service_type == sentinel_config::ServiceType::Web {
746                let cache_config = CacheConfig {
747                    enabled: false, // Disabled until explicitly configured
748                    default_ttl_secs: 300,
749                    ..Default::default()
750                };
751                manager.register_route(&route.id, cache_config);
752            }
753        }
754
755        if enabled_count > 0 {
756            info!(enabled_routes = enabled_count, "HTTP caching initialized");
757        } else {
758            debug!("HTTP cache manager initialized (no routes with caching enabled)");
759        }
760
761        manager
762    }
763
764    /// Initialize geo filters from configuration
765    fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
766        let manager = GeoFilterManager::new();
767
768        for (filter_id, filter_config) in &config.filters {
769            if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
770                match manager.register_filter(filter_id, geo_filter.clone()) {
771                    Ok(_) => {
772                        info!(
773                            filter_id = %filter_id,
774                            database_path = %geo_filter.database_path,
775                            action = ?geo_filter.action,
776                            countries_count = geo_filter.countries.len(),
777                            "Registered geo filter"
778                        );
779                    }
780                    Err(e) => {
781                        error!(
782                            filter_id = %filter_id,
783                            error = %e,
784                            "Failed to register geo filter"
785                        );
786                    }
787                }
788            }
789        }
790
791        let filter_ids = manager.filter_ids();
792        if !filter_ids.is_empty() {
793            info!(
794                filter_count = filter_ids.len(),
795                filter_ids = ?filter_ids,
796                "GeoIP filtering initialized"
797            );
798        }
799
800        manager
801    }
802
803    /// Apply security headers to response
804    pub(super) fn apply_security_headers(
805        &self,
806        header: &mut ResponseHeader,
807    ) -> Result<(), Box<Error>> {
808        header.insert_header("X-Content-Type-Options", "nosniff")?;
809        header.insert_header("X-Frame-Options", "DENY")?;
810        header.insert_header("X-XSS-Protection", "1; mode=block")?;
811        header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
812        header.remove_header("Server");
813        header.remove_header("X-Powered-By");
814        Ok(())
815    }
816
817    /// Spawn background task to periodically clean up idle rate limiters and expired geo caches
818    fn spawn_cleanup_task(
819        rate_limit_manager: Arc<RateLimitManager>,
820        geo_filter_manager: Arc<GeoFilterManager>,
821    ) {
822        // Cleanup interval: 5 minutes
823        const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
824
825        tokio::spawn(async move {
826            let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
827            // First tick completes immediately; skip it
828            interval.tick().await;
829
830            loop {
831                interval.tick().await;
832
833                // Clean up rate limiters (removes entries when pool exceeds max size)
834                rate_limit_manager.cleanup();
835
836                // Clean up expired geo filter caches
837                geo_filter_manager.clear_expired_caches();
838
839                debug!("Periodic cleanup completed");
840            }
841        });
842
843        info!(
844            interval_secs = CLEANUP_INTERVAL.as_secs(),
845            "Started periodic cleanup task"
846        );
847    }
848
849    /// Spawn background task to watch geo database files for changes
850    fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
851        let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
852
853        // Try to start watching
854        match watcher.start_watching() {
855            Ok(mut rx) => {
856                let watcher_clone = watcher.clone();
857                tokio::spawn(async move {
858                    // Debounce interval
859                    const DEBOUNCE_MS: u64 = 500;
860
861                    while let Some(path) = rx.recv().await {
862                        // Debounce rapid changes (e.g., temp file then rename)
863                        tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
864
865                        // Drain any additional events for the same path during debounce
866                        while rx.try_recv().is_ok() {}
867
868                        // Handle the change
869                        watcher_clone.handle_change(&path);
870                    }
871                });
872
873                info!("Started geo database file watcher");
874            }
875            Err(e) => {
876                warn!(
877                    error = %e,
878                    "Failed to start geo database file watcher, auto-reload disabled"
879                );
880            }
881        }
882    }
883}