// sentinel_proxy/proxy/mod.rs

//! Sentinel Proxy core implementation.
//!
//! This module contains the main `SentinelProxy` struct and its implementation.
//! For maintainability, the implementation is split across several submodules:
//!
//! - `context`: request context maintained throughout the request lifecycle
//! - `handlers`: helper methods for handling the different route types
//! - `http_trait`: the `ProxyHttp` trait implementation for Pingora
9
10mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use parking_lot::RwLock;
18use pingora::http::ResponseHeader;
19use pingora::prelude::*;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::Registry;
27
28use crate::agents::AgentManager;
29use crate::app::AppState;
30use crate::builtin_handlers::BuiltinHandlerState;
31use crate::cache::{CacheConfig, CacheManager};
32use crate::errors::ErrorHandler;
33use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
34use crate::health::PassiveHealthChecker;
35use crate::http_helpers;
36use crate::logging::{LogManager, SharedLogManager};
37use crate::rate_limit::{RateLimitConfig, RateLimitManager};
38use crate::reload::{
39    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
40};
41use crate::routing::RouteMatcher;
42use crate::static_files::StaticFileServer;
43use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
44use crate::validation::SchemaValidator;
45
46use sentinel_common::TraceIdFormat;
47use sentinel_config::Config;
48
49/// Main proxy service implementing Pingora's ProxyHttp trait
50pub struct SentinelProxy {
51    /// Configuration manager with hot reload
52    pub config_manager: Arc<ConfigManager>,
53    /// Route matcher
54    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
55    /// Upstream pools (keyed by upstream ID)
56    pub(super) upstream_pools: Registry<UpstreamPool>,
57    /// Agent manager for external processing
58    pub(super) agent_manager: Arc<AgentManager>,
59    /// Passive health checker
60    pub(super) passive_health: Arc<PassiveHealthChecker>,
61    /// Metrics collector
62    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
63    /// Application state
64    pub(super) app_state: Arc<AppState>,
65    /// Graceful reload coordinator
66    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
67    /// Error handlers per route (keyed by route ID)
68    pub(super) error_handlers: Registry<ErrorHandler>,
69    /// API schema validators per route (keyed by route ID)
70    pub(super) validators: Registry<SchemaValidator>,
71    /// Static file servers per route (keyed by route ID)
72    pub(super) static_servers: Registry<StaticFileServer>,
73    /// Builtin handler state
74    pub(super) builtin_state: Arc<BuiltinHandlerState>,
75    /// Log manager for file-based logging
76    pub(super) log_manager: SharedLogManager,
77    /// Trace ID format for request tracing
78    pub(super) trace_id_format: TraceIdFormat,
79    /// Active health check runner
80    pub(super) health_check_runner: Arc<HealthCheckRunner>,
81    /// Rate limit manager
82    pub(super) rate_limit_manager: Arc<RateLimitManager>,
83    /// HTTP cache manager
84    pub(super) cache_manager: Arc<CacheManager>,
85    /// GeoIP filter manager
86    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
87}
88
89impl SentinelProxy {
90    /// Create new proxy instance
91    ///
92    /// If config_path is None, uses the embedded default configuration.
93    /// Note: Tracing must be initialized by the caller before calling this function.
94    pub async fn new(config_path: Option<&str>) -> Result<Self> {
95        info!("Starting Sentinel Proxy");
96
97        // Load initial configuration
98        let (config, effective_config_path) = match config_path {
99            Some(path) => {
100                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
101                (cfg, path.to_string())
102            }
103            None => {
104                let cfg = Config::default_embedded()
105                    .context("Failed to load embedded default configuration")?;
106                // Use a sentinel path to indicate embedded config
107                (cfg, "_embedded_".to_string())
108            }
109        };
110
111        config
112            .validate()
113            .context("Initial configuration validation failed")?;
114
115        // Configure global cache storage (must be done before cache is accessed)
116        if let Some(ref cache_config) = config.cache {
117            info!(
118                max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
119                backend = ?cache_config.backend,
120                "Configuring HTTP cache storage"
121            );
122            crate::cache::configure_cache(cache_config.clone());
123        }
124
125        // Create configuration manager
126        let config_manager =
127            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
128
129        // Add validators
130        config_manager.add_validator(Box::new(RouteValidator)).await;
131        config_manager
132            .add_validator(Box::new(UpstreamValidator))
133            .await;
134
135        // Create route matcher
136        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
137
138        // Create upstream pools and active health checkers
139        let mut pools = HashMap::new();
140        let mut health_check_runner = HealthCheckRunner::new();
141
142        for (upstream_id, upstream_config) in &config.upstreams {
143            let mut config_with_id = upstream_config.clone();
144            config_with_id.id = upstream_id.clone();
145            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
146            pools.insert(upstream_id.clone(), pool);
147
148            // Create active health checker if health check is configured
149            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
150                health_check_runner.add_checker(checker);
151            }
152        }
153        let upstream_pools = Registry::from_map(pools);
154        let health_check_runner = Arc::new(health_check_runner);
155
156        // Create passive health checker
157        let passive_health = Arc::new(PassiveHealthChecker::new(
158            0.5,  // 50% failure rate threshold
159            100,  // Window size
160            None, // Will be linked to active health checkers
161        ));
162
163        // Create agent manager
164        let agent_manager = Arc::new(AgentManager::new(config.agents.clone(), 1000).await?);
165        agent_manager.initialize().await?;
166
167        // Create metrics collector
168        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
169
170        // Create application state
171        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
172
173        // Create reload coordinator
174        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
175            Duration::from_secs(30), // Max drain time
176        ));
177
178        // Setup configuration reload subscription
179        Self::setup_reload_handler(
180            config_manager.clone(),
181            route_matcher.clone(),
182            upstream_pools.clone(),
183        )
184        .await;
185
186        // Initialize service type components
187        let (error_handlers, validators, static_servers) =
188            Self::initialize_route_components(&config).await?;
189
190        // Create builtin handler state
191        let builtin_state = Arc::new(BuiltinHandlerState::new(
192            env!("CARGO_PKG_VERSION").to_string(),
193            app_state.instance_id.clone(),
194        ));
195
196        // Create log manager for file-based logging
197        let log_manager = match LogManager::new(&config.observability.logging) {
198            Ok(manager) => {
199                if manager.access_log_enabled() {
200                    info!("Access logging enabled");
201                }
202                if manager.error_log_enabled() {
203                    info!("Error logging enabled");
204                }
205                if manager.audit_log_enabled() {
206                    info!("Audit logging enabled");
207                }
208                Arc::new(manager)
209            }
210            Err(e) => {
211                warn!(
212                    "Failed to initialize log manager, file logging disabled: {}",
213                    e
214                );
215                Arc::new(LogManager::disabled())
216            }
217        };
218
219        // Register audit reload hook to log configuration changes
220        {
221            use crate::reload::AuditReloadHook;
222            let audit_hook = AuditReloadHook::new(log_manager.clone());
223            config_manager.add_hook(Box::new(audit_hook)).await;
224            debug!("Registered audit reload hook");
225        }
226
227        // Start active health check runner in background
228        if health_check_runner.checker_count() > 0 {
229            let runner = health_check_runner.clone();
230            tokio::spawn(async move {
231                runner.run().await;
232            });
233            info!(
234                "Started active health checking for {} upstreams",
235                health_check_runner.checker_count()
236            );
237        }
238
239        // Initialize rate limit manager
240        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
241
242        // Initialize geo filter manager
243        let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));
244
245        // Start periodic cleanup task for rate limiters and geo caches
246        Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());
247
248        // Start geo database file watcher for hot reload
249        Self::spawn_geo_database_watcher(geo_filter_manager.clone());
250
251        // Mark as ready
252        app_state.set_ready(true);
253
254        // Get trace ID format from config
255        let trace_id_format = config.server.trace_id_format;
256
257        // Initialize cache manager
258        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
259
260        Ok(Self {
261            config_manager,
262            route_matcher,
263            upstream_pools,
264            agent_manager,
265            passive_health,
266            metrics,
267            app_state,
268            reload_coordinator,
269            error_handlers,
270            validators,
271            static_servers,
272            builtin_state,
273            log_manager,
274            trace_id_format,
275            health_check_runner,
276            rate_limit_manager,
277            cache_manager,
278            geo_filter_manager,
279        })
280    }
281
282    /// Setup the configuration reload handler
283    async fn setup_reload_handler(
284        config_manager: Arc<ConfigManager>,
285        route_matcher: Arc<RwLock<RouteMatcher>>,
286        upstream_pools: Registry<UpstreamPool>,
287    ) {
288        let mut reload_rx = config_manager.subscribe();
289        let config_manager_clone = config_manager.clone();
290
291        tokio::spawn(async move {
292            while let Ok(event) = reload_rx.recv().await {
293                if let ReloadEvent::Applied { .. } = event {
294                    // Reload routes and upstreams
295                    let new_config = config_manager_clone.current();
296
297                    // Update route matcher (sync parking_lot::RwLock)
298                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
299                        *route_matcher.write() = new_matcher;
300                        info!("Routes reloaded successfully");
301                    }
302
303                    // Update upstream pools
304                    let mut new_pools = HashMap::new();
305                    for (upstream_id, upstream_config) in &new_config.upstreams {
306                        let mut config_with_id = upstream_config.clone();
307                        config_with_id.id = upstream_id.clone();
308                        match UpstreamPool::new(config_with_id).await {
309                            Ok(pool) => {
310                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
311                            }
312                            Err(e) => {
313                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
314                            }
315                        }
316                    }
317
318                    // Gracefully swap pools
319                    let old_pools = upstream_pools.replace(new_pools).await;
320
321                    // Shutdown old pools after delay
322                    tokio::spawn(async move {
323                        tokio::time::sleep(Duration::from_secs(60)).await;
324                        for (name, pool) in old_pools {
325                            info!("Shutting down old pool: {}", name);
326                            pool.shutdown().await;
327                        }
328                    });
329                }
330            }
331        });
332    }
333
334    /// Initialize route-specific components (error handlers, validators, static servers)
335    async fn initialize_route_components(
336        config: &Config,
337    ) -> Result<(
338        Registry<ErrorHandler>,
339        Registry<SchemaValidator>,
340        Registry<StaticFileServer>,
341    )> {
342        let mut error_handlers_map = HashMap::new();
343        let mut validators_map = HashMap::new();
344        let mut static_servers_map = HashMap::new();
345
346        for route in &config.routes {
347            info!(
348                "Initializing components for route: {} with service type: {:?}",
349                route.id, route.service_type
350            );
351
352            // Initialize error handler for each route
353            if let Some(ref error_config) = route.error_pages {
354                let handler =
355                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
356                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
357                debug!("Initialized error handler for route: {}", route.id);
358            } else {
359                // Use default error handler for the service type
360                let handler = ErrorHandler::new(route.service_type.clone(), None);
361                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
362            }
363
364            // Initialize schema validator for API routes
365            if route.service_type == sentinel_config::ServiceType::Api {
366                if let Some(ref api_schema) = route.api_schema {
367                    match SchemaValidator::new(api_schema.clone()) {
368                        Ok(validator) => {
369                            validators_map.insert(route.id.clone(), Arc::new(validator));
370                            info!("Initialized schema validator for route: {}", route.id);
371                        }
372                        Err(e) => {
373                            warn!(
374                                "Failed to initialize schema validator for route {}: {}",
375                                route.id, e
376                            );
377                        }
378                    }
379                }
380            }
381
382            // Initialize static file server for static routes
383            if route.service_type == sentinel_config::ServiceType::Static {
384                if let Some(ref static_config) = route.static_files {
385                    let server = StaticFileServer::new(static_config.clone());
386                    static_servers_map.insert(route.id.clone(), Arc::new(server));
387                    info!("Initialized static file server for route: {}", route.id);
388                } else {
389                    warn!(
390                        "Static route {} has no static_files configuration",
391                        route.id
392                    );
393                }
394            }
395        }
396
397        Ok((
398            Registry::from_map(error_handlers_map),
399            Registry::from_map(validators_map),
400            Registry::from_map(static_servers_map),
401        ))
402    }
403
404    /// Get or generate trace ID from session
405    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
406        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
407    }
408
409    /// Initialize rate limiters from configuration
410    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
411        use sentinel_config::RateLimitAction;
412
413        // Create manager with global rate limit if configured
414        let manager = if let Some(ref global) = config.rate_limits.global {
415            info!(
416                max_rps = global.max_rps,
417                burst = global.burst,
418                key = ?global.key,
419                "Initializing global rate limiter"
420            );
421            RateLimitManager::with_global_limit(global.max_rps, global.burst)
422        } else {
423            RateLimitManager::new()
424        };
425
426        for route in &config.routes {
427            // Check for rate limit in route policies
428            if let Some(ref rate_limit) = route.policies.rate_limit {
429                let rl_config = RateLimitConfig {
430                    max_rps: rate_limit.requests_per_second,
431                    burst: rate_limit.burst,
432                    key: rate_limit.key.clone(),
433                    action: RateLimitAction::Reject,
434                    status_code: 429,
435                    message: None,
436                    backend: sentinel_config::RateLimitBackend::Local,
437                    max_delay_ms: 5000, // Default for policy-based rate limits
438                };
439                manager.register_route(&route.id, rl_config);
440                info!(
441                    route_id = %route.id,
442                    max_rps = rate_limit.requests_per_second,
443                    burst = rate_limit.burst,
444                    key = ?rate_limit.key,
445                    "Registered rate limiter for route"
446                );
447            }
448
449            // Also check for rate limit filters in the filter chain
450            for filter_id in &route.filters {
451                if let Some(filter_config) = config.filters.get(filter_id) {
452                    if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
453                    {
454                        let rl_config = RateLimitConfig {
455                            max_rps: rl_filter.max_rps,
456                            burst: rl_filter.burst,
457                            key: rl_filter.key.clone(),
458                            action: rl_filter.on_limit.clone(),
459                            status_code: rl_filter.status_code,
460                            message: rl_filter.limit_message.clone(),
461                            backend: rl_filter.backend.clone(),
462                            max_delay_ms: rl_filter.max_delay_ms,
463                        };
464                        manager.register_route(&route.id, rl_config);
465                        info!(
466                            route_id = %route.id,
467                            filter_id = %filter_id,
468                            max_rps = rl_filter.max_rps,
469                            backend = ?rl_filter.backend,
470                            "Registered rate limiter from filter for route"
471                        );
472                    }
473                }
474            }
475        }
476
477        if manager.route_count() > 0 {
478            info!(
479                route_count = manager.route_count(),
480                "Rate limiting initialized"
481            );
482        }
483
484        manager
485    }
486
487    /// Initialize cache manager from configuration
488    fn initialize_cache_manager(config: &Config) -> CacheManager {
489        let manager = CacheManager::new();
490
491        let mut enabled_count = 0;
492
493        for route in &config.routes {
494            // API routes: caching disabled by default (responses often dynamic)
495            if route.service_type == sentinel_config::ServiceType::Api {
496                let cache_config = CacheConfig {
497                    enabled: false, // Disabled until explicitly configured via KDL
498                    default_ttl_secs: 60,
499                    ..Default::default()
500                };
501                manager.register_route(&route.id, cache_config);
502            }
503
504            // Static routes: enable caching by default (assets are typically cacheable)
505            if route.service_type == sentinel_config::ServiceType::Static {
506                let cache_config = CacheConfig {
507                    enabled: true, // Enable by default for static routes
508                    default_ttl_secs: 3600,
509                    max_size_bytes: 50 * 1024 * 1024, // 50MB for static
510                    stale_while_revalidate_secs: 60,
511                    stale_if_error_secs: 300,
512                    ..Default::default()
513                };
514                manager.register_route(&route.id, cache_config);
515                enabled_count += 1;
516                info!(
517                    route_id = %route.id,
518                    default_ttl_secs = 3600,
519                    "HTTP caching enabled for static route"
520                );
521            }
522
523            // Web routes: disable by default (HTML often personalized)
524            if route.service_type == sentinel_config::ServiceType::Web {
525                let cache_config = CacheConfig {
526                    enabled: false, // Disabled until explicitly configured
527                    default_ttl_secs: 300,
528                    ..Default::default()
529                };
530                manager.register_route(&route.id, cache_config);
531            }
532        }
533
534        if enabled_count > 0 {
535            info!(enabled_routes = enabled_count, "HTTP caching initialized");
536        } else {
537            debug!("HTTP cache manager initialized (no routes with caching enabled)");
538        }
539
540        manager
541    }
542
543    /// Initialize geo filters from configuration
544    fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
545        let manager = GeoFilterManager::new();
546
547        for (filter_id, filter_config) in &config.filters {
548            if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
549                match manager.register_filter(filter_id, geo_filter.clone()) {
550                    Ok(_) => {
551                        info!(
552                            filter_id = %filter_id,
553                            database_path = %geo_filter.database_path,
554                            action = ?geo_filter.action,
555                            countries_count = geo_filter.countries.len(),
556                            "Registered geo filter"
557                        );
558                    }
559                    Err(e) => {
560                        error!(
561                            filter_id = %filter_id,
562                            error = %e,
563                            "Failed to register geo filter"
564                        );
565                    }
566                }
567            }
568        }
569
570        let filter_ids = manager.filter_ids();
571        if !filter_ids.is_empty() {
572            info!(
573                filter_count = filter_ids.len(),
574                filter_ids = ?filter_ids,
575                "GeoIP filtering initialized"
576            );
577        }
578
579        manager
580    }
581
582    /// Apply security headers to response
583    pub(super) fn apply_security_headers(
584        &self,
585        header: &mut ResponseHeader,
586    ) -> Result<(), Box<Error>> {
587        header.insert_header("X-Content-Type-Options", "nosniff")?;
588        header.insert_header("X-Frame-Options", "DENY")?;
589        header.insert_header("X-XSS-Protection", "1; mode=block")?;
590        header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
591        header.remove_header("Server");
592        header.remove_header("X-Powered-By");
593        Ok(())
594    }
595
596    /// Spawn background task to periodically clean up idle rate limiters and expired geo caches
597    fn spawn_cleanup_task(
598        rate_limit_manager: Arc<RateLimitManager>,
599        geo_filter_manager: Arc<GeoFilterManager>,
600    ) {
601        // Cleanup interval: 5 minutes
602        const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
603
604        tokio::spawn(async move {
605            let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
606            // First tick completes immediately; skip it
607            interval.tick().await;
608
609            loop {
610                interval.tick().await;
611
612                // Clean up rate limiters (removes entries when pool exceeds max size)
613                rate_limit_manager.cleanup();
614
615                // Clean up expired geo filter caches
616                geo_filter_manager.clear_expired_caches();
617
618                debug!("Periodic cleanup completed");
619            }
620        });
621
622        info!(
623            interval_secs = CLEANUP_INTERVAL.as_secs(),
624            "Started periodic cleanup task"
625        );
626    }
627
628    /// Spawn background task to watch geo database files for changes
629    fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
630        let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
631
632        // Try to start watching
633        match watcher.start_watching() {
634            Ok(mut rx) => {
635                let watcher_clone = watcher.clone();
636                tokio::spawn(async move {
637                    // Debounce interval
638                    const DEBOUNCE_MS: u64 = 500;
639
640                    while let Some(path) = rx.recv().await {
641                        // Debounce rapid changes (e.g., temp file then rename)
642                        tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
643
644                        // Drain any additional events for the same path during debounce
645                        while rx.try_recv().is_ok() {}
646
647                        // Handle the change
648                        watcher_clone.handle_change(&path);
649                    }
650                });
651
652                info!("Started geo database file watcher");
653            }
654            Err(e) => {
655                warn!(
656                    error = %e,
657                    "Failed to start geo database file watcher, auto-reload disabled"
658                );
659            }
660        }
661    }
662}