sentinel_proxy/proxy/
mod.rs

1//! Sentinel Proxy Core Implementation
2//!
3//! This module contains the main SentinelProxy struct and its implementation,
4//! split across several submodules for maintainability:
5//!
6//! - `context`: Request context maintained throughout the request lifecycle
7//! - `handlers`: Helper methods for handling different route types
8//! - `http_trait`: ProxyHttp trait implementation for Pingora
9
10mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use parking_lot::RwLock;
18use pingora::http::ResponseHeader;
19use pingora::prelude::*;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::Registry;
27
28use crate::agents::AgentManager;
29use crate::app::AppState;
30use crate::builtin_handlers::BuiltinHandlerState;
31use crate::cache::{CacheConfig, CacheManager};
32use crate::errors::ErrorHandler;
33use crate::health::PassiveHealthChecker;
34use crate::http_helpers;
35use crate::logging::{LogManager, SharedLogManager};
36use crate::rate_limit::{RateLimitConfig, RateLimitManager};
37use crate::reload::{
38    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
39};
40use crate::routing::RouteMatcher;
41use crate::static_files::StaticFileServer;
42use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
43use crate::validation::SchemaValidator;
44
45use sentinel_common::TraceIdFormat;
46use sentinel_config::Config;
47
/// Main proxy service implementing Pingora's ProxyHttp trait.
///
/// Holds every runtime component the request path needs. Instances are built by
/// `SentinelProxy::new`. When a configuration reload is applied, a background task
/// swaps the route matcher and the upstream pools in place. The other per-route
/// components (error handlers, validators, static servers, rate limiters, cache
/// settings) are built once from the initial configuration.
pub struct SentinelProxy {
    /// Configuration manager with hot reload; source of the current configuration
    /// (`current()`) and of reload events (`subscribe()`).
    pub config_manager: Arc<ConfigManager>,
    /// Route matcher, behind a `parking_lot::RwLock` so a reload can replace it in place.
    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
    /// Upstream pools keyed by upstream ID; the whole map is replaced on reload.
    pub(super) upstream_pools: Registry<UpstreamPool>,
    /// Agent manager for external processing (created and initialized in `new`).
    pub(super) agent_manager: Arc<AgentManager>,
    /// Passive health checker (failure-rate threshold 0.5, window size 100).
    pub(super) passive_health: Arc<PassiveHealthChecker>,
    /// Request metrics collector.
    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
    /// Application state: instance ID and readiness flag.
    pub(super) app_state: Arc<AppState>,
    /// Graceful reload coordinator (created with a 30 s maximum drain time).
    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
    /// Error handlers keyed by route ID; every configured route has one.
    pub(super) error_handlers: Registry<ErrorHandler>,
    /// API schema validators keyed by route ID; only API routes whose schema
    /// compiled successfully have an entry.
    pub(super) validators: Registry<SchemaValidator>,
    /// Static file servers keyed by route ID; only static routes that have a
    /// `static_files` configuration have an entry.
    pub(super) static_servers: Registry<StaticFileServer>,
    /// State for builtin handlers (crate version and instance ID).
    pub(super) builtin_state: Arc<BuiltinHandlerState>,
    /// Log manager for file-based logging; a disabled manager when initialization failed.
    pub(super) log_manager: SharedLogManager,
    /// Trace ID format used when generating request trace IDs (from `server.trace_id_format`).
    pub(super) trace_id_format: TraceIdFormat,
    /// Active health check runner; its background loop is only spawned when it has checkers.
    pub(super) health_check_runner: Arc<HealthCheckRunner>,
    /// Per-route rate limit manager.
    pub(super) rate_limit_manager: Arc<RateLimitManager>,
    /// Per-route HTTP cache manager.
    pub(super) cache_manager: Arc<CacheManager>,
}
85
86impl SentinelProxy {
87    /// Create new proxy instance
88    ///
89    /// If config_path is None, uses the embedded default configuration.
90    /// Note: Tracing must be initialized by the caller before calling this function.
91    pub async fn new(config_path: Option<&str>) -> Result<Self> {
92        info!("Starting Sentinel Proxy");
93
94        // Load initial configuration
95        let (config, effective_config_path) = match config_path {
96            Some(path) => {
97                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
98                (cfg, path.to_string())
99            }
100            None => {
101                let cfg = Config::default_embedded()
102                    .context("Failed to load embedded default configuration")?;
103                // Use a sentinel path to indicate embedded config
104                (cfg, "_embedded_".to_string())
105            }
106        };
107
108        config
109            .validate()
110            .context("Initial configuration validation failed")?;
111
112        // Create configuration manager
113        let config_manager =
114            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
115
116        // Add validators
117        config_manager.add_validator(Box::new(RouteValidator)).await;
118        config_manager
119            .add_validator(Box::new(UpstreamValidator))
120            .await;
121
122        // Create route matcher
123        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
124
125        // Create upstream pools and active health checkers
126        let mut pools = HashMap::new();
127        let mut health_check_runner = HealthCheckRunner::new();
128
129        for (upstream_id, upstream_config) in &config.upstreams {
130            let mut config_with_id = upstream_config.clone();
131            config_with_id.id = upstream_id.clone();
132            let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
133            pools.insert(upstream_id.clone(), pool);
134
135            // Create active health checker if health check is configured
136            if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
137                health_check_runner.add_checker(checker);
138            }
139        }
140        let upstream_pools = Registry::from_map(pools);
141        let health_check_runner = Arc::new(health_check_runner);
142
143        // Create passive health checker
144        let passive_health = Arc::new(PassiveHealthChecker::new(
145            0.5,  // 50% failure rate threshold
146            100,  // Window size
147            None, // Will be linked to active health checkers
148        ));
149
150        // Create agent manager
151        let agent_manager = Arc::new(AgentManager::new(config.agents.clone(), 1000).await?);
152        agent_manager.initialize().await?;
153
154        // Create metrics collector
155        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
156
157        // Create application state
158        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
159
160        // Create reload coordinator
161        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
162            Duration::from_secs(30), // Max drain time
163        ));
164
165        // Setup configuration reload subscription
166        Self::setup_reload_handler(
167            config_manager.clone(),
168            route_matcher.clone(),
169            upstream_pools.clone(),
170        )
171        .await;
172
173        // Initialize service type components
174        let (error_handlers, validators, static_servers) =
175            Self::initialize_route_components(&config).await?;
176
177        // Create builtin handler state
178        let builtin_state = Arc::new(BuiltinHandlerState::new(
179            env!("CARGO_PKG_VERSION").to_string(),
180            app_state.instance_id.clone(),
181        ));
182
183        // Create log manager for file-based logging
184        let log_manager = match LogManager::new(&config.observability.logging) {
185            Ok(manager) => {
186                if manager.access_log_enabled() {
187                    info!("Access logging enabled");
188                }
189                if manager.error_log_enabled() {
190                    info!("Error logging enabled");
191                }
192                if manager.audit_log_enabled() {
193                    info!("Audit logging enabled");
194                }
195                Arc::new(manager)
196            }
197            Err(e) => {
198                warn!(
199                    "Failed to initialize log manager, file logging disabled: {}",
200                    e
201                );
202                Arc::new(LogManager::disabled())
203            }
204        };
205
206        // Register audit reload hook to log configuration changes
207        {
208            use crate::reload::AuditReloadHook;
209            let audit_hook = AuditReloadHook::new(log_manager.clone());
210            config_manager.add_hook(Box::new(audit_hook)).await;
211            debug!("Registered audit reload hook");
212        }
213
214        // Start active health check runner in background
215        if health_check_runner.checker_count() > 0 {
216            let runner = health_check_runner.clone();
217            tokio::spawn(async move {
218                runner.run().await;
219            });
220            info!(
221                "Started active health checking for {} upstreams",
222                health_check_runner.checker_count()
223            );
224        }
225
226        // Mark as ready
227        app_state.set_ready(true);
228
229        // Get trace ID format from config
230        let trace_id_format = config.server.trace_id_format;
231
232        // Initialize rate limit manager
233        let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
234
235        // Initialize cache manager
236        let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
237
238        Ok(Self {
239            config_manager,
240            route_matcher,
241            upstream_pools,
242            agent_manager,
243            passive_health,
244            metrics,
245            app_state,
246            reload_coordinator,
247            error_handlers,
248            validators,
249            static_servers,
250            builtin_state,
251            log_manager,
252            trace_id_format,
253            health_check_runner,
254            rate_limit_manager,
255            cache_manager,
256        })
257    }
258
259    /// Setup the configuration reload handler
260    async fn setup_reload_handler(
261        config_manager: Arc<ConfigManager>,
262        route_matcher: Arc<RwLock<RouteMatcher>>,
263        upstream_pools: Registry<UpstreamPool>,
264    ) {
265        let mut reload_rx = config_manager.subscribe();
266        let config_manager_clone = config_manager.clone();
267
268        tokio::spawn(async move {
269            while let Ok(event) = reload_rx.recv().await {
270                if let ReloadEvent::Applied { .. } = event {
271                    // Reload routes and upstreams
272                    let new_config = config_manager_clone.current();
273
274                    // Update route matcher (sync parking_lot::RwLock)
275                    if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
276                        *route_matcher.write() = new_matcher;
277                        info!("Routes reloaded successfully");
278                    }
279
280                    // Update upstream pools
281                    let mut new_pools = HashMap::new();
282                    for (upstream_id, upstream_config) in &new_config.upstreams {
283                        let mut config_with_id = upstream_config.clone();
284                        config_with_id.id = upstream_id.clone();
285                        match UpstreamPool::new(config_with_id).await {
286                            Ok(pool) => {
287                                new_pools.insert(upstream_id.clone(), Arc::new(pool));
288                            }
289                            Err(e) => {
290                                error!("Failed to create upstream pool {}: {}", upstream_id, e);
291                            }
292                        }
293                    }
294
295                    // Gracefully swap pools
296                    let old_pools = upstream_pools.replace(new_pools).await;
297
298                    // Shutdown old pools after delay
299                    tokio::spawn(async move {
300                        tokio::time::sleep(Duration::from_secs(60)).await;
301                        for (name, pool) in old_pools {
302                            info!("Shutting down old pool: {}", name);
303                            pool.shutdown().await;
304                        }
305                    });
306                }
307            }
308        });
309    }
310
311    /// Initialize route-specific components (error handlers, validators, static servers)
312    async fn initialize_route_components(
313        config: &Config,
314    ) -> Result<(
315        Registry<ErrorHandler>,
316        Registry<SchemaValidator>,
317        Registry<StaticFileServer>,
318    )> {
319        let mut error_handlers_map = HashMap::new();
320        let mut validators_map = HashMap::new();
321        let mut static_servers_map = HashMap::new();
322
323        for route in &config.routes {
324            info!(
325                "Initializing components for route: {} with service type: {:?}",
326                route.id, route.service_type
327            );
328
329            // Initialize error handler for each route
330            if let Some(ref error_config) = route.error_pages {
331                let handler =
332                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
333                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
334                debug!("Initialized error handler for route: {}", route.id);
335            } else {
336                // Use default error handler for the service type
337                let handler = ErrorHandler::new(route.service_type.clone(), None);
338                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
339            }
340
341            // Initialize schema validator for API routes
342            if route.service_type == sentinel_config::ServiceType::Api {
343                if let Some(ref api_schema) = route.api_schema {
344                    match SchemaValidator::new(api_schema.clone()) {
345                        Ok(validator) => {
346                            validators_map.insert(route.id.clone(), Arc::new(validator));
347                            info!("Initialized schema validator for route: {}", route.id);
348                        }
349                        Err(e) => {
350                            warn!(
351                                "Failed to initialize schema validator for route {}: {}",
352                                route.id, e
353                            );
354                        }
355                    }
356                }
357            }
358
359            // Initialize static file server for static routes
360            if route.service_type == sentinel_config::ServiceType::Static {
361                if let Some(ref static_config) = route.static_files {
362                    let server = StaticFileServer::new(static_config.clone());
363                    static_servers_map.insert(route.id.clone(), Arc::new(server));
364                    info!("Initialized static file server for route: {}", route.id);
365                } else {
366                    warn!(
367                        "Static route {} has no static_files configuration",
368                        route.id
369                    );
370                }
371            }
372        }
373
374        Ok((
375            Registry::from_map(error_handlers_map),
376            Registry::from_map(validators_map),
377            Registry::from_map(static_servers_map),
378        ))
379    }
380
381    /// Get or generate trace ID from session
382    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
383        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
384    }
385
386    /// Initialize rate limiters from configuration
387    fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
388        use sentinel_config::RateLimitAction;
389
390        let manager = RateLimitManager::new();
391
392        for route in &config.routes {
393            // Check for rate limit in route policies
394            if let Some(ref rate_limit) = route.policies.rate_limit {
395                let rl_config = RateLimitConfig {
396                    max_rps: rate_limit.requests_per_second,
397                    burst: rate_limit.burst,
398                    key: rate_limit.key.clone(),
399                    action: RateLimitAction::Reject,
400                    status_code: 429,
401                    message: None,
402                    backend: sentinel_config::RateLimitBackend::Local,
403                };
404                manager.register_route(&route.id, rl_config);
405                info!(
406                    route_id = %route.id,
407                    max_rps = rate_limit.requests_per_second,
408                    burst = rate_limit.burst,
409                    key = ?rate_limit.key,
410                    "Registered rate limiter for route"
411                );
412            }
413
414            // Also check for rate limit filters in the filter chain
415            for filter_id in &route.filters {
416                if let Some(filter_config) = config.filters.get(filter_id) {
417                    if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
418                    {
419                        let rl_config = RateLimitConfig {
420                            max_rps: rl_filter.max_rps,
421                            burst: rl_filter.burst,
422                            key: rl_filter.key.clone(),
423                            action: rl_filter.on_limit.clone(),
424                            status_code: rl_filter.status_code,
425                            message: rl_filter.limit_message.clone(),
426                            backend: rl_filter.backend.clone(),
427                        };
428                        manager.register_route(&route.id, rl_config);
429                        info!(
430                            route_id = %route.id,
431                            filter_id = %filter_id,
432                            max_rps = rl_filter.max_rps,
433                            backend = ?rl_filter.backend,
434                            "Registered rate limiter from filter for route"
435                        );
436                    }
437                }
438            }
439        }
440
441        if manager.route_count() > 0 {
442            info!(
443                route_count = manager.route_count(),
444                "Rate limiting initialized"
445            );
446        }
447
448        manager
449    }
450
451    /// Initialize cache manager from configuration
452    fn initialize_cache_manager(config: &Config) -> CacheManager {
453        let manager = CacheManager::new();
454
455        let mut enabled_count = 0;
456
457        for route in &config.routes {
458            // API routes: caching disabled by default (responses often dynamic)
459            if route.service_type == sentinel_config::ServiceType::Api {
460                let cache_config = CacheConfig {
461                    enabled: false, // Disabled until explicitly configured via KDL
462                    default_ttl_secs: 60,
463                    ..Default::default()
464                };
465                manager.register_route(&route.id, cache_config);
466            }
467
468            // Static routes: enable caching by default (assets are typically cacheable)
469            if route.service_type == sentinel_config::ServiceType::Static {
470                let cache_config = CacheConfig {
471                    enabled: true, // Enable by default for static routes
472                    default_ttl_secs: 3600,
473                    max_size_bytes: 50 * 1024 * 1024, // 50MB for static
474                    stale_while_revalidate_secs: 60,
475                    stale_if_error_secs: 300,
476                    ..Default::default()
477                };
478                manager.register_route(&route.id, cache_config);
479                enabled_count += 1;
480                info!(
481                    route_id = %route.id,
482                    default_ttl_secs = 3600,
483                    "HTTP caching enabled for static route"
484                );
485            }
486
487            // Web routes: disable by default (HTML often personalized)
488            if route.service_type == sentinel_config::ServiceType::Web {
489                let cache_config = CacheConfig {
490                    enabled: false, // Disabled until explicitly configured
491                    default_ttl_secs: 300,
492                    ..Default::default()
493                };
494                manager.register_route(&route.id, cache_config);
495            }
496        }
497
498        if enabled_count > 0 {
499            info!(enabled_routes = enabled_count, "HTTP caching initialized");
500        } else {
501            debug!("HTTP cache manager initialized (no routes with caching enabled)");
502        }
503
504        manager
505    }
506
507    /// Apply security headers to response
508    pub(super) fn apply_security_headers(
509        &self,
510        header: &mut ResponseHeader,
511    ) -> Result<(), Box<Error>> {
512        header.insert_header("X-Content-Type-Options", "nosniff")?;
513        header.insert_header("X-Frame-Options", "DENY")?;
514        header.insert_header("X-XSS-Protection", "1; mode=block")?;
515        header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
516        header.remove_header("Server");
517        header.remove_header("X-Powered-By");
518        Ok(())
519    }
520}