1mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use parking_lot::RwLock;
18use pingora::http::ResponseHeader;
19use pingora::prelude::*;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::Registry;
27
28use crate::agents::AgentManager;
29use crate::app::AppState;
30use crate::builtin_handlers::BuiltinHandlerState;
31use crate::cache::{CacheConfig, CacheManager};
32use crate::errors::ErrorHandler;
33use crate::health::PassiveHealthChecker;
34use crate::http_helpers;
35use crate::logging::{LogManager, SharedLogManager};
36use crate::rate_limit::{RateLimitConfig, RateLimitManager};
37use crate::reload::{
38 ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
39};
40use crate::routing::RouteMatcher;
41use crate::static_files::StaticFileServer;
42use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
43use crate::validation::SchemaValidator;
44
45use sentinel_common::TraceIdFormat;
46use sentinel_config::Config;
47
48pub struct SentinelProxy {
50 pub config_manager: Arc<ConfigManager>,
52 pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
54 pub(super) upstream_pools: Registry<UpstreamPool>,
56 pub(super) agent_manager: Arc<AgentManager>,
58 pub(super) passive_health: Arc<PassiveHealthChecker>,
60 pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
62 pub(super) app_state: Arc<AppState>,
64 pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
66 pub(super) error_handlers: Registry<ErrorHandler>,
68 pub(super) validators: Registry<SchemaValidator>,
70 pub(super) static_servers: Registry<StaticFileServer>,
72 pub(super) builtin_state: Arc<BuiltinHandlerState>,
74 pub(super) log_manager: SharedLogManager,
76 pub(super) trace_id_format: TraceIdFormat,
78 pub(super) health_check_runner: Arc<HealthCheckRunner>,
80 pub(super) rate_limit_manager: Arc<RateLimitManager>,
82 pub(super) cache_manager: Arc<CacheManager>,
84}
85
86impl SentinelProxy {
87 pub async fn new(config_path: Option<&str>) -> Result<Self> {
92 info!("Starting Sentinel Proxy");
93
94 let (config, effective_config_path) = match config_path {
96 Some(path) => {
97 let cfg = Config::from_file(path).context("Failed to load configuration file")?;
98 (cfg, path.to_string())
99 }
100 None => {
101 let cfg = Config::default_embedded()
102 .context("Failed to load embedded default configuration")?;
103 (cfg, "_embedded_".to_string())
105 }
106 };
107
108 config
109 .validate()
110 .context("Initial configuration validation failed")?;
111
112 let config_manager =
114 Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
115
116 config_manager.add_validator(Box::new(RouteValidator)).await;
118 config_manager
119 .add_validator(Box::new(UpstreamValidator))
120 .await;
121
122 let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
124
125 let mut pools = HashMap::new();
127 let mut health_check_runner = HealthCheckRunner::new();
128
129 for (upstream_id, upstream_config) in &config.upstreams {
130 let mut config_with_id = upstream_config.clone();
131 config_with_id.id = upstream_id.clone();
132 let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
133 pools.insert(upstream_id.clone(), pool);
134
135 if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
137 health_check_runner.add_checker(checker);
138 }
139 }
140 let upstream_pools = Registry::from_map(pools);
141 let health_check_runner = Arc::new(health_check_runner);
142
143 let passive_health = Arc::new(PassiveHealthChecker::new(
145 0.5, 100, None, ));
149
150 let agent_manager = Arc::new(AgentManager::new(config.agents.clone(), 1000).await?);
152 agent_manager.initialize().await?;
153
154 let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
156
157 let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
159
160 let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
162 Duration::from_secs(30), ));
164
165 Self::setup_reload_handler(
167 config_manager.clone(),
168 route_matcher.clone(),
169 upstream_pools.clone(),
170 )
171 .await;
172
173 let (error_handlers, validators, static_servers) =
175 Self::initialize_route_components(&config).await?;
176
177 let builtin_state = Arc::new(BuiltinHandlerState::new(
179 env!("CARGO_PKG_VERSION").to_string(),
180 app_state.instance_id.clone(),
181 ));
182
183 let log_manager = match LogManager::new(&config.observability.logging) {
185 Ok(manager) => {
186 if manager.access_log_enabled() {
187 info!("Access logging enabled");
188 }
189 if manager.error_log_enabled() {
190 info!("Error logging enabled");
191 }
192 if manager.audit_log_enabled() {
193 info!("Audit logging enabled");
194 }
195 Arc::new(manager)
196 }
197 Err(e) => {
198 warn!(
199 "Failed to initialize log manager, file logging disabled: {}",
200 e
201 );
202 Arc::new(LogManager::disabled())
203 }
204 };
205
206 {
208 use crate::reload::AuditReloadHook;
209 let audit_hook = AuditReloadHook::new(log_manager.clone());
210 config_manager.add_hook(Box::new(audit_hook)).await;
211 debug!("Registered audit reload hook");
212 }
213
214 if health_check_runner.checker_count() > 0 {
216 let runner = health_check_runner.clone();
217 tokio::spawn(async move {
218 runner.run().await;
219 });
220 info!(
221 "Started active health checking for {} upstreams",
222 health_check_runner.checker_count()
223 );
224 }
225
226 app_state.set_ready(true);
228
229 let trace_id_format = config.server.trace_id_format;
231
232 let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
234
235 let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
237
238 Ok(Self {
239 config_manager,
240 route_matcher,
241 upstream_pools,
242 agent_manager,
243 passive_health,
244 metrics,
245 app_state,
246 reload_coordinator,
247 error_handlers,
248 validators,
249 static_servers,
250 builtin_state,
251 log_manager,
252 trace_id_format,
253 health_check_runner,
254 rate_limit_manager,
255 cache_manager,
256 })
257 }
258
259 async fn setup_reload_handler(
261 config_manager: Arc<ConfigManager>,
262 route_matcher: Arc<RwLock<RouteMatcher>>,
263 upstream_pools: Registry<UpstreamPool>,
264 ) {
265 let mut reload_rx = config_manager.subscribe();
266 let config_manager_clone = config_manager.clone();
267
268 tokio::spawn(async move {
269 while let Ok(event) = reload_rx.recv().await {
270 if let ReloadEvent::Applied { .. } = event {
271 let new_config = config_manager_clone.current();
273
274 if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
276 *route_matcher.write() = new_matcher;
277 info!("Routes reloaded successfully");
278 }
279
280 let mut new_pools = HashMap::new();
282 for (upstream_id, upstream_config) in &new_config.upstreams {
283 let mut config_with_id = upstream_config.clone();
284 config_with_id.id = upstream_id.clone();
285 match UpstreamPool::new(config_with_id).await {
286 Ok(pool) => {
287 new_pools.insert(upstream_id.clone(), Arc::new(pool));
288 }
289 Err(e) => {
290 error!("Failed to create upstream pool {}: {}", upstream_id, e);
291 }
292 }
293 }
294
295 let old_pools = upstream_pools.replace(new_pools).await;
297
298 tokio::spawn(async move {
300 tokio::time::sleep(Duration::from_secs(60)).await;
301 for (name, pool) in old_pools {
302 info!("Shutting down old pool: {}", name);
303 pool.shutdown().await;
304 }
305 });
306 }
307 }
308 });
309 }
310
311 async fn initialize_route_components(
313 config: &Config,
314 ) -> Result<(
315 Registry<ErrorHandler>,
316 Registry<SchemaValidator>,
317 Registry<StaticFileServer>,
318 )> {
319 let mut error_handlers_map = HashMap::new();
320 let mut validators_map = HashMap::new();
321 let mut static_servers_map = HashMap::new();
322
323 for route in &config.routes {
324 info!(
325 "Initializing components for route: {} with service type: {:?}",
326 route.id, route.service_type
327 );
328
329 if let Some(ref error_config) = route.error_pages {
331 let handler =
332 ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
333 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
334 debug!("Initialized error handler for route: {}", route.id);
335 } else {
336 let handler = ErrorHandler::new(route.service_type.clone(), None);
338 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
339 }
340
341 if route.service_type == sentinel_config::ServiceType::Api {
343 if let Some(ref api_schema) = route.api_schema {
344 match SchemaValidator::new(api_schema.clone()) {
345 Ok(validator) => {
346 validators_map.insert(route.id.clone(), Arc::new(validator));
347 info!("Initialized schema validator for route: {}", route.id);
348 }
349 Err(e) => {
350 warn!(
351 "Failed to initialize schema validator for route {}: {}",
352 route.id, e
353 );
354 }
355 }
356 }
357 }
358
359 if route.service_type == sentinel_config::ServiceType::Static {
361 if let Some(ref static_config) = route.static_files {
362 let server = StaticFileServer::new(static_config.clone());
363 static_servers_map.insert(route.id.clone(), Arc::new(server));
364 info!("Initialized static file server for route: {}", route.id);
365 } else {
366 warn!(
367 "Static route {} has no static_files configuration",
368 route.id
369 );
370 }
371 }
372 }
373
374 Ok((
375 Registry::from_map(error_handlers_map),
376 Registry::from_map(validators_map),
377 Registry::from_map(static_servers_map),
378 ))
379 }
380
381 pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
383 http_helpers::get_or_create_trace_id(session, self.trace_id_format)
384 }
385
386 fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
388 use sentinel_config::RateLimitAction;
389
390 let manager = RateLimitManager::new();
391
392 for route in &config.routes {
393 if let Some(ref rate_limit) = route.policies.rate_limit {
395 let rl_config = RateLimitConfig {
396 max_rps: rate_limit.requests_per_second,
397 burst: rate_limit.burst,
398 key: rate_limit.key.clone(),
399 action: RateLimitAction::Reject,
400 status_code: 429,
401 message: None,
402 backend: sentinel_config::RateLimitBackend::Local,
403 };
404 manager.register_route(&route.id, rl_config);
405 info!(
406 route_id = %route.id,
407 max_rps = rate_limit.requests_per_second,
408 burst = rate_limit.burst,
409 key = ?rate_limit.key,
410 "Registered rate limiter for route"
411 );
412 }
413
414 for filter_id in &route.filters {
416 if let Some(filter_config) = config.filters.get(filter_id) {
417 if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
418 {
419 let rl_config = RateLimitConfig {
420 max_rps: rl_filter.max_rps,
421 burst: rl_filter.burst,
422 key: rl_filter.key.clone(),
423 action: rl_filter.on_limit.clone(),
424 status_code: rl_filter.status_code,
425 message: rl_filter.limit_message.clone(),
426 backend: rl_filter.backend.clone(),
427 };
428 manager.register_route(&route.id, rl_config);
429 info!(
430 route_id = %route.id,
431 filter_id = %filter_id,
432 max_rps = rl_filter.max_rps,
433 backend = ?rl_filter.backend,
434 "Registered rate limiter from filter for route"
435 );
436 }
437 }
438 }
439 }
440
441 if manager.route_count() > 0 {
442 info!(
443 route_count = manager.route_count(),
444 "Rate limiting initialized"
445 );
446 }
447
448 manager
449 }
450
451 fn initialize_cache_manager(config: &Config) -> CacheManager {
453 let manager = CacheManager::new();
454
455 let mut enabled_count = 0;
456
457 for route in &config.routes {
458 if route.service_type == sentinel_config::ServiceType::Api {
460 let cache_config = CacheConfig {
461 enabled: false, default_ttl_secs: 60,
463 ..Default::default()
464 };
465 manager.register_route(&route.id, cache_config);
466 }
467
468 if route.service_type == sentinel_config::ServiceType::Static {
470 let cache_config = CacheConfig {
471 enabled: true, default_ttl_secs: 3600,
473 max_size_bytes: 50 * 1024 * 1024, stale_while_revalidate_secs: 60,
475 stale_if_error_secs: 300,
476 ..Default::default()
477 };
478 manager.register_route(&route.id, cache_config);
479 enabled_count += 1;
480 info!(
481 route_id = %route.id,
482 default_ttl_secs = 3600,
483 "HTTP caching enabled for static route"
484 );
485 }
486
487 if route.service_type == sentinel_config::ServiceType::Web {
489 let cache_config = CacheConfig {
490 enabled: false, default_ttl_secs: 300,
492 ..Default::default()
493 };
494 manager.register_route(&route.id, cache_config);
495 }
496 }
497
498 if enabled_count > 0 {
499 info!(enabled_routes = enabled_count, "HTTP caching initialized");
500 } else {
501 debug!("HTTP cache manager initialized (no routes with caching enabled)");
502 }
503
504 manager
505 }
506
507 pub(super) fn apply_security_headers(
509 &self,
510 header: &mut ResponseHeader,
511 ) -> Result<(), Box<Error>> {
512 header.insert_header("X-Content-Type-Options", "nosniff")?;
513 header.insert_header("X-Frame-Options", "DENY")?;
514 header.insert_header("X-XSS-Protection", "1; mode=block")?;
515 header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
516 header.remove_header("Server");
517 header.remove_header("X-Powered-By");
518 Ok(())
519 }
520}