sentinel_proxy/proxy/
mod.rs

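//! Sentinel Proxy Core Implementation
//!
//! This module contains the main `SentinelProxy` struct and its implementation,
//! split across several submodules for maintainability:
//!
//! - `context`: Request context maintained throughout the request lifecycle
//! - `handlers`: Helper methods for handling different route types
//! - `http_trait`: `ProxyHttp` trait implementation for Pingora
//! - `fallback`: Fallback decision logic (`FallbackDecision`, `FallbackEvaluator`)
//! - `fallback_metrics`: Metrics for fallback decisions (`FallbackMetrics`)
//! - `model_routing`: Model-based upstream selection (`extract_model_from_headers`,
//!   `find_upstream_for_model`)
//! - `model_routing_metrics`: Metrics for model-based routing (`ModelRoutingMetrics`)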
9
10mod context;
11mod fallback;
12mod fallback_metrics;
13mod handlers;
14mod http_trait;
15mod model_routing;
16mod model_routing_metrics;
17
18pub use context::{FallbackReason, RequestContext};
19pub use fallback::{FallbackDecision, FallbackEvaluator};
20pub use fallback_metrics::{get_fallback_metrics, init_fallback_metrics, FallbackMetrics};
21pub use model_routing::{extract_model_from_headers, find_upstream_for_model, ModelRoutingResult};
22pub use model_routing_metrics::{
23    get_model_routing_metrics, init_model_routing_metrics, ModelRoutingMetrics,
24};
25
26use anyhow::{Context, Result};
27use parking_lot::RwLock;
28use pingora::http::ResponseHeader;
29use pingora::prelude::*;
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tracing::{debug, error, info, warn};
34use uuid::Uuid;
35
36use sentinel_common::ids::{QualifiedId, Scope};
37use sentinel_common::{Registry, ScopedMetrics, ScopedRegistry};
38
39use crate::agents::AgentManager;
40use crate::app::AppState;
41use crate::builtin_handlers::BuiltinHandlerState;
42use crate::cache::{CacheConfig, CacheManager};
43use crate::errors::ErrorHandler;
44use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
45use crate::health::PassiveHealthChecker;
46use crate::http_helpers;
47use crate::inference::InferenceRateLimitManager;
48use crate::logging::{LogManager, SharedLogManager};
49use crate::rate_limit::{RateLimitConfig, RateLimitManager};
50use crate::reload::{
51    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
52};
53use crate::routing::RouteMatcher;
54use crate::scoped_routing::ScopedRouteMatcher;
55use crate::static_files::StaticFileServer;
56use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
57use crate::validation::SchemaValidator;
58
59use sentinel_common::TraceIdFormat;
60use sentinel_config::{Config, FlattenedConfig};
61
62/// Main proxy service implementing Pingora's ProxyHttp trait
63pub struct SentinelProxy {
64    /// Configuration manager with hot reload
65    pub config_manager: Arc<ConfigManager>,
66    /// Route matcher (global routes only, for backward compatibility)
67    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
68    /// Scoped route matcher (namespace/service aware)
69    pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
70    /// Upstream pools (keyed by upstream ID, global only)
71    pub(super) upstream_pools: Registry<UpstreamPool>,
72    /// Scoped upstream pools (namespace/service aware)
73    pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
74    /// Agent manager for external processing
75    pub(super) agent_manager: Arc<AgentManager>,
76    /// Passive health checker
77    pub(super) passive_health: Arc<PassiveHealthChecker>,
78    /// Metrics collector
79    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
80    /// Scoped metrics collector (with namespace/service labels)
81    pub(super) scoped_metrics: Arc<ScopedMetrics>,
82    /// Application state
83    pub(super) app_state: Arc<AppState>,
84    /// Graceful reload coordinator
85    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
86    /// Error handlers per route (keyed by route ID)
87    pub(super) error_handlers: Registry<ErrorHandler>,
88    /// API schema validators per route (keyed by route ID)
89    pub(super) validators: Registry<SchemaValidator>,
90    /// Static file servers per route (keyed by route ID)
91    pub(super) static_servers: Registry<StaticFileServer>,
92    /// Builtin handler state
93    pub(super) builtin_state: Arc<BuiltinHandlerState>,
94    /// Log manager for file-based logging
95    pub(super) log_manager: SharedLogManager,
96    /// Trace ID format for request tracing
97    pub(super) trace_id_format: TraceIdFormat,
98    /// Active health check runner
99    pub(super) health_check_runner: Arc<HealthCheckRunner>,
100    /// Rate limit manager
101    pub(super) rate_limit_manager: Arc<RateLimitManager>,
102    /// HTTP cache manager
103    pub(super) cache_manager: Arc<CacheManager>,
104    /// GeoIP filter manager
105    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
106    /// Inference rate limit manager (token-based rate limiting for LLM/AI routes)
107    pub(super) inference_rate_limit_manager: Arc<InferenceRateLimitManager>,
108    /// Warmth tracker for cold model detection on inference routes
109    pub(super) warmth_tracker: Arc<crate::health::WarmthTracker>,
110    /// Guardrail processor for semantic inspection (prompt injection, PII detection)
111    pub(super) guardrail_processor: Arc<crate::inference::GuardrailProcessor>,
112    /// ACME challenge manager for HTTP-01 challenge handling
113    /// Present only when ACME is configured for at least one listener
114    pub acme_challenges: Option<Arc<crate::acme::ChallengeManager>>,
115    /// ACME client for certificate management
116    /// Present only when ACME is configured
117    pub acme_client: Option<Arc<crate::acme::AcmeClient>>,
118}
119
120impl SentinelProxy {
121    /// Create new proxy instance
122    ///
123    /// If config_path is None, uses the embedded default configuration.
124    /// Note: Tracing must be initialized by the caller before calling this function.
125    pub async fn new(config_path: Option<&str>) -> Result<Self> {
126        info!("Starting Sentinel Proxy");
127
128        // Load initial configuration
129        let (config, effective_config_path) = match config_path {
130            Some(path) => {
131                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
132                (cfg, path.to_string())
133            }
134            None => {
135                let cfg = Config::default_embedded()
136                    .context("Failed to load embedded default configuration")?;
137                // Use a sentinel path to indicate embedded config
138                (cfg, "_embedded_".to_string())
139            }
140        };
141
142        config
143            .validate()
144            .context("Initial configuration validation failed")?;
145
146        // Configure global cache storage (must be done before cache is accessed)
147        if let Some(ref cache_config) = config.cache {
148            info!(
149                max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
150                backend = ?cache_config.backend,
151                "Configuring HTTP cache storage"
152            );
153            crate::cache::configure_cache(cache_config.clone());
154        }
155
156        // Create configuration manager
157        let config_manager =
158            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
159
160        // Add validators
161        config_manager.add_validator(Box::new(RouteValidator)).await;
162        config_manager
163            .add_validator(Box::new(UpstreamValidator))
164            .await;
165
166        // Create route matcher (global routes only)
167        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
168
169        // Flatten config for namespace/service resources
170        let flattened = config.flatten();
171
172        // Create scoped route matcher
173        let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
174            ScopedRouteMatcher::from_flattened(&flattened)
175                .await
176                .context("Failed to create scoped route matcher")?,
177        ));
178
179        // Create upstream pools and active health checkers (global only)
180        let mut pools = HashMap::new();
181        let mut health_check_runner = HealthCheckRunner::new();
182
183        for (upstream_id, upstream_config) in &config.upstreams {
184            let mut config_with_id = upstream_config.clone();
185            config_with_id.id = upstream_id.clone();
186            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
187            pools.insert(upstream_id.clone(), pool);
188
189            // Create active health checker if health check is configured
190            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
191                health_check_runner.add_checker(checker);
192            }
193        }
194        let upstream_pools = Registry::from_map(pools);
195
196        // Create scoped upstream pools from flattened config
197        let scoped_upstream_pools =
198            Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;
199
200        let health_check_runner = Arc::new(health_check_runner);
201
202        // Create passive health checker
203        let passive_health = Arc::new(PassiveHealthChecker::new(
204            0.5,  // 50% failure rate threshold
205            100,  // Window size
206            None, // Will be linked to active health checkers
207        ));
208
209        // Create agent manager (per-agent queue isolation)
210        let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
211        agent_manager.initialize().await?;
212
213        // Create metrics collectors
214        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
215        let scoped_metrics = Arc::new(
216            ScopedMetrics::new().context("Failed to create scoped metrics collector")?,
217        );
218
219        // Create application state
220        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
221
222        // Create reload coordinator
223        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
224            Duration::from_secs(30), // Max drain time
225        ));
226
227        // Setup configuration reload subscription
228        Self::setup_reload_handler(
229            config_manager.clone(),
230            route_matcher.clone(),
231            upstream_pools.clone(),
232            scoped_route_matcher.clone(),
233            scoped_upstream_pools.clone(),
234        )
235        .await;
236
237        // Initialize service type components
238        let (error_handlers, validators, static_servers) =
239            Self::initialize_route_components(&config).await?;
240
241        // Create builtin handler state
242        let builtin_state = Arc::new(BuiltinHandlerState::new(
243            env!("CARGO_PKG_VERSION").to_string(),
244            app_state.instance_id.clone(),
245        ));
246
247        // Create log manager for file-based logging
248        let log_manager = match LogManager::new(&config.observability.logging) {
249            Ok(manager) => {
250                if manager.access_log_enabled() {
251                    info!("Access logging enabled");
252                }
253                if manager.error_log_enabled() {
254                    info!("Error logging enabled");
255                }
256                if manager.audit_log_enabled() {
257                    info!("Audit logging enabled");
258                }
259                Arc::new(manager)
260            }
261            Err(e) => {
262                warn!(
263                    "Failed to initialize log manager, file logging disabled: {}",
264                    e
265                );
266                Arc::new(LogManager::disabled())
267            }
268        };
269
270        // Register audit reload hook to log configuration changes
271        {
272            use crate::reload::AuditReloadHook;
273            let audit_hook = AuditReloadHook::new(log_manager.clone());
274            config_manager.add_hook(Box::new(audit_hook)).await;
275            debug!("Registered audit reload hook");
276        }
277
278        // Start active health check runner in background
279        if health_check_runner.checker_count() > 0 {
280            let runner = health_check_runner.clone();
281            tokio::spawn(async move {
282                runner.run().await;
283            });
284            info!(
285                "Started active health checking for {} upstreams",
286                health_check_runner.checker_count()
287            );
288        }
289
290        // Initialize rate limit manager
291        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
292
293        // Initialize inference rate limit manager (for token-based LLM rate limiting)
294        let inference_rate_limit_manager =
295            Arc::new(Self::initialize_inference_rate_limiters(&config));
296
297        // Initialize warmth tracker for cold model detection
298        let warmth_tracker = Arc::new(crate::health::WarmthTracker::with_defaults());
299
300        // Initialize guardrail processor for semantic inspection
301        let guardrail_processor =
302            Arc::new(crate::inference::GuardrailProcessor::new(agent_manager.clone()));
303
304        // Initialize geo filter manager
305        let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));
306
307        // Start periodic cleanup task for rate limiters and geo caches
308        Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());
309
310        // Start geo database file watcher for hot reload
311        Self::spawn_geo_database_watcher(geo_filter_manager.clone());
312
313        // Mark as ready
314        app_state.set_ready(true);
315
316        // Get trace ID format from config
317        let trace_id_format = config.server.trace_id_format;
318
319        // Initialize cache manager
320        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
321
322        // Initialize fallback metrics (best-effort, log warning if fails)
323        if let Err(e) = init_fallback_metrics() {
324            warn!("Failed to initialize fallback metrics: {}", e);
325        }
326
327        // Initialize model routing metrics (best-effort, log warning if fails)
328        if let Err(e) = init_model_routing_metrics() {
329            warn!("Failed to initialize model routing metrics: {}", e);
330        }
331
332        Ok(Self {
333            config_manager,
334            route_matcher,
335            scoped_route_matcher,
336            upstream_pools,
337            scoped_upstream_pools,
338            agent_manager,
339            passive_health,
340            metrics,
341            scoped_metrics,
342            app_state,
343            reload_coordinator,
344            error_handlers,
345            validators,
346            static_servers,
347            builtin_state,
348            log_manager,
349            trace_id_format,
350            health_check_runner,
351            rate_limit_manager,
352            cache_manager,
353            geo_filter_manager,
354            inference_rate_limit_manager,
355            warmth_tracker,
356            guardrail_processor,
357            // ACME challenge manager - initialized later if ACME is configured
358            acme_challenges: None,
359            acme_client: None,
360        })
361    }
362
363    /// Setup the configuration reload handler
364    async fn setup_reload_handler(
365        config_manager: Arc<ConfigManager>,
366        route_matcher: Arc<RwLock<RouteMatcher>>,
367        upstream_pools: Registry<UpstreamPool>,
368        scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
369        scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
370    ) {
371        let mut reload_rx = config_manager.subscribe();
372        let config_manager_clone = config_manager.clone();
373
374        tokio::spawn(async move {
375            while let Ok(event) = reload_rx.recv().await {
376                if let ReloadEvent::Applied { .. } = event {
377                    // Reload routes and upstreams
378                    let new_config = config_manager_clone.current();
379                    let flattened = new_config.flatten();
380
381                    // Update route matcher (sync parking_lot::RwLock)
382                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
383                        *route_matcher.write() = new_matcher;
384                        info!("Global routes reloaded successfully");
385                    }
386
387                    // Update scoped route matcher
388                    if let Err(e) = scoped_route_matcher
389                        .write()
390                        .await
391                        .load_from_flattened(&flattened)
392                        .await
393                    {
394                        error!("Failed to reload scoped routes: {}", e);
395                    } else {
396                        info!(
397                            "Scoped routes reloaded ({} scopes)",
398                            scoped_route_matcher.read().await.scope_count().await
399                        );
400                    }
401
402                    // Update global upstream pools
403                    let mut new_pools = HashMap::new();
404                    for (upstream_id, upstream_config) in &new_config.upstreams {
405                        let mut config_with_id = upstream_config.clone();
406                        config_with_id.id = upstream_id.clone();
407                        match UpstreamPool::new(config_with_id).await {
408                            Ok(pool) => {
409                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
410                            }
411                            Err(e) => {
412                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
413                            }
414                        }
415                    }
416
417                    // Gracefully swap global pools
418                    let old_pools = upstream_pools.replace(new_pools).await;
419
420                    // Update scoped upstream pools
421                    let new_scoped_pools =
422                        Self::build_scoped_pools_list(&flattened).await;
423                    let old_scoped_pools = scoped_upstream_pools.replace_all(new_scoped_pools).await;
424
425                    info!(
426                        "Scoped upstream pools reloaded ({} pools)",
427                        scoped_upstream_pools.len().await
428                    );
429
430                    // Shutdown old pools after delay
431                    tokio::spawn(async move {
432                        tokio::time::sleep(Duration::from_secs(60)).await;
433
434                        // Shutdown old global pools
435                        for (name, pool) in old_pools {
436                            info!("Shutting down old global pool: {}", name);
437                            pool.shutdown().await;
438                        }
439
440                        // Shutdown old scoped pools
441                        for (name, pool) in old_scoped_pools {
442                            info!("Shutting down old scoped pool: {}", name);
443                            pool.shutdown().await;
444                        }
445                    });
446                }
447            }
448        });
449    }
450
451    /// Create scoped upstream pools from flattened config
452    async fn create_scoped_upstream_pools(
453        flattened: &FlattenedConfig,
454        health_check_runner: &mut HealthCheckRunner,
455    ) -> Result<ScopedRegistry<UpstreamPool>> {
456        let registry = ScopedRegistry::new();
457
458        for (qid, upstream_config) in &flattened.upstreams {
459            let mut config_with_id = upstream_config.clone();
460            config_with_id.id = qid.canonical();
461
462            let pool = Arc::new(
463                UpstreamPool::new(config_with_id.clone())
464                    .await
465                    .with_context(|| format!("Failed to create upstream pool '{}'", qid.canonical()))?,
466            );
467
468            // Track exports
469            let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
470
471            if is_exported {
472                registry.insert_exported(qid.clone(), pool).await;
473            } else {
474                registry.insert(qid.clone(), pool).await;
475            }
476
477            // Create active health checker if configured
478            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
479                health_check_runner.add_checker(checker);
480            }
481
482            debug!(
483                upstream_id = %qid.canonical(),
484                scope = ?qid.scope,
485                exported = is_exported,
486                "Created scoped upstream pool"
487            );
488        }
489
490        info!(
491            "Created {} scoped upstream pools",
492            registry.len().await
493        );
494
495        Ok(registry)
496    }
497
498    /// Build list of scoped pools for atomic replacement
499    async fn build_scoped_pools_list(
500        flattened: &FlattenedConfig,
501    ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
502        let mut result = Vec::new();
503
504        for (qid, upstream_config) in &flattened.upstreams {
505            let mut config_with_id = upstream_config.clone();
506            config_with_id.id = qid.canonical();
507
508            match UpstreamPool::new(config_with_id).await {
509                Ok(pool) => {
510                    let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
511                    result.push((qid.clone(), Arc::new(pool), is_exported));
512                }
513                Err(e) => {
514                    error!(
515                        "Failed to create scoped upstream pool {}: {}",
516                        qid.canonical(),
517                        e
518                    );
519                }
520            }
521        }
522
523        result
524    }
525
526    /// Initialize route-specific components (error handlers, validators, static servers)
527    async fn initialize_route_components(
528        config: &Config,
529    ) -> Result<(
530        Registry<ErrorHandler>,
531        Registry<SchemaValidator>,
532        Registry<StaticFileServer>,
533    )> {
534        let mut error_handlers_map = HashMap::new();
535        let mut validators_map = HashMap::new();
536        let mut static_servers_map = HashMap::new();
537
538        for route in &config.routes {
539            info!(
540                "Initializing components for route: {} with service type: {:?}",
541                route.id, route.service_type
542            );
543
544            // Initialize error handler for each route
545            if let Some(ref error_config) = route.error_pages {
546                let handler =
547                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
548                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
549                debug!("Initialized error handler for route: {}", route.id);
550            } else {
551                // Use default error handler for the service type
552                let handler = ErrorHandler::new(route.service_type.clone(), None);
553                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
554            }
555
556            // Initialize schema validator for API routes
557            if route.service_type == sentinel_config::ServiceType::Api {
558                if let Some(ref api_schema) = route.api_schema {
559                    match SchemaValidator::new(api_schema.clone()) {
560                        Ok(validator) => {
561                            validators_map.insert(route.id.clone(), Arc::new(validator));
562                            info!("Initialized schema validator for route: {}", route.id);
563                        }
564                        Err(e) => {
565                            warn!(
566                                "Failed to initialize schema validator for route {}: {}",
567                                route.id, e
568                            );
569                        }
570                    }
571                }
572            }
573
574            // Initialize static file server for static routes
575            if route.service_type == sentinel_config::ServiceType::Static {
576                if let Some(ref static_config) = route.static_files {
577                    let server = StaticFileServer::new(static_config.clone());
578                    static_servers_map.insert(route.id.clone(), Arc::new(server));
579                    info!("Initialized static file server for route: {}", route.id);
580                } else {
581                    warn!(
582                        "Static route {} has no static_files configuration",
583                        route.id
584                    );
585                }
586            }
587        }
588
589        Ok((
590            Registry::from_map(error_handlers_map),
591            Registry::from_map(validators_map),
592            Registry::from_map(static_servers_map),
593        ))
594    }
595
596    /// Get or generate trace ID from session
597    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
598        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
599    }
600
601    /// Initialize rate limiters from configuration
602    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
603        use sentinel_config::RateLimitAction;
604
605        // Create manager with global rate limit if configured
606        let manager = if let Some(ref global) = config.rate_limits.global {
607            info!(
608                max_rps = global.max_rps,
609                burst = global.burst,
610                key = ?global.key,
611                "Initializing global rate limiter"
612            );
613            RateLimitManager::with_global_limit(global.max_rps, global.burst)
614        } else {
615            RateLimitManager::new()
616        };
617
618        for route in &config.routes {
619            // Check for rate limit in route policies
620            if let Some(ref rate_limit) = route.policies.rate_limit {
621                let rl_config = RateLimitConfig {
622                    max_rps: rate_limit.requests_per_second,
623                    burst: rate_limit.burst,
624                    key: rate_limit.key.clone(),
625                    action: RateLimitAction::Reject,
626                    status_code: 429,
627                    message: None,
628                    backend: sentinel_config::RateLimitBackend::Local,
629                    max_delay_ms: 5000, // Default for policy-based rate limits
630                };
631                manager.register_route(&route.id, rl_config);
632                info!(
633                    route_id = %route.id,
634                    max_rps = rate_limit.requests_per_second,
635                    burst = rate_limit.burst,
636                    key = ?rate_limit.key,
637                    "Registered rate limiter for route"
638                );
639            }
640
641            // Also check for rate limit filters in the filter chain
642            for filter_id in &route.filters {
643                if let Some(filter_config) = config.filters.get(filter_id) {
644                    if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
645                    {
646                        let rl_config = RateLimitConfig {
647                            max_rps: rl_filter.max_rps,
648                            burst: rl_filter.burst,
649                            key: rl_filter.key.clone(),
650                            action: rl_filter.on_limit.clone(),
651                            status_code: rl_filter.status_code,
652                            message: rl_filter.limit_message.clone(),
653                            backend: rl_filter.backend.clone(),
654                            max_delay_ms: rl_filter.max_delay_ms,
655                        };
656                        manager.register_route(&route.id, rl_config);
657                        info!(
658                            route_id = %route.id,
659                            filter_id = %filter_id,
660                            max_rps = rl_filter.max_rps,
661                            backend = ?rl_filter.backend,
662                            "Registered rate limiter from filter for route"
663                        );
664                    }
665                }
666            }
667        }
668
669        if manager.route_count() > 0 {
670            info!(
671                route_count = manager.route_count(),
672                "Rate limiting initialized"
673            );
674        }
675
676        manager
677    }
678
679    /// Initialize inference rate limiters from configuration
680    ///
681    /// This creates token-based rate limiters for routes with `service-type "inference"`
682    /// and inference config blocks.
683    fn initialize_inference_rate_limiters(config: &Config) -> InferenceRateLimitManager {
684        let manager = InferenceRateLimitManager::new();
685
686        for route in &config.routes {
687            // Only initialize for inference service type routes with inference config
688            if route.service_type == sentinel_config::ServiceType::Inference {
689                if let Some(ref inference_config) = route.inference {
690                    manager.register_route(&route.id, inference_config);
691                }
692            }
693        }
694
695        if manager.route_count() > 0 {
696            info!(
697                route_count = manager.route_count(),
698                "Inference rate limiting initialized"
699            );
700        }
701
702        manager
703    }
704
705    /// Initialize cache manager from configuration
706    fn initialize_cache_manager(config: &Config) -> CacheManager {
707        let manager = CacheManager::new();
708
709        let mut enabled_count = 0;
710
711        for route in &config.routes {
712            // API routes: caching disabled by default (responses often dynamic)
713            if route.service_type == sentinel_config::ServiceType::Api {
714                let cache_config = CacheConfig {
715                    enabled: false, // Disabled until explicitly configured via KDL
716                    default_ttl_secs: 60,
717                    ..Default::default()
718                };
719                manager.register_route(&route.id, cache_config);
720            }
721
722            // Static routes: enable caching by default (assets are typically cacheable)
723            if route.service_type == sentinel_config::ServiceType::Static {
724                let cache_config = CacheConfig {
725                    enabled: true, // Enable by default for static routes
726                    default_ttl_secs: 3600,
727                    max_size_bytes: 50 * 1024 * 1024, // 50MB for static
728                    stale_while_revalidate_secs: 60,
729                    stale_if_error_secs: 300,
730                    ..Default::default()
731                };
732                manager.register_route(&route.id, cache_config);
733                enabled_count += 1;
734                info!(
735                    route_id = %route.id,
736                    default_ttl_secs = 3600,
737                    "HTTP caching enabled for static route"
738                );
739            }
740
741            // Web routes: disable by default (HTML often personalized)
742            if route.service_type == sentinel_config::ServiceType::Web {
743                let cache_config = CacheConfig {
744                    enabled: false, // Disabled until explicitly configured
745                    default_ttl_secs: 300,
746                    ..Default::default()
747                };
748                manager.register_route(&route.id, cache_config);
749            }
750        }
751
752        if enabled_count > 0 {
753            info!(enabled_routes = enabled_count, "HTTP caching initialized");
754        } else {
755            debug!("HTTP cache manager initialized (no routes with caching enabled)");
756        }
757
758        manager
759    }
760
761    /// Initialize geo filters from configuration
762    fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
763        let manager = GeoFilterManager::new();
764
765        for (filter_id, filter_config) in &config.filters {
766            if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
767                match manager.register_filter(filter_id, geo_filter.clone()) {
768                    Ok(_) => {
769                        info!(
770                            filter_id = %filter_id,
771                            database_path = %geo_filter.database_path,
772                            action = ?geo_filter.action,
773                            countries_count = geo_filter.countries.len(),
774                            "Registered geo filter"
775                        );
776                    }
777                    Err(e) => {
778                        error!(
779                            filter_id = %filter_id,
780                            error = %e,
781                            "Failed to register geo filter"
782                        );
783                    }
784                }
785            }
786        }
787
788        let filter_ids = manager.filter_ids();
789        if !filter_ids.is_empty() {
790            info!(
791                filter_count = filter_ids.len(),
792                filter_ids = ?filter_ids,
793                "GeoIP filtering initialized"
794            );
795        }
796
797        manager
798    }
799
800    /// Apply security headers to response
801    pub(super) fn apply_security_headers(
802        &self,
803        header: &mut ResponseHeader,
804    ) -> Result<(), Box<Error>> {
805        header.insert_header("X-Content-Type-Options", "nosniff")?;
806        header.insert_header("X-Frame-Options", "DENY")?;
807        header.insert_header("X-XSS-Protection", "1; mode=block")?;
808        header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
809        header.remove_header("Server");
810        header.remove_header("X-Powered-By");
811        Ok(())
812    }
813
814    /// Spawn background task to periodically clean up idle rate limiters and expired geo caches
815    fn spawn_cleanup_task(
816        rate_limit_manager: Arc<RateLimitManager>,
817        geo_filter_manager: Arc<GeoFilterManager>,
818    ) {
819        // Cleanup interval: 5 minutes
820        const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
821
822        tokio::spawn(async move {
823            let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
824            // First tick completes immediately; skip it
825            interval.tick().await;
826
827            loop {
828                interval.tick().await;
829
830                // Clean up rate limiters (removes entries when pool exceeds max size)
831                rate_limit_manager.cleanup();
832
833                // Clean up expired geo filter caches
834                geo_filter_manager.clear_expired_caches();
835
836                debug!("Periodic cleanup completed");
837            }
838        });
839
840        info!(
841            interval_secs = CLEANUP_INTERVAL.as_secs(),
842            "Started periodic cleanup task"
843        );
844    }
845
846    /// Spawn background task to watch geo database files for changes
847    fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
848        let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
849
850        // Try to start watching
851        match watcher.start_watching() {
852            Ok(mut rx) => {
853                let watcher_clone = watcher.clone();
854                tokio::spawn(async move {
855                    // Debounce interval
856                    const DEBOUNCE_MS: u64 = 500;
857
858                    while let Some(path) = rx.recv().await {
859                        // Debounce rapid changes (e.g., temp file then rename)
860                        tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
861
862                        // Drain any additional events for the same path during debounce
863                        while rx.try_recv().is_ok() {}
864
865                        // Handle the change
866                        watcher_clone.handle_change(&path);
867                    }
868                });
869
870                info!("Started geo database file watcher");
871            }
872            Err(e) => {
873                warn!(
874                    error = %e,
875                    "Failed to start geo database file watcher, auto-reload disabled"
876                );
877            }
878        }
879    }
880}