1mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use parking_lot::RwLock;
18use pingora::http::ResponseHeader;
19use pingora::prelude::*;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::Registry;
27
28use crate::agents::AgentManager;
29use crate::app::AppState;
30use crate::builtin_handlers::BuiltinHandlerState;
31use crate::cache::{CacheConfig, CacheManager};
32use crate::errors::ErrorHandler;
33use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
34use crate::health::PassiveHealthChecker;
35use crate::http_helpers;
36use crate::logging::{LogManager, SharedLogManager};
37use crate::rate_limit::{RateLimitConfig, RateLimitManager};
38use crate::reload::{
39 ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
40};
41use crate::routing::RouteMatcher;
42use crate::static_files::StaticFileServer;
43use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
44use crate::validation::SchemaValidator;
45
46use sentinel_common::TraceIdFormat;
47use sentinel_config::Config;
48
/// Top-level proxy state shared by the request-handling code.
///
/// Instances are built by [`SentinelProxy::new`], which loads the
/// configuration and wires up every subsystem stored in the fields below.
pub struct SentinelProxy {
    /// Configuration manager, created from the effective config path and the
    /// initially loaded configuration.
    pub config_manager: Arc<ConfigManager>,
    /// Route matcher behind a lock; the reload handler swaps in a freshly
    /// built matcher after a reload has been applied.
    pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
    /// Upstream pools keyed by upstream id; replaced as a whole by the reload
    /// handler.
    pub(super) upstream_pools: Registry<UpstreamPool>,
    /// Agent manager built from `config.agents` and initialized during startup.
    pub(super) agent_manager: Arc<AgentManager>,
    /// Passive health checker.
    pub(super) passive_health: Arc<PassiveHealthChecker>,
    /// Request metrics collector.
    pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
    /// Application state; created with a fresh UUID string and marked ready
    /// near the end of startup.
    pub(super) app_state: Arc<AppState>,
    /// Graceful reload coordinator (constructed with a 30-second duration).
    pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
    /// Error handlers, one per configured route, keyed by route id.
    pub(super) error_handlers: Registry<ErrorHandler>,
    /// Schema validators for API routes that declare an `api_schema`, keyed by
    /// route id.
    pub(super) validators: Registry<SchemaValidator>,
    /// Static file servers for static routes that declare `static_files`,
    /// keyed by route id.
    pub(super) static_servers: Registry<StaticFileServer>,
    /// State for built-in handlers; created with the crate version and the
    /// application instance id.
    pub(super) builtin_state: Arc<BuiltinHandlerState>,
    /// Shared log manager; a disabled manager is substituted when
    /// initialization fails.
    pub(super) log_manager: SharedLogManager,
    /// Trace id format copied from `config.server.trace_id_format`.
    pub(super) trace_id_format: TraceIdFormat,
    /// Runner holding the active health checkers created for the upstreams.
    pub(super) health_check_runner: Arc<HealthCheckRunner>,
    /// Rate limit manager holding the optional global limit and per-route
    /// limits.
    pub(super) rate_limit_manager: Arc<RateLimitManager>,
    /// Cache manager holding per-route cache configuration.
    pub(super) cache_manager: Arc<CacheManager>,
    /// Geo filter manager holding the registered geo filters.
    pub(super) geo_filter_manager: Arc<GeoFilterManager>,
}
88
89impl SentinelProxy {
90 pub async fn new(config_path: Option<&str>) -> Result<Self> {
95 info!("Starting Sentinel Proxy");
96
97 let (config, effective_config_path) = match config_path {
99 Some(path) => {
100 let cfg = Config::from_file(path).context("Failed to load configuration file")?;
101 (cfg, path.to_string())
102 }
103 None => {
104 let cfg = Config::default_embedded()
105 .context("Failed to load embedded default configuration")?;
106 (cfg, "_embedded_".to_string())
108 }
109 };
110
111 config
112 .validate()
113 .context("Initial configuration validation failed")?;
114
115 if let Some(ref cache_config) = config.cache {
117 info!(
118 max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
119 backend = ?cache_config.backend,
120 "Configuring HTTP cache storage"
121 );
122 crate::cache::configure_cache(cache_config.clone());
123 }
124
125 let config_manager =
127 Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
128
129 config_manager.add_validator(Box::new(RouteValidator)).await;
131 config_manager
132 .add_validator(Box::new(UpstreamValidator))
133 .await;
134
135 let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
137
138 let mut pools = HashMap::new();
140 let mut health_check_runner = HealthCheckRunner::new();
141
142 for (upstream_id, upstream_config) in &config.upstreams {
143 let mut config_with_id = upstream_config.clone();
144 config_with_id.id = upstream_id.clone();
145 let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
146 pools.insert(upstream_id.clone(), pool);
147
148 if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
150 health_check_runner.add_checker(checker);
151 }
152 }
153 let upstream_pools = Registry::from_map(pools);
154 let health_check_runner = Arc::new(health_check_runner);
155
156 let passive_health = Arc::new(PassiveHealthChecker::new(
158 0.5, 100, None, ));
162
163 let agent_manager = Arc::new(AgentManager::new(config.agents.clone(), 1000).await?);
165 agent_manager.initialize().await?;
166
167 let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
169
170 let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
172
173 let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
175 Duration::from_secs(30), ));
177
178 Self::setup_reload_handler(
180 config_manager.clone(),
181 route_matcher.clone(),
182 upstream_pools.clone(),
183 )
184 .await;
185
186 let (error_handlers, validators, static_servers) =
188 Self::initialize_route_components(&config).await?;
189
190 let builtin_state = Arc::new(BuiltinHandlerState::new(
192 env!("CARGO_PKG_VERSION").to_string(),
193 app_state.instance_id.clone(),
194 ));
195
196 let log_manager = match LogManager::new(&config.observability.logging) {
198 Ok(manager) => {
199 if manager.access_log_enabled() {
200 info!("Access logging enabled");
201 }
202 if manager.error_log_enabled() {
203 info!("Error logging enabled");
204 }
205 if manager.audit_log_enabled() {
206 info!("Audit logging enabled");
207 }
208 Arc::new(manager)
209 }
210 Err(e) => {
211 warn!(
212 "Failed to initialize log manager, file logging disabled: {}",
213 e
214 );
215 Arc::new(LogManager::disabled())
216 }
217 };
218
219 {
221 use crate::reload::AuditReloadHook;
222 let audit_hook = AuditReloadHook::new(log_manager.clone());
223 config_manager.add_hook(Box::new(audit_hook)).await;
224 debug!("Registered audit reload hook");
225 }
226
227 if health_check_runner.checker_count() > 0 {
229 let runner = health_check_runner.clone();
230 tokio::spawn(async move {
231 runner.run().await;
232 });
233 info!(
234 "Started active health checking for {} upstreams",
235 health_check_runner.checker_count()
236 );
237 }
238
239 let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
241
242 let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));
244
245 Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());
247
248 Self::spawn_geo_database_watcher(geo_filter_manager.clone());
250
251 app_state.set_ready(true);
253
254 let trace_id_format = config.server.trace_id_format;
256
257 let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
259
260 Ok(Self {
261 config_manager,
262 route_matcher,
263 upstream_pools,
264 agent_manager,
265 passive_health,
266 metrics,
267 app_state,
268 reload_coordinator,
269 error_handlers,
270 validators,
271 static_servers,
272 builtin_state,
273 log_manager,
274 trace_id_format,
275 health_check_runner,
276 rate_limit_manager,
277 cache_manager,
278 geo_filter_manager,
279 })
280 }
281
282 async fn setup_reload_handler(
284 config_manager: Arc<ConfigManager>,
285 route_matcher: Arc<RwLock<RouteMatcher>>,
286 upstream_pools: Registry<UpstreamPool>,
287 ) {
288 let mut reload_rx = config_manager.subscribe();
289 let config_manager_clone = config_manager.clone();
290
291 tokio::spawn(async move {
292 while let Ok(event) = reload_rx.recv().await {
293 if let ReloadEvent::Applied { .. } = event {
294 let new_config = config_manager_clone.current();
296
297 if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
299 *route_matcher.write() = new_matcher;
300 info!("Routes reloaded successfully");
301 }
302
303 let mut new_pools = HashMap::new();
305 for (upstream_id, upstream_config) in &new_config.upstreams {
306 let mut config_with_id = upstream_config.clone();
307 config_with_id.id = upstream_id.clone();
308 match UpstreamPool::new(config_with_id).await {
309 Ok(pool) => {
310 new_pools.insert(upstream_id.clone(), Arc::new(pool));
311 }
312 Err(e) => {
313 error!("Failed to create upstream pool {}: {}", upstream_id, e);
314 }
315 }
316 }
317
318 let old_pools = upstream_pools.replace(new_pools).await;
320
321 tokio::spawn(async move {
323 tokio::time::sleep(Duration::from_secs(60)).await;
324 for (name, pool) in old_pools {
325 info!("Shutting down old pool: {}", name);
326 pool.shutdown().await;
327 }
328 });
329 }
330 }
331 });
332 }
333
334 async fn initialize_route_components(
336 config: &Config,
337 ) -> Result<(
338 Registry<ErrorHandler>,
339 Registry<SchemaValidator>,
340 Registry<StaticFileServer>,
341 )> {
342 let mut error_handlers_map = HashMap::new();
343 let mut validators_map = HashMap::new();
344 let mut static_servers_map = HashMap::new();
345
346 for route in &config.routes {
347 info!(
348 "Initializing components for route: {} with service type: {:?}",
349 route.id, route.service_type
350 );
351
352 if let Some(ref error_config) = route.error_pages {
354 let handler =
355 ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
356 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
357 debug!("Initialized error handler for route: {}", route.id);
358 } else {
359 let handler = ErrorHandler::new(route.service_type.clone(), None);
361 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
362 }
363
364 if route.service_type == sentinel_config::ServiceType::Api {
366 if let Some(ref api_schema) = route.api_schema {
367 match SchemaValidator::new(api_schema.clone()) {
368 Ok(validator) => {
369 validators_map.insert(route.id.clone(), Arc::new(validator));
370 info!("Initialized schema validator for route: {}", route.id);
371 }
372 Err(e) => {
373 warn!(
374 "Failed to initialize schema validator for route {}: {}",
375 route.id, e
376 );
377 }
378 }
379 }
380 }
381
382 if route.service_type == sentinel_config::ServiceType::Static {
384 if let Some(ref static_config) = route.static_files {
385 let server = StaticFileServer::new(static_config.clone());
386 static_servers_map.insert(route.id.clone(), Arc::new(server));
387 info!("Initialized static file server for route: {}", route.id);
388 } else {
389 warn!(
390 "Static route {} has no static_files configuration",
391 route.id
392 );
393 }
394 }
395 }
396
397 Ok((
398 Registry::from_map(error_handlers_map),
399 Registry::from_map(validators_map),
400 Registry::from_map(static_servers_map),
401 ))
402 }
403
404 pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
406 http_helpers::get_or_create_trace_id(session, self.trace_id_format)
407 }
408
409 fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
411 use sentinel_config::RateLimitAction;
412
413 let manager = if let Some(ref global) = config.rate_limits.global {
415 info!(
416 max_rps = global.max_rps,
417 burst = global.burst,
418 key = ?global.key,
419 "Initializing global rate limiter"
420 );
421 RateLimitManager::with_global_limit(global.max_rps, global.burst)
422 } else {
423 RateLimitManager::new()
424 };
425
426 for route in &config.routes {
427 if let Some(ref rate_limit) = route.policies.rate_limit {
429 let rl_config = RateLimitConfig {
430 max_rps: rate_limit.requests_per_second,
431 burst: rate_limit.burst,
432 key: rate_limit.key.clone(),
433 action: RateLimitAction::Reject,
434 status_code: 429,
435 message: None,
436 backend: sentinel_config::RateLimitBackend::Local,
437 max_delay_ms: 5000, };
439 manager.register_route(&route.id, rl_config);
440 info!(
441 route_id = %route.id,
442 max_rps = rate_limit.requests_per_second,
443 burst = rate_limit.burst,
444 key = ?rate_limit.key,
445 "Registered rate limiter for route"
446 );
447 }
448
449 for filter_id in &route.filters {
451 if let Some(filter_config) = config.filters.get(filter_id) {
452 if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
453 {
454 let rl_config = RateLimitConfig {
455 max_rps: rl_filter.max_rps,
456 burst: rl_filter.burst,
457 key: rl_filter.key.clone(),
458 action: rl_filter.on_limit.clone(),
459 status_code: rl_filter.status_code,
460 message: rl_filter.limit_message.clone(),
461 backend: rl_filter.backend.clone(),
462 max_delay_ms: rl_filter.max_delay_ms,
463 };
464 manager.register_route(&route.id, rl_config);
465 info!(
466 route_id = %route.id,
467 filter_id = %filter_id,
468 max_rps = rl_filter.max_rps,
469 backend = ?rl_filter.backend,
470 "Registered rate limiter from filter for route"
471 );
472 }
473 }
474 }
475 }
476
477 if manager.route_count() > 0 {
478 info!(
479 route_count = manager.route_count(),
480 "Rate limiting initialized"
481 );
482 }
483
484 manager
485 }
486
487 fn initialize_cache_manager(config: &Config) -> CacheManager {
489 let manager = CacheManager::new();
490
491 let mut enabled_count = 0;
492
493 for route in &config.routes {
494 if route.service_type == sentinel_config::ServiceType::Api {
496 let cache_config = CacheConfig {
497 enabled: false, default_ttl_secs: 60,
499 ..Default::default()
500 };
501 manager.register_route(&route.id, cache_config);
502 }
503
504 if route.service_type == sentinel_config::ServiceType::Static {
506 let cache_config = CacheConfig {
507 enabled: true, default_ttl_secs: 3600,
509 max_size_bytes: 50 * 1024 * 1024, stale_while_revalidate_secs: 60,
511 stale_if_error_secs: 300,
512 ..Default::default()
513 };
514 manager.register_route(&route.id, cache_config);
515 enabled_count += 1;
516 info!(
517 route_id = %route.id,
518 default_ttl_secs = 3600,
519 "HTTP caching enabled for static route"
520 );
521 }
522
523 if route.service_type == sentinel_config::ServiceType::Web {
525 let cache_config = CacheConfig {
526 enabled: false, default_ttl_secs: 300,
528 ..Default::default()
529 };
530 manager.register_route(&route.id, cache_config);
531 }
532 }
533
534 if enabled_count > 0 {
535 info!(enabled_routes = enabled_count, "HTTP caching initialized");
536 } else {
537 debug!("HTTP cache manager initialized (no routes with caching enabled)");
538 }
539
540 manager
541 }
542
543 fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
545 let manager = GeoFilterManager::new();
546
547 for (filter_id, filter_config) in &config.filters {
548 if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
549 match manager.register_filter(filter_id, geo_filter.clone()) {
550 Ok(_) => {
551 info!(
552 filter_id = %filter_id,
553 database_path = %geo_filter.database_path,
554 action = ?geo_filter.action,
555 countries_count = geo_filter.countries.len(),
556 "Registered geo filter"
557 );
558 }
559 Err(e) => {
560 error!(
561 filter_id = %filter_id,
562 error = %e,
563 "Failed to register geo filter"
564 );
565 }
566 }
567 }
568 }
569
570 let filter_ids = manager.filter_ids();
571 if !filter_ids.is_empty() {
572 info!(
573 filter_count = filter_ids.len(),
574 filter_ids = ?filter_ids,
575 "GeoIP filtering initialized"
576 );
577 }
578
579 manager
580 }
581
582 pub(super) fn apply_security_headers(
584 &self,
585 header: &mut ResponseHeader,
586 ) -> Result<(), Box<Error>> {
587 header.insert_header("X-Content-Type-Options", "nosniff")?;
588 header.insert_header("X-Frame-Options", "DENY")?;
589 header.insert_header("X-XSS-Protection", "1; mode=block")?;
590 header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
591 header.remove_header("Server");
592 header.remove_header("X-Powered-By");
593 Ok(())
594 }
595
596 fn spawn_cleanup_task(
598 rate_limit_manager: Arc<RateLimitManager>,
599 geo_filter_manager: Arc<GeoFilterManager>,
600 ) {
601 const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
603
604 tokio::spawn(async move {
605 let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
606 interval.tick().await;
608
609 loop {
610 interval.tick().await;
611
612 rate_limit_manager.cleanup();
614
615 geo_filter_manager.clear_expired_caches();
617
618 debug!("Periodic cleanup completed");
619 }
620 });
621
622 info!(
623 interval_secs = CLEANUP_INTERVAL.as_secs(),
624 "Started periodic cleanup task"
625 );
626 }
627
628 fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
630 let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
631
632 match watcher.start_watching() {
634 Ok(mut rx) => {
635 let watcher_clone = watcher.clone();
636 tokio::spawn(async move {
637 const DEBOUNCE_MS: u64 = 500;
639
640 while let Some(path) = rx.recv().await {
641 tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
643
644 while rx.try_recv().is_ok() {}
646
647 watcher_clone.handle_change(&path);
649 }
650 });
651
652 info!("Started geo database file watcher");
653 }
654 Err(e) => {
655 warn!(
656 error = %e,
657 "Failed to start geo database file watcher, auto-reload disabled"
658 );
659 }
660 }
661 }
662}