sentinel_proxy/proxy/
mod.rs

1//! Sentinel Proxy Core Implementation
2//!
3//! This module contains the main SentinelProxy struct and its implementation,
4//! split across several submodules for maintainability:
5//!
6//! - `context`: Request context maintained throughout the request lifecycle
7//! - `handlers`: Helper methods for handling different route types
8//! - `http_trait`: ProxyHttp trait implementation for Pingora
9
10mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use pingora::http::ResponseHeader;
18use pingora::prelude::*;
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::Registry;
27
28use crate::agents::AgentManager;
29use crate::app::AppState;
30use crate::builtin_handlers::BuiltinHandlerState;
31use crate::errors::ErrorHandler;
32use crate::health::PassiveHealthChecker;
33use crate::http_helpers;
34use crate::logging::{LogManager, SharedLogManager};
35use crate::reload::{
36    ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
37};
38use crate::routing::RouteMatcher;
39use crate::static_files::StaticFileServer;
40use crate::upstream::UpstreamPool;
41use crate::validation::SchemaValidator;
42
43use sentinel_common::TraceIdFormat;
44use sentinel_config::Config;
45
46/// Main proxy service implementing Pingora's ProxyHttp trait
47pub struct SentinelProxy {
48    /// Configuration manager with hot reload
49    pub config_manager: Arc<ConfigManager>,
50    /// Route matcher
51    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
52    /// Upstream pools (keyed by upstream ID)
53    pub(super) upstream_pools: Registry<UpstreamPool>,
54    /// Agent manager for external processing
55    pub(super) agent_manager: Arc<AgentManager>,
56    /// Passive health checker
57    pub(super) passive_health: Arc<PassiveHealthChecker>,
58    /// Metrics collector
59    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
60    /// Application state
61    pub(super) app_state: Arc<AppState>,
62    /// Graceful reload coordinator
63    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
64    /// Error handlers per route (keyed by route ID)
65    pub(super) error_handlers: Registry<ErrorHandler>,
66    /// API schema validators per route (keyed by route ID)
67    pub(super) validators: Registry<SchemaValidator>,
68    /// Static file servers per route (keyed by route ID)
69    pub(super) static_servers: Registry<StaticFileServer>,
70    /// Builtin handler state
71    pub(super) builtin_state: Arc<BuiltinHandlerState>,
72    /// Log manager for file-based logging
73    pub(super) log_manager: SharedLogManager,
74    /// Trace ID format for request tracing
75    pub(super) trace_id_format: TraceIdFormat,
76}
77
78impl SentinelProxy {
79    /// Create new proxy instance
80    ///
81    /// If config_path is None, uses the embedded default configuration.
82    /// Note: Tracing must be initialized by the caller before calling this function.
83    pub async fn new(config_path: Option<&str>) -> Result<Self> {
84        info!("Starting Sentinel Proxy");
85
86        // Load initial configuration
87        let (config, effective_config_path) = match config_path {
88            Some(path) => {
89                let cfg = Config::from_file(path).context("Failed to load configuration file")?;
90                (cfg, path.to_string())
91            }
92            None => {
93                let cfg = Config::default_embedded()
94                    .context("Failed to load embedded default configuration")?;
95                // Use a sentinel path to indicate embedded config
96                (cfg, "_embedded_".to_string())
97            }
98        };
99
100        config
101            .validate()
102            .context("Initial configuration validation failed")?;
103
104        // Create configuration manager
105        let config_manager =
106            Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
107
108        // Add validators
109        config_manager.add_validator(Box::new(RouteValidator)).await;
110        config_manager
111            .add_validator(Box::new(UpstreamValidator))
112            .await;
113
114        // Create route matcher
115        let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
116
117        // Create upstream pools (skip for static routes as they don't need upstreams)
118        let mut pools = HashMap::new();
119        for (upstream_id, upstream_config) in &config.upstreams {
120            let mut config_with_id = upstream_config.clone();
121            config_with_id.id = upstream_id.clone();
122            let pool = Arc::new(UpstreamPool::new(config_with_id).await?);
123            pools.insert(upstream_id.clone(), pool);
124        }
125        let upstream_pools = Registry::from_map(pools);
126
127        // Create passive health checker
128        let passive_health = Arc::new(PassiveHealthChecker::new(
129            0.5,  // 50% failure rate threshold
130            100,  // Window size
131            None, // Will be linked to active health checkers
132        ));
133
134        // Create agent manager
135        let agent_manager = Arc::new(AgentManager::new(config.agents.clone(), 1000).await?);
136        agent_manager.initialize().await?;
137
138        // Create metrics collector
139        let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
140
141        // Create application state
142        let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
143
144        // Create reload coordinator
145        let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
146            Duration::from_secs(30), // Max drain time
147        ));
148
149        // Setup configuration reload subscription
150        Self::setup_reload_handler(
151            config_manager.clone(),
152            route_matcher.clone(),
153            upstream_pools.clone(),
154        )
155        .await;
156
157        // Initialize service type components
158        let (error_handlers, validators, static_servers) =
159            Self::initialize_route_components(&config).await?;
160
161        // Create builtin handler state
162        let builtin_state = Arc::new(BuiltinHandlerState::new(
163            env!("CARGO_PKG_VERSION").to_string(),
164            app_state.instance_id.clone(),
165        ));
166
167        // Create log manager for file-based logging
168        let log_manager = match LogManager::new(&config.observability.logging) {
169            Ok(manager) => {
170                if manager.access_log_enabled() {
171                    info!("Access logging enabled");
172                }
173                if manager.error_log_enabled() {
174                    info!("Error logging enabled");
175                }
176                if manager.audit_log_enabled() {
177                    info!("Audit logging enabled");
178                }
179                Arc::new(manager)
180            }
181            Err(e) => {
182                warn!(
183                    "Failed to initialize log manager, file logging disabled: {}",
184                    e
185                );
186                Arc::new(LogManager::disabled())
187            }
188        };
189
190        // Mark as ready
191        app_state.set_ready(true);
192
193        // Get trace ID format from config
194        let trace_id_format = config.server.trace_id_format;
195
196        Ok(Self {
197            config_manager,
198            route_matcher,
199            upstream_pools,
200            agent_manager,
201            passive_health,
202            metrics,
203            app_state,
204            reload_coordinator,
205            error_handlers,
206            validators,
207            static_servers,
208            builtin_state,
209            log_manager,
210            trace_id_format,
211        })
212    }
213
214    /// Setup the configuration reload handler
215    async fn setup_reload_handler(
216        config_manager: Arc<ConfigManager>,
217        route_matcher: Arc<RwLock<RouteMatcher>>,
218        upstream_pools: Registry<UpstreamPool>,
219    ) {
220        let mut reload_rx = config_manager.subscribe();
221        let config_manager_clone = config_manager.clone();
222
223        tokio::spawn(async move {
224            while let Ok(event) = reload_rx.recv().await {
225                match event {
226                    ReloadEvent::Applied { .. } => {
227                        // Reload routes and upstreams
228                        let new_config = config_manager_clone.current();
229
230                        // Update route matcher
231                        if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None)
232                        {
233                            *route_matcher.write().await = new_matcher;
234                            info!("Routes reloaded successfully");
235                        }
236
237                        // Update upstream pools
238                        let mut new_pools = HashMap::new();
239                        for (upstream_id, upstream_config) in &new_config.upstreams {
240                            let mut config_with_id = upstream_config.clone();
241                            config_with_id.id = upstream_id.clone();
242                            match UpstreamPool::new(config_with_id).await {
243                                Ok(pool) => {
244                                    new_pools.insert(upstream_id.clone(), Arc::new(pool));
245                                }
246                                Err(e) => {
247                                    error!(
248                                        "Failed to create upstream pool {}: {}",
249                                        upstream_id, e
250                                    );
251                                }
252                            }
253                        }
254
255                        // Gracefully swap pools
256                        let old_pools = upstream_pools.replace(new_pools).await;
257
258                        // Shutdown old pools after delay
259                        tokio::spawn(async move {
260                            tokio::time::sleep(Duration::from_secs(60)).await;
261                            for (name, pool) in old_pools {
262                                info!("Shutting down old pool: {}", name);
263                                pool.shutdown().await;
264                            }
265                        });
266                    }
267                    _ => {}
268                }
269            }
270        });
271    }
272
273    /// Initialize route-specific components (error handlers, validators, static servers)
274    async fn initialize_route_components(
275        config: &Config,
276    ) -> Result<(
277        Registry<ErrorHandler>,
278        Registry<SchemaValidator>,
279        Registry<StaticFileServer>,
280    )> {
281        let mut error_handlers_map = HashMap::new();
282        let mut validators_map = HashMap::new();
283        let mut static_servers_map = HashMap::new();
284
285        for route in &config.routes {
286            info!(
287                "Initializing components for route: {} with service type: {:?}",
288                route.id, route.service_type
289            );
290
291            // Initialize error handler for each route
292            if let Some(ref error_config) = route.error_pages {
293                let handler =
294                    ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
295                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
296                debug!("Initialized error handler for route: {}", route.id);
297            } else {
298                // Use default error handler for the service type
299                let handler = ErrorHandler::new(route.service_type.clone(), None);
300                error_handlers_map.insert(route.id.clone(), Arc::new(handler));
301            }
302
303            // Initialize schema validator for API routes
304            if route.service_type == sentinel_config::ServiceType::Api {
305                if let Some(ref api_schema) = route.api_schema {
306                    match SchemaValidator::new(api_schema.clone()) {
307                        Ok(validator) => {
308                            validators_map.insert(route.id.clone(), Arc::new(validator));
309                            info!("Initialized schema validator for route: {}", route.id);
310                        }
311                        Err(e) => {
312                            warn!(
313                                "Failed to initialize schema validator for route {}: {}",
314                                route.id, e
315                            );
316                        }
317                    }
318                }
319            }
320
321            // Initialize static file server for static routes
322            if route.service_type == sentinel_config::ServiceType::Static {
323                if let Some(ref static_config) = route.static_files {
324                    let server = StaticFileServer::new(static_config.clone());
325                    static_servers_map.insert(route.id.clone(), Arc::new(server));
326                    info!("Initialized static file server for route: {}", route.id);
327                } else {
328                    warn!(
329                        "Static route {} has no static_files configuration",
330                        route.id
331                    );
332                }
333            }
334        }
335
336        Ok((
337            Registry::from_map(error_handlers_map),
338            Registry::from_map(validators_map),
339            Registry::from_map(static_servers_map),
340        ))
341    }
342
343    /// Get or generate trace ID from session
344    pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
345        http_helpers::get_or_create_trace_id(session, self.trace_id_format)
346    }
347
348    /// Apply security headers to response
349    pub(super) fn apply_security_headers(
350        &self,
351        header: &mut ResponseHeader,
352    ) -> Result<(), Box<Error>> {
353        header.insert_header("X-Content-Type-Options", "nosniff")?;
354        header.insert_header("X-Frame-Options", "DENY")?;
355        header.insert_header("X-XSS-Protection", "1; mode=block")?;
356        header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
357        header.remove_header("Server");
358        header.remove_header("X-Powered-By");
359        Ok(())
360    }
361}