// grapsus_proxy/proxy/mod.rs
//! Grapsus Proxy Core Implementation
//!
//! This module contains the main GrapsusProxy struct and its implementation,
//! split across several submodules for maintainability:
//!
//! - `context`: Request context maintained throughout the request lifecycle
//! - `fallback`: Fallback decision evaluation for failed upstream attempts
//! - `fallback_metrics`: Metrics describing fallback events
//! - `filters`: Request/response filter chain support
//! - `handlers`: Helper methods for handling different route types
//! - `http_trait`: ProxyHttp trait implementation for Pingora
//! - `model_routing`: Model-based upstream selection for inference routes
//! - `model_routing_metrics`: Metrics describing model routing decisions
9
10mod context;
11mod fallback;
12mod fallback_metrics;
13pub(crate) mod filters;
14mod handlers;
15mod http_trait;
16mod model_routing;
17mod model_routing_metrics;
18
19pub use context::{FallbackReason, RequestContext};
20pub use fallback::{FallbackDecision, FallbackEvaluator};
21pub use fallback_metrics::{get_fallback_metrics, init_fallback_metrics, FallbackMetrics};
22pub use model_routing::{extract_model_from_headers, find_upstream_for_model, ModelRoutingResult};
23pub use model_routing_metrics::{
24    get_model_routing_metrics, init_model_routing_metrics, ModelRoutingMetrics,
25};
26
27use anyhow::{Context, Result};
28use parking_lot::RwLock;
29use pingora::http::ResponseHeader;
30use pingora::prelude::*;
31use std::collections::HashMap;
32use std::sync::Arc;
33use std::time::Duration;
34use tracing::{debug, error, info, warn};
35use uuid::Uuid;
36
37use grapsus_common::ids::{QualifiedId, Scope};
38use grapsus_common::{Registry, ScopedMetrics, ScopedRegistry};
39
40use crate::agents::AgentManager;
41use crate::app::AppState;
42use crate::builtin_handlers::BuiltinHandlerState;
43use crate::cache::{CacheConfig, CacheManager};
44use crate::errors::ErrorHandler;
45use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
46use crate::health::PassiveHealthChecker;
47use crate::http_helpers;
48use crate::inference::InferenceRateLimitManager;
49use crate::logging::{LogManager, SharedLogManager};
50use crate::rate_limit::{RateLimitConfig, RateLimitManager};
51use crate::reload::{
52    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
53};
54use crate::routing::RouteMatcher;
55use crate::scoped_routing::ScopedRouteMatcher;
56use crate::static_files::StaticFileServer;
57use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
58use crate::validation::SchemaValidator;
59
60use grapsus_common::TraceIdFormat;
61use grapsus_config::{Config, FlattenedConfig};
62
/// Main proxy service implementing Pingora's ProxyHttp trait
///
/// Holds all shared, long-lived state used while serving requests: routing
/// tables (global and namespace/service scoped), upstream pools, health
/// checkers, metrics collectors, and per-route components. Fields are wrapped
/// in `Arc`/registries so the proxy can be shared cheaply across worker
/// threads; mutable pieces use locks or registry swap semantics for hot reload.
pub struct GrapsusProxy {
    /// Configuration manager with hot reload
    pub config_manager: Arc<ConfigManager>,
    /// Route matcher (global routes only, for backward compatibility)
    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
    /// Scoped route matcher (namespace/service aware)
    pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
    /// Upstream pools (keyed by upstream ID, global only)
    pub(super) upstream_pools: Registry<UpstreamPool>,
    /// Scoped upstream pools (namespace/service aware)
    pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
    /// Agent manager for external processing
    pub(super) agent_manager: Arc<AgentManager>,
    /// Passive health checker
    pub(super) passive_health: Arc<PassiveHealthChecker>,
    /// Metrics collector
    pub(super) metrics: Arc<grapsus_common::observability::RequestMetrics>,
    /// Scoped metrics collector (with namespace/service labels)
    pub(super) scoped_metrics: Arc<ScopedMetrics>,
    /// Application state
    pub(super) app_state: Arc<AppState>,
    /// Graceful reload coordinator
    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
    /// Error handlers per route (keyed by route ID)
    pub(super) error_handlers: Registry<ErrorHandler>,
    /// API schema validators per route (keyed by route ID)
    pub(super) validators: Registry<SchemaValidator>,
    /// Static file servers per route (keyed by route ID)
    pub(super) static_servers: Registry<StaticFileServer>,
    /// Builtin handler state
    pub(super) builtin_state: Arc<BuiltinHandlerState>,
    /// Log manager for file-based logging
    pub(super) log_manager: SharedLogManager,
    /// Trace ID format for request tracing
    pub(super) trace_id_format: TraceIdFormat,
    /// Active health check runner
    pub(super) health_check_runner: Arc<HealthCheckRunner>,
    /// Rate limit manager
    pub(super) rate_limit_manager: Arc<RateLimitManager>,
    /// HTTP cache manager
    pub(super) cache_manager: Arc<CacheManager>,
    /// GeoIP filter manager
    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
    /// Inference rate limit manager (token-based rate limiting for LLM/AI routes)
    pub(super) inference_rate_limit_manager: Arc<InferenceRateLimitManager>,
    /// Warmth tracker for cold model detection on inference routes
    pub(super) warmth_tracker: Arc<crate::health::WarmthTracker>,
    /// Guardrail processor for semantic inspection (prompt injection, PII detection)
    pub(super) guardrail_processor: Arc<crate::inference::GuardrailProcessor>,
    /// ACME challenge manager for HTTP-01 challenge handling
    /// Present only when ACME is configured for at least one listener
    pub acme_challenges: Option<Arc<crate::acme::ChallengeManager>>,
    /// ACME client for certificate management
    /// Present only when ACME is configured
    pub acme_client: Option<Arc<crate::acme::AcmeClient>>,
}
120
121impl GrapsusProxy {
    /// Create new proxy instance
    ///
    /// If config_path is None, uses the embedded default configuration.
    /// Note: Tracing must be initialized by the caller before calling this function.
    ///
    /// Initialization order matters: cache storage must be configured before
    /// any cache access, and the reload subscription is set up before route
    /// components so hot reloads observed during startup are not lost.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration cannot be loaded or validated,
    /// or if any core component (route matchers, upstream pools, agent
    /// manager, metrics) fails to initialize. Log-manager and metrics-registry
    /// initialization failures are non-fatal and only logged.
    pub async fn new(config_path: Option<&str>) -> Result<Self> {
        info!("Starting Grapsus Proxy");

        // Load initial configuration
        let (config, effective_config_path) = match config_path {
            Some(path) => {
                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
                (cfg, path.to_string())
            }
            None => {
                let cfg = Config::default_embedded()
                    .context("Failed to load embedded default configuration")?;
                // Use a sentinel path to indicate embedded config
                (cfg, "_embedded_".to_string())
            }
        };

        config
            .validate()
            .context("Initial configuration validation failed")?;

        // Configure global cache storage (must be done before cache is accessed)
        if let Some(ref cache_config) = config.cache {
            info!(
                max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
                backend = ?cache_config.backend,
                "Configuring HTTP cache storage"
            );
            crate::cache::configure_cache(cache_config.clone());
            crate::cache::init_disk_cache_state().await;
        }

        // Create configuration manager
        let config_manager =
            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);

        // Add validators
        config_manager.add_validator(Box::new(RouteValidator)).await;
        config_manager
            .add_validator(Box::new(UpstreamValidator))
            .await;

        // Create route matcher (global routes only)
        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));

        // Flatten config for namespace/service resources
        let flattened = config.flatten();

        // Create scoped route matcher
        let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
            ScopedRouteMatcher::from_flattened(&flattened)
                .await
                .context("Failed to create scoped route matcher")?,
        ));

        // Create upstream pools and active health checkers (global only)
        let mut pools = HashMap::new();
        let mut health_check_runner = HealthCheckRunner::new();

        for (upstream_id, upstream_config) in &config.upstreams {
            // Stamp the map key onto the config so the pool knows its own id.
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = upstream_id.clone();
            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
            pools.insert(upstream_id.clone(), pool);

            // Create active health checker if health check is configured
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }
        }
        let upstream_pools = Registry::from_map(pools);

        // Create scoped upstream pools from flattened config
        let scoped_upstream_pools =
            Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;

        let health_check_runner = Arc::new(health_check_runner);

        // Create passive health checker
        let passive_health = Arc::new(PassiveHealthChecker::new(
            0.5,  // 50% failure rate threshold
            100,  // Window size
            None, // Will be linked to active health checkers
        ));

        // Create agent manager (per-agent queue isolation)
        let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
        agent_manager.initialize().await?;

        // Create metrics collectors
        let metrics = Arc::new(grapsus_common::observability::RequestMetrics::new()?);
        let scoped_metrics =
            Arc::new(ScopedMetrics::new().context("Failed to create scoped metrics collector")?);

        // Create application state (instance id is a fresh UUID per process)
        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));

        // Create reload coordinator
        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
            Duration::from_secs(30), // Max drain time
        ));

        // Setup configuration reload subscription
        Self::setup_reload_handler(
            config_manager.clone(),
            route_matcher.clone(),
            upstream_pools.clone(),
            scoped_route_matcher.clone(),
            scoped_upstream_pools.clone(),
        )
        .await;

        // Initialize service type components
        let (error_handlers, validators, static_servers) =
            Self::initialize_route_components(&config).await?;

        // Create builtin handler state
        let builtin_state = Arc::new(BuiltinHandlerState::new(
            env!("CARGO_PKG_VERSION").to_string(),
            app_state.instance_id.clone(),
        ));

        // Create log manager for file-based logging.
        // A failure here is non-fatal: the proxy runs with file logging disabled.
        let log_manager = match LogManager::new(&config.observability.logging) {
            Ok(manager) => {
                if manager.access_log_enabled() {
                    info!("Access logging enabled");
                }
                if manager.error_log_enabled() {
                    info!("Error logging enabled");
                }
                if manager.audit_log_enabled() {
                    info!("Audit logging enabled");
                }
                Arc::new(manager)
            }
            Err(e) => {
                warn!(
                    "Failed to initialize log manager, file logging disabled: {}",
                    e
                );
                Arc::new(LogManager::disabled())
            }
        };

        // Register audit reload hook to log configuration changes
        {
            use crate::reload::AuditReloadHook;
            let audit_hook = AuditReloadHook::new(log_manager.clone());
            config_manager.add_hook(Box::new(audit_hook)).await;
            debug!("Registered audit reload hook");
        }

        // Start active health check runner in background
        if health_check_runner.checker_count() > 0 {
            let runner = health_check_runner.clone();
            tokio::spawn(async move {
                runner.run().await;
            });
            info!(
                "Started active health checking for {} upstreams",
                health_check_runner.checker_count()
            );
        }

        // Initialize rate limit manager
        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));

        // Initialize inference rate limit manager (for token-based LLM rate limiting)
        let inference_rate_limit_manager =
            Arc::new(Self::initialize_inference_rate_limiters(&config));

        // Initialize warmth tracker for cold model detection
        let warmth_tracker = Arc::new(crate::health::WarmthTracker::with_defaults());

        // Initialize guardrail processor for semantic inspection
        let guardrail_processor = Arc::new(crate::inference::GuardrailProcessor::new(
            agent_manager.clone(),
        ));

        // Initialize geo filter manager
        let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));

        // Start periodic cleanup task for rate limiters and geo caches
        Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());

        // Start geo database file watcher for hot reload
        Self::spawn_geo_database_watcher(geo_filter_manager.clone());

        // Mark as ready
        app_state.set_ready(true);

        // Get trace ID format from config
        let trace_id_format = config.server.trace_id_format;

        // Initialize cache manager
        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));

        // Initialize fallback metrics (best-effort, log warning if fails)
        if let Err(e) = init_fallback_metrics() {
            warn!("Failed to initialize fallback metrics: {}", e);
        }

        // Initialize model routing metrics (best-effort, log warning if fails)
        if let Err(e) = init_model_routing_metrics() {
            warn!("Failed to initialize model routing metrics: {}", e);
        }

        Ok(Self {
            config_manager,
            route_matcher,
            scoped_route_matcher,
            upstream_pools,
            scoped_upstream_pools,
            agent_manager,
            passive_health,
            metrics,
            scoped_metrics,
            app_state,
            reload_coordinator,
            error_handlers,
            validators,
            static_servers,
            builtin_state,
            log_manager,
            trace_id_format,
            health_check_runner,
            rate_limit_manager,
            cache_manager,
            geo_filter_manager,
            inference_rate_limit_manager,
            warmth_tracker,
            guardrail_processor,
            // ACME challenge manager - initialized later if ACME is configured
            acme_challenges: None,
            acme_client: None,
        })
    }
364
365    /// Setup the configuration reload handler
366    async fn setup_reload_handler(
367        config_manager: Arc<ConfigManager>,
368        route_matcher: Arc<RwLock<RouteMatcher>>,
369        upstream_pools: Registry<UpstreamPool>,
370        scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
371        scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
372    ) {
373        let mut reload_rx = config_manager.subscribe();
374        let config_manager_clone = config_manager.clone();
375
376        tokio::spawn(async move {
377            while let Ok(event) = reload_rx.recv().await {
378                if let ReloadEvent::Applied { .. } = event {
379                    // Reload routes and upstreams
380                    let new_config = config_manager_clone.current();
381                    let flattened = new_config.flatten();
382
383                    // Update route matcher (sync parking_lot::RwLock)
384                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
385                        *route_matcher.write() = new_matcher;
386                        info!("Global routes reloaded successfully");
387                    }
388
389                    // Update scoped route matcher
390                    if let Err(e) = scoped_route_matcher
391                        .write()
392                        .await
393                        .load_from_flattened(&flattened)
394                        .await
395                    {
396                        error!("Failed to reload scoped routes: {}", e);
397                    } else {
398                        info!(
399                            "Scoped routes reloaded ({} scopes)",
400                            scoped_route_matcher.read().await.scope_count().await
401                        );
402                    }
403
404                    // Update global upstream pools
405                    let mut new_pools = HashMap::new();
406                    for (upstream_id, upstream_config) in &new_config.upstreams {
407                        let mut config_with_id = upstream_config.clone();
408                        config_with_id.id = upstream_id.clone();
409                        match UpstreamPool::new(config_with_id).await {
410                            Ok(pool) => {
411                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
412                            }
413                            Err(e) => {
414                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
415                            }
416                        }
417                    }
418
419                    // Gracefully swap global pools
420                    let old_pools = upstream_pools.replace(new_pools).await;
421
422                    // Update scoped upstream pools
423                    let new_scoped_pools = Self::build_scoped_pools_list(&flattened).await;
424                    let old_scoped_pools =
425                        scoped_upstream_pools.replace_all(new_scoped_pools).await;
426
427                    info!(
428                        "Scoped upstream pools reloaded ({} pools)",
429                        scoped_upstream_pools.len().await
430                    );
431
432                    // Shutdown old pools after delay
433                    tokio::spawn(async move {
434                        tokio::time::sleep(Duration::from_secs(60)).await;
435
436                        // Shutdown old global pools
437                        for (name, pool) in old_pools {
438                            info!("Shutting down old global pool: {}", name);
439                            pool.shutdown().await;
440                        }
441
442                        // Shutdown old scoped pools
443                        for (name, pool) in old_scoped_pools {
444                            info!("Shutting down old scoped pool: {}", name);
445                            pool.shutdown().await;
446                        }
447                    });
448                }
449            }
450        });
451    }
452
    /// Create scoped upstream pools from flattened config
    ///
    /// Builds an `UpstreamPool` for every namespaced/service upstream in the
    /// flattened config, registers each pool as exported or private, and adds
    /// an active health checker to `health_check_runner` for any pool with a
    /// health check configured.
    ///
    /// # Errors
    ///
    /// Fails fast if any pool cannot be constructed — this is the startup
    /// path, unlike `build_scoped_pools_list` (reload path), which logs and
    /// skips broken pools.
    async fn create_scoped_upstream_pools(
        flattened: &FlattenedConfig,
        health_check_runner: &mut HealthCheckRunner,
    ) -> Result<ScopedRegistry<UpstreamPool>> {
        let registry = ScopedRegistry::new();

        for (qid, upstream_config) in &flattened.upstreams {
            // Stamp the canonical qualified id onto the pool's config copy.
            let mut config_with_id = upstream_config.clone();
            config_with_id.id = qid.canonical();

            let pool = Arc::new(
                UpstreamPool::new(config_with_id.clone())
                    .await
                    .with_context(|| {
                        format!("Failed to create upstream pool '{}'", qid.canonical())
                    })?,
            );

            // Track exports
            // NOTE(review): export lookup keys on the original (unqualified)
            // `upstream_config.id`, not the canonical id set above — confirm
            // `exported_upstreams` is keyed the same way.
            let is_exported = flattened
                .exported_upstreams
                .contains_key(&upstream_config.id);

            if is_exported {
                registry.insert_exported(qid.clone(), pool).await;
            } else {
                registry.insert(qid.clone(), pool).await;
            }

            // Create active health checker if configured
            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
                health_check_runner.add_checker(checker);
            }

            debug!(
                upstream_id = %qid.canonical(),
                scope = ?qid.scope,
                exported = is_exported,
                "Created scoped upstream pool"
            );
        }

        info!("Created {} scoped upstream pools", registry.len().await);

        Ok(registry)
    }
500
501    /// Build list of scoped pools for atomic replacement
502    async fn build_scoped_pools_list(
503        flattened: &FlattenedConfig,
504    ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
505        let mut result = Vec::new();
506
507        for (qid, upstream_config) in &flattened.upstreams {
508            let mut config_with_id = upstream_config.clone();
509            config_with_id.id = qid.canonical();
510
511            match UpstreamPool::new(config_with_id).await {
512                Ok(pool) => {
513                    let is_exported = flattened
514                        .exported_upstreams
515                        .contains_key(&upstream_config.id);
516                    result.push((qid.clone(), Arc::new(pool), is_exported));
517                }
518                Err(e) => {
519                    error!(
520                        "Failed to create scoped upstream pool {}: {}",
521                        qid.canonical(),
522                        e
523                    );
524                }
525            }
526        }
527
528        result
529    }
530
531    /// Initialize route-specific components (error handlers, validators, static servers)
532    async fn initialize_route_components(
533        config: &Config,
534    ) -> Result<(
535        Registry<ErrorHandler>,
536        Registry<SchemaValidator>,
537        Registry<StaticFileServer>,
538    )> {
539        let mut error_handlers_map = HashMap::new();
540        let mut validators_map = HashMap::new();
541        let mut static_servers_map = HashMap::new();
542
543        for route in &config.routes {
544            info!(
545                "Initializing components for route: {} with service type: {:?}",
546                route.id, route.service_type
547            );
548
549            // Initialize error handler for each route
550            if let Some(ref error_config) = route.error_pages {
551                let handler =
552                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
553                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
554                debug!("Initialized error handler for route: {}", route.id);
555            } else {
556                // Use default error handler for the service type
557                let handler = ErrorHandler::new(route.service_type.clone(), None);
558                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
559            }
560
561            // Initialize schema validator for API routes
562            if route.service_type == grapsus_config::ServiceType::Api {
563                if let Some(ref api_schema) = route.api_schema {
564                    match SchemaValidator::new(api_schema.clone()) {
565                        Ok(validator) => {
566                            validators_map.insert(route.id.clone(), Arc::new(validator));
567                            info!("Initialized schema validator for route: {}", route.id);
568                        }
569                        Err(e) => {
570                            warn!(
571                                "Failed to initialize schema validator for route {}: {}",
572                                route.id, e
573                            );
574                        }
575                    }
576                }
577            }
578
579            // Initialize static file server for static routes
580            if route.service_type == grapsus_config::ServiceType::Static {
581                if let Some(ref static_config) = route.static_files {
582                    let server = StaticFileServer::new(static_config.clone());
583                    static_servers_map.insert(route.id.clone(), Arc::new(server));
584                    info!("Initialized static file server for route: {}", route.id);
585                } else {
586                    warn!(
587                        "Static route {} has no static_files configuration",
588                        route.id
589                    );
590                }
591            }
592        }
593
594        Ok((
595            Registry::from_map(error_handlers_map),
596            Registry::from_map(validators_map),
597            Registry::from_map(static_servers_map),
598        ))
599    }
600
    /// Get or generate trace ID from session
    ///
    /// Delegates to `http_helpers::get_or_create_trace_id`, using the trace
    /// ID format captured from the config at construction time.
    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
    }
605
    /// Initialize rate limiters from configuration
    ///
    /// Builds the request-rate limit manager: an optional global limiter from
    /// `config.rate_limits.global`, plus per-route limiters sourced from two
    /// places — the route's `policies.rate_limit` block and any `RateLimit`
    /// filters referenced in the route's filter chain.
    ///
    /// NOTE(review): when a route has both a policy-based limit and a
    /// rate-limit filter, `register_route` is called twice with the same
    /// route id — confirm whether the later (filter) registration replaces
    /// or coexists with the policy-based one.
    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
        use grapsus_config::RateLimitAction;

        // Create manager with global rate limit if configured
        let manager = if let Some(ref global) = config.rate_limits.global {
            info!(
                max_rps = global.max_rps,
                burst = global.burst,
                key = ?global.key,
                "Initializing global rate limiter"
            );
            RateLimitManager::with_global_limit(global.max_rps, global.burst)
        } else {
            RateLimitManager::new()
        };

        for route in &config.routes {
            // Check for rate limit in route policies.
            // Policy-based limits always reject with 429 and the local backend.
            if let Some(ref rate_limit) = route.policies.rate_limit {
                let rl_config = RateLimitConfig {
                    max_rps: rate_limit.requests_per_second,
                    burst: rate_limit.burst,
                    key: rate_limit.key.clone(),
                    action: RateLimitAction::Reject,
                    status_code: 429,
                    message: None,
                    backend: grapsus_config::RateLimitBackend::Local,
                    max_delay_ms: 5000, // Default for policy-based rate limits
                };
                manager.register_route(&route.id, rl_config);
                info!(
                    route_id = %route.id,
                    max_rps = rate_limit.requests_per_second,
                    burst = rate_limit.burst,
                    key = ?rate_limit.key,
                    "Registered rate limiter for route"
                );
            }

            // Also check for rate limit filters in the filter chain.
            // Filter-based limits carry their full configured behavior
            // (action, status code, message, backend, max delay).
            for filter_id in &route.filters {
                if let Some(filter_config) = config.filters.get(filter_id) {
                    if let grapsus_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
                    {
                        let rl_config = RateLimitConfig {
                            max_rps: rl_filter.max_rps,
                            burst: rl_filter.burst,
                            key: rl_filter.key.clone(),
                            action: rl_filter.on_limit.clone(),
                            status_code: rl_filter.status_code,
                            message: rl_filter.limit_message.clone(),
                            backend: rl_filter.backend.clone(),
                            max_delay_ms: rl_filter.max_delay_ms,
                        };
                        manager.register_route(&route.id, rl_config);
                        info!(
                            route_id = %route.id,
                            filter_id = %filter_id,
                            max_rps = rl_filter.max_rps,
                            backend = ?rl_filter.backend,
                            "Registered rate limiter from filter for route"
                        );
                    }
                }
            }
        }

        if manager.route_count() > 0 {
            info!(
                route_count = manager.route_count(),
                "Rate limiting initialized"
            );
        }

        manager
    }
683
684    /// Initialize inference rate limiters from configuration
685    ///
686    /// This creates token-based rate limiters for routes with `service-type "inference"`
687    /// and inference config blocks.
688    fn initialize_inference_rate_limiters(config: &Config) -> InferenceRateLimitManager {
689        let manager = InferenceRateLimitManager::new();
690
691        for route in &config.routes {
692            // Only initialize for inference service type routes with inference config
693            if route.service_type == grapsus_config::ServiceType::Inference {
694                if let Some(ref inference_config) = route.inference {
695                    manager.register_route(&route.id, inference_config);
696                }
697            }
698        }
699
700        if manager.route_count() > 0 {
701            info!(
702                route_count = manager.route_count(),
703                "Inference rate limiting initialized"
704            );
705        }
706
707        manager
708    }
709
710    /// Initialize cache manager from configuration
711    fn initialize_cache_manager(config: &Config) -> CacheManager {
712        let manager = CacheManager::new();
713
714        let mut enabled_count = 0;
715
716        for route in &config.routes {
717            // Use per-route cache config if present, otherwise fall back to service-type defaults
718            let cache_config = if let Some(ref rc) = route.policies.cache {
719                // Pre-compile exclude_paths glob patterns into regex at registration time
720                let exclude_paths = rc
721                    .exclude_paths
722                    .iter()
723                    .filter_map(|pattern| {
724                        let regex_str = crate::cache::compile_glob_to_regex(pattern);
725                        match regex::Regex::new(&regex_str) {
726                            Ok(re) => Some(re),
727                            Err(e) => {
728                                warn!(
729                                    route_id = %route.id,
730                                    pattern = %pattern,
731                                    error = %e,
732                                    "Failed to compile cache exclude-path pattern"
733                                );
734                                None
735                            }
736                        }
737                    })
738                    .collect();
739
740                CacheConfig {
741                    enabled: rc.enabled,
742                    default_ttl_secs: rc.default_ttl_secs,
743                    max_size_bytes: rc.max_size_bytes,
744                    cache_private: rc.cache_private,
745                    stale_while_revalidate_secs: rc.stale_while_revalidate_secs,
746                    stale_if_error_secs: rc.stale_if_error_secs,
747                    cacheable_methods: rc.cacheable_methods.clone(),
748                    cacheable_status_codes: rc.cacheable_status_codes.clone(),
749                    exclude_extensions: rc.exclude_extensions.clone(),
750                    exclude_paths,
751                }
752            } else {
753                match route.service_type {
754                    grapsus_config::ServiceType::Static => CacheConfig {
755                        enabled: true,
756                        default_ttl_secs: 3600,
757                        max_size_bytes: 50 * 1024 * 1024, // 50MB for static
758                        stale_while_revalidate_secs: 60,
759                        stale_if_error_secs: 300,
760                        ..Default::default()
761                    },
762                    grapsus_config::ServiceType::Api => CacheConfig {
763                        enabled: false,
764                        default_ttl_secs: 60,
765                        ..Default::default()
766                    },
767                    grapsus_config::ServiceType::Web => CacheConfig {
768                        enabled: false,
769                        default_ttl_secs: 300,
770                        ..Default::default()
771                    },
772                    _ => CacheConfig::default(),
773                }
774            };
775
776            if cache_config.enabled {
777                enabled_count += 1;
778                info!(
779                    route_id = %route.id,
780                    default_ttl_secs = cache_config.default_ttl_secs,
781                    from_config = route.policies.cache.is_some(),
782                    "HTTP caching enabled for route"
783                );
784            }
785            manager.register_route(&route.id, cache_config);
786        }
787
788        if enabled_count > 0 {
789            info!(enabled_routes = enabled_count, "HTTP caching initialized");
790        } else {
791            debug!("HTTP cache manager initialized (no routes with caching enabled)");
792        }
793
794        manager
795    }
796
797    /// Initialize geo filters from configuration
798    fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
799        let manager = GeoFilterManager::new();
800
801        for (filter_id, filter_config) in &config.filters {
802            if let grapsus_config::Filter::Geo(ref geo_filter) = filter_config.filter {
803                match manager.register_filter(filter_id, geo_filter.clone()) {
804                    Ok(_) => {
805                        info!(
806                            filter_id = %filter_id,
807                            database_path = %geo_filter.database_path,
808                            action = ?geo_filter.action,
809                            countries_count = geo_filter.countries.len(),
810                            "Registered geo filter"
811                        );
812                    }
813                    Err(e) => {
814                        error!(
815                            filter_id = %filter_id,
816                            error = %e,
817                            "Failed to register geo filter"
818                        );
819                    }
820                }
821            }
822        }
823
824        let filter_ids = manager.filter_ids();
825        if !filter_ids.is_empty() {
826            info!(
827                filter_count = filter_ids.len(),
828                filter_ids = ?filter_ids,
829                "GeoIP filtering initialized"
830            );
831        }
832
833        manager
834    }
835
836    /// Spawn background task to periodically clean up idle rate limiters and expired geo caches
837    fn spawn_cleanup_task(
838        rate_limit_manager: Arc<RateLimitManager>,
839        geo_filter_manager: Arc<GeoFilterManager>,
840    ) {
841        // Cleanup interval: 5 minutes
842        const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
843
844        tokio::spawn(async move {
845            let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
846            // First tick completes immediately; skip it
847            interval.tick().await;
848
849            loop {
850                interval.tick().await;
851
852                // Clean up rate limiters (removes entries when pool exceeds max size)
853                rate_limit_manager.cleanup();
854
855                // Clean up expired geo filter caches
856                geo_filter_manager.clear_expired_caches();
857
858                debug!("Periodic cleanup completed");
859            }
860        });
861
862        info!(
863            interval_secs = CLEANUP_INTERVAL.as_secs(),
864            "Started periodic cleanup task"
865        );
866    }
867
868    /// Spawn background task to watch geo database files for changes
869    fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
870        let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
871
872        // Try to start watching
873        match watcher.start_watching() {
874            Ok(mut rx) => {
875                let watcher_clone = watcher.clone();
876                tokio::spawn(async move {
877                    // Debounce interval
878                    const DEBOUNCE_MS: u64 = 500;
879
880                    while let Some(path) = rx.recv().await {
881                        // Debounce rapid changes (e.g., temp file then rename)
882                        tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
883
884                        // Drain any additional events for the same path during debounce
885                        while rx.try_recv().is_ok() {}
886
887                        // Handle the change
888                        watcher_clone.handle_change(&path);
889                    }
890                });
891
892                info!("Started geo database file watcher");
893            }
894            Err(e) => {
895                warn!(
896                    error = %e,
897                    "Failed to start geo database file watcher, auto-reload disabled"
898                );
899            }
900        }
901    }
902}