1mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use parking_lot::RwLock;
18use pingora::http::ResponseHeader;
19use pingora::prelude::*;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::ids::{QualifiedId, Scope};
27use sentinel_common::{Registry, ScopedMetrics, ScopedRegistry};
28
29use crate::agents::AgentManager;
30use crate::app::AppState;
31use crate::builtin_handlers::BuiltinHandlerState;
32use crate::cache::{CacheConfig, CacheManager};
33use crate::errors::ErrorHandler;
34use crate::geo_filter::{GeoDatabaseWatcher, GeoFilterManager};
35use crate::health::PassiveHealthChecker;
36use crate::http_helpers;
37use crate::logging::{LogManager, SharedLogManager};
38use crate::rate_limit::{RateLimitConfig, RateLimitManager};
39use crate::reload::{
40 ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
41};
42use crate::routing::RouteMatcher;
43use crate::scoped_routing::ScopedRouteMatcher;
44use crate::static_files::StaticFileServer;
45use crate::upstream::{ActiveHealthChecker, HealthCheckRunner, UpstreamPool};
46use crate::validation::SchemaValidator;
47
48use sentinel_common::TraceIdFormat;
49use sentinel_config::{Config, FlattenedConfig};
50
51pub struct SentinelProxy {
53 pub config_manager: Arc<ConfigManager>,
55 pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
57 pub(super) scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
59 pub(super) upstream_pools: Registry<UpstreamPool>,
61 pub(super) scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
63 pub(super) agent_manager: Arc<AgentManager>,
65 pub(super) passive_health: Arc<PassiveHealthChecker>,
67 pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
69 pub(super) scoped_metrics: Arc<ScopedMetrics>,
71 pub(super) app_state: Arc<AppState>,
73 pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
75 pub(super) error_handlers: Registry<ErrorHandler>,
77 pub(super) validators: Registry<SchemaValidator>,
79 pub(super) static_servers: Registry<StaticFileServer>,
81 pub(super) builtin_state: Arc<BuiltinHandlerState>,
83 pub(super) log_manager: SharedLogManager,
85 pub(super) trace_id_format: TraceIdFormat,
87 pub(super) health_check_runner: Arc<HealthCheckRunner>,
89 pub(super) rate_limit_manager: Arc<RateLimitManager>,
91 pub(super) cache_manager: Arc<CacheManager>,
93 pub(super) geo_filter_manager: Arc<GeoFilterManager>,
95}
96
97impl SentinelProxy {
98 pub async fn new(config_path: Option<&str>) -> Result<Self> {
103 info!("Starting Sentinel Proxy");
104
105 let (config, effective_config_path) = match config_path {
107 Some(path) => {
108 let cfg = Config::from_file(path).context("Failed to load configuration file")?;
109 (cfg, path.to_string())
110 }
111 None => {
112 let cfg = Config::default_embedded()
113 .context("Failed to load embedded default configuration")?;
114 (cfg, "_embedded_".to_string())
116 }
117 };
118
119 config
120 .validate()
121 .context("Initial configuration validation failed")?;
122
123 if let Some(ref cache_config) = config.cache {
125 info!(
126 max_size_mb = cache_config.max_size_bytes / 1024 / 1024,
127 backend = ?cache_config.backend,
128 "Configuring HTTP cache storage"
129 );
130 crate::cache::configure_cache(cache_config.clone());
131 }
132
133 let config_manager =
135 Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
136
137 config_manager.add_validator(Box::new(RouteValidator)).await;
139 config_manager
140 .add_validator(Box::new(UpstreamValidator))
141 .await;
142
143 let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
145
146 let flattened = config.flatten();
148
149 let scoped_route_matcher = Arc::new(tokio::sync::RwLock::new(
151 ScopedRouteMatcher::from_flattened(&flattened)
152 .await
153 .context("Failed to create scoped route matcher")?,
154 ));
155
156 let mut pools = HashMap::new();
158 let mut health_check_runner = HealthCheckRunner::new();
159
160 for (upstream_id, upstream_config) in &config.upstreams {
161 let mut config_with_id = upstream_config.clone();
162 config_with_id.id = upstream_id.clone();
163 let pool = Arc::new(UpstreamPool::new(config_with_id.clone()).await?);
164 pools.insert(upstream_id.clone(), pool);
165
166 if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
168 health_check_runner.add_checker(checker);
169 }
170 }
171 let upstream_pools = Registry::from_map(pools);
172
173 let scoped_upstream_pools =
175 Self::create_scoped_upstream_pools(&flattened, &mut health_check_runner).await?;
176
177 let health_check_runner = Arc::new(health_check_runner);
178
179 let passive_health = Arc::new(PassiveHealthChecker::new(
181 0.5, 100, None, ));
185
186 let agent_manager = Arc::new(AgentManager::new(config.agents.clone()).await?);
188 agent_manager.initialize().await?;
189
190 let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
192 let scoped_metrics = Arc::new(
193 ScopedMetrics::new().context("Failed to create scoped metrics collector")?,
194 );
195
196 let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
198
199 let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
201 Duration::from_secs(30), ));
203
204 Self::setup_reload_handler(
206 config_manager.clone(),
207 route_matcher.clone(),
208 upstream_pools.clone(),
209 scoped_route_matcher.clone(),
210 scoped_upstream_pools.clone(),
211 )
212 .await;
213
214 let (error_handlers, validators, static_servers) =
216 Self::initialize_route_components(&config).await?;
217
218 let builtin_state = Arc::new(BuiltinHandlerState::new(
220 env!("CARGO_PKG_VERSION").to_string(),
221 app_state.instance_id.clone(),
222 ));
223
224 let log_manager = match LogManager::new(&config.observability.logging) {
226 Ok(manager) => {
227 if manager.access_log_enabled() {
228 info!("Access logging enabled");
229 }
230 if manager.error_log_enabled() {
231 info!("Error logging enabled");
232 }
233 if manager.audit_log_enabled() {
234 info!("Audit logging enabled");
235 }
236 Arc::new(manager)
237 }
238 Err(e) => {
239 warn!(
240 "Failed to initialize log manager, file logging disabled: {}",
241 e
242 );
243 Arc::new(LogManager::disabled())
244 }
245 };
246
247 {
249 use crate::reload::AuditReloadHook;
250 let audit_hook = AuditReloadHook::new(log_manager.clone());
251 config_manager.add_hook(Box::new(audit_hook)).await;
252 debug!("Registered audit reload hook");
253 }
254
255 if health_check_runner.checker_count() > 0 {
257 let runner = health_check_runner.clone();
258 tokio::spawn(async move {
259 runner.run().await;
260 });
261 info!(
262 "Started active health checking for {} upstreams",
263 health_check_runner.checker_count()
264 );
265 }
266
267 let rate_limit_manager = Arc::new(Self::initialize_rate_limiters(&config));
269
270 let geo_filter_manager = Arc::new(Self::initialize_geo_filters(&config));
272
273 Self::spawn_cleanup_task(rate_limit_manager.clone(), geo_filter_manager.clone());
275
276 Self::spawn_geo_database_watcher(geo_filter_manager.clone());
278
279 app_state.set_ready(true);
281
282 let trace_id_format = config.server.trace_id_format;
284
285 let cache_manager = Arc::new(Self::initialize_cache_manager(&config));
287
288 Ok(Self {
289 config_manager,
290 route_matcher,
291 scoped_route_matcher,
292 upstream_pools,
293 scoped_upstream_pools,
294 agent_manager,
295 passive_health,
296 metrics,
297 scoped_metrics,
298 app_state,
299 reload_coordinator,
300 error_handlers,
301 validators,
302 static_servers,
303 builtin_state,
304 log_manager,
305 trace_id_format,
306 health_check_runner,
307 rate_limit_manager,
308 cache_manager,
309 geo_filter_manager,
310 })
311 }
312
313 async fn setup_reload_handler(
315 config_manager: Arc<ConfigManager>,
316 route_matcher: Arc<RwLock<RouteMatcher>>,
317 upstream_pools: Registry<UpstreamPool>,
318 scoped_route_matcher: Arc<tokio::sync::RwLock<ScopedRouteMatcher>>,
319 scoped_upstream_pools: ScopedRegistry<UpstreamPool>,
320 ) {
321 let mut reload_rx = config_manager.subscribe();
322 let config_manager_clone = config_manager.clone();
323
324 tokio::spawn(async move {
325 while let Ok(event) = reload_rx.recv().await {
326 if let ReloadEvent::Applied { .. } = event {
327 let new_config = config_manager_clone.current();
329 let flattened = new_config.flatten();
330
331 if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None) {
333 *route_matcher.write() = new_matcher;
334 info!("Global routes reloaded successfully");
335 }
336
337 if let Err(e) = scoped_route_matcher
339 .write()
340 .await
341 .load_from_flattened(&flattened)
342 .await
343 {
344 error!("Failed to reload scoped routes: {}", e);
345 } else {
346 info!(
347 "Scoped routes reloaded ({} scopes)",
348 scoped_route_matcher.read().await.scope_count().await
349 );
350 }
351
352 let mut new_pools = HashMap::new();
354 for (upstream_id, upstream_config) in &new_config.upstreams {
355 let mut config_with_id = upstream_config.clone();
356 config_with_id.id = upstream_id.clone();
357 match UpstreamPool::new(config_with_id).await {
358 Ok(pool) => {
359 new_pools.insert(upstream_id.clone(), Arc::new(pool));
360 }
361 Err(e) => {
362 error!("Failed to create upstream pool {}: {}", upstream_id, e);
363 }
364 }
365 }
366
367 let old_pools = upstream_pools.replace(new_pools).await;
369
370 let new_scoped_pools =
372 Self::build_scoped_pools_list(&flattened).await;
373 let old_scoped_pools = scoped_upstream_pools.replace_all(new_scoped_pools).await;
374
375 info!(
376 "Scoped upstream pools reloaded ({} pools)",
377 scoped_upstream_pools.len().await
378 );
379
380 tokio::spawn(async move {
382 tokio::time::sleep(Duration::from_secs(60)).await;
383
384 for (name, pool) in old_pools {
386 info!("Shutting down old global pool: {}", name);
387 pool.shutdown().await;
388 }
389
390 for (name, pool) in old_scoped_pools {
392 info!("Shutting down old scoped pool: {}", name);
393 pool.shutdown().await;
394 }
395 });
396 }
397 }
398 });
399 }
400
401 async fn create_scoped_upstream_pools(
403 flattened: &FlattenedConfig,
404 health_check_runner: &mut HealthCheckRunner,
405 ) -> Result<ScopedRegistry<UpstreamPool>> {
406 let registry = ScopedRegistry::new();
407
408 for (qid, upstream_config) in &flattened.upstreams {
409 let mut config_with_id = upstream_config.clone();
410 config_with_id.id = qid.canonical();
411
412 let pool = Arc::new(
413 UpstreamPool::new(config_with_id.clone())
414 .await
415 .with_context(|| format!("Failed to create upstream pool '{}'", qid.canonical()))?,
416 );
417
418 let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
420
421 if is_exported {
422 registry.insert_exported(qid.clone(), pool).await;
423 } else {
424 registry.insert(qid.clone(), pool).await;
425 }
426
427 if let Some(checker) = ActiveHealthChecker::new(&config_with_id) {
429 health_check_runner.add_checker(checker);
430 }
431
432 debug!(
433 upstream_id = %qid.canonical(),
434 scope = ?qid.scope,
435 exported = is_exported,
436 "Created scoped upstream pool"
437 );
438 }
439
440 info!(
441 "Created {} scoped upstream pools",
442 registry.len().await
443 );
444
445 Ok(registry)
446 }
447
448 async fn build_scoped_pools_list(
450 flattened: &FlattenedConfig,
451 ) -> Vec<(QualifiedId, Arc<UpstreamPool>, bool)> {
452 let mut result = Vec::new();
453
454 for (qid, upstream_config) in &flattened.upstreams {
455 let mut config_with_id = upstream_config.clone();
456 config_with_id.id = qid.canonical();
457
458 match UpstreamPool::new(config_with_id).await {
459 Ok(pool) => {
460 let is_exported = flattened.exported_upstreams.contains_key(&upstream_config.id);
461 result.push((qid.clone(), Arc::new(pool), is_exported));
462 }
463 Err(e) => {
464 error!(
465 "Failed to create scoped upstream pool {}: {}",
466 qid.canonical(),
467 e
468 );
469 }
470 }
471 }
472
473 result
474 }
475
476 async fn initialize_route_components(
478 config: &Config,
479 ) -> Result<(
480 Registry<ErrorHandler>,
481 Registry<SchemaValidator>,
482 Registry<StaticFileServer>,
483 )> {
484 let mut error_handlers_map = HashMap::new();
485 let mut validators_map = HashMap::new();
486 let mut static_servers_map = HashMap::new();
487
488 for route in &config.routes {
489 info!(
490 "Initializing components for route: {} with service type: {:?}",
491 route.id, route.service_type
492 );
493
494 if let Some(ref error_config) = route.error_pages {
496 let handler =
497 ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
498 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
499 debug!("Initialized error handler for route: {}", route.id);
500 } else {
501 let handler = ErrorHandler::new(route.service_type.clone(), None);
503 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
504 }
505
506 if route.service_type == sentinel_config::ServiceType::Api {
508 if let Some(ref api_schema) = route.api_schema {
509 match SchemaValidator::new(api_schema.clone()) {
510 Ok(validator) => {
511 validators_map.insert(route.id.clone(), Arc::new(validator));
512 info!("Initialized schema validator for route: {}", route.id);
513 }
514 Err(e) => {
515 warn!(
516 "Failed to initialize schema validator for route {}: {}",
517 route.id, e
518 );
519 }
520 }
521 }
522 }
523
524 if route.service_type == sentinel_config::ServiceType::Static {
526 if let Some(ref static_config) = route.static_files {
527 let server = StaticFileServer::new(static_config.clone());
528 static_servers_map.insert(route.id.clone(), Arc::new(server));
529 info!("Initialized static file server for route: {}", route.id);
530 } else {
531 warn!(
532 "Static route {} has no static_files configuration",
533 route.id
534 );
535 }
536 }
537 }
538
539 Ok((
540 Registry::from_map(error_handlers_map),
541 Registry::from_map(validators_map),
542 Registry::from_map(static_servers_map),
543 ))
544 }
545
546 pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
548 http_helpers::get_or_create_trace_id(session, self.trace_id_format)
549 }
550
551 fn initialize_rate_limiters(config: &Config) -> RateLimitManager {
553 use sentinel_config::RateLimitAction;
554
555 let manager = if let Some(ref global) = config.rate_limits.global {
557 info!(
558 max_rps = global.max_rps,
559 burst = global.burst,
560 key = ?global.key,
561 "Initializing global rate limiter"
562 );
563 RateLimitManager::with_global_limit(global.max_rps, global.burst)
564 } else {
565 RateLimitManager::new()
566 };
567
568 for route in &config.routes {
569 if let Some(ref rate_limit) = route.policies.rate_limit {
571 let rl_config = RateLimitConfig {
572 max_rps: rate_limit.requests_per_second,
573 burst: rate_limit.burst,
574 key: rate_limit.key.clone(),
575 action: RateLimitAction::Reject,
576 status_code: 429,
577 message: None,
578 backend: sentinel_config::RateLimitBackend::Local,
579 max_delay_ms: 5000, };
581 manager.register_route(&route.id, rl_config);
582 info!(
583 route_id = %route.id,
584 max_rps = rate_limit.requests_per_second,
585 burst = rate_limit.burst,
586 key = ?rate_limit.key,
587 "Registered rate limiter for route"
588 );
589 }
590
591 for filter_id in &route.filters {
593 if let Some(filter_config) = config.filters.get(filter_id) {
594 if let sentinel_config::Filter::RateLimit(ref rl_filter) = filter_config.filter
595 {
596 let rl_config = RateLimitConfig {
597 max_rps: rl_filter.max_rps,
598 burst: rl_filter.burst,
599 key: rl_filter.key.clone(),
600 action: rl_filter.on_limit.clone(),
601 status_code: rl_filter.status_code,
602 message: rl_filter.limit_message.clone(),
603 backend: rl_filter.backend.clone(),
604 max_delay_ms: rl_filter.max_delay_ms,
605 };
606 manager.register_route(&route.id, rl_config);
607 info!(
608 route_id = %route.id,
609 filter_id = %filter_id,
610 max_rps = rl_filter.max_rps,
611 backend = ?rl_filter.backend,
612 "Registered rate limiter from filter for route"
613 );
614 }
615 }
616 }
617 }
618
619 if manager.route_count() > 0 {
620 info!(
621 route_count = manager.route_count(),
622 "Rate limiting initialized"
623 );
624 }
625
626 manager
627 }
628
629 fn initialize_cache_manager(config: &Config) -> CacheManager {
631 let manager = CacheManager::new();
632
633 let mut enabled_count = 0;
634
635 for route in &config.routes {
636 if route.service_type == sentinel_config::ServiceType::Api {
638 let cache_config = CacheConfig {
639 enabled: false, default_ttl_secs: 60,
641 ..Default::default()
642 };
643 manager.register_route(&route.id, cache_config);
644 }
645
646 if route.service_type == sentinel_config::ServiceType::Static {
648 let cache_config = CacheConfig {
649 enabled: true, default_ttl_secs: 3600,
651 max_size_bytes: 50 * 1024 * 1024, stale_while_revalidate_secs: 60,
653 stale_if_error_secs: 300,
654 ..Default::default()
655 };
656 manager.register_route(&route.id, cache_config);
657 enabled_count += 1;
658 info!(
659 route_id = %route.id,
660 default_ttl_secs = 3600,
661 "HTTP caching enabled for static route"
662 );
663 }
664
665 if route.service_type == sentinel_config::ServiceType::Web {
667 let cache_config = CacheConfig {
668 enabled: false, default_ttl_secs: 300,
670 ..Default::default()
671 };
672 manager.register_route(&route.id, cache_config);
673 }
674 }
675
676 if enabled_count > 0 {
677 info!(enabled_routes = enabled_count, "HTTP caching initialized");
678 } else {
679 debug!("HTTP cache manager initialized (no routes with caching enabled)");
680 }
681
682 manager
683 }
684
685 fn initialize_geo_filters(config: &Config) -> GeoFilterManager {
687 let manager = GeoFilterManager::new();
688
689 for (filter_id, filter_config) in &config.filters {
690 if let sentinel_config::Filter::Geo(ref geo_filter) = filter_config.filter {
691 match manager.register_filter(filter_id, geo_filter.clone()) {
692 Ok(_) => {
693 info!(
694 filter_id = %filter_id,
695 database_path = %geo_filter.database_path,
696 action = ?geo_filter.action,
697 countries_count = geo_filter.countries.len(),
698 "Registered geo filter"
699 );
700 }
701 Err(e) => {
702 error!(
703 filter_id = %filter_id,
704 error = %e,
705 "Failed to register geo filter"
706 );
707 }
708 }
709 }
710 }
711
712 let filter_ids = manager.filter_ids();
713 if !filter_ids.is_empty() {
714 info!(
715 filter_count = filter_ids.len(),
716 filter_ids = ?filter_ids,
717 "GeoIP filtering initialized"
718 );
719 }
720
721 manager
722 }
723
724 pub(super) fn apply_security_headers(
726 &self,
727 header: &mut ResponseHeader,
728 ) -> Result<(), Box<Error>> {
729 header.insert_header("X-Content-Type-Options", "nosniff")?;
730 header.insert_header("X-Frame-Options", "DENY")?;
731 header.insert_header("X-XSS-Protection", "1; mode=block")?;
732 header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
733 header.remove_header("Server");
734 header.remove_header("X-Powered-By");
735 Ok(())
736 }
737
738 fn spawn_cleanup_task(
740 rate_limit_manager: Arc<RateLimitManager>,
741 geo_filter_manager: Arc<GeoFilterManager>,
742 ) {
743 const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
745
746 tokio::spawn(async move {
747 let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
748 interval.tick().await;
750
751 loop {
752 interval.tick().await;
753
754 rate_limit_manager.cleanup();
756
757 geo_filter_manager.clear_expired_caches();
759
760 debug!("Periodic cleanup completed");
761 }
762 });
763
764 info!(
765 interval_secs = CLEANUP_INTERVAL.as_secs(),
766 "Started periodic cleanup task"
767 );
768 }
769
770 fn spawn_geo_database_watcher(geo_filter_manager: Arc<GeoFilterManager>) {
772 let watcher = Arc::new(GeoDatabaseWatcher::new(geo_filter_manager));
773
774 match watcher.start_watching() {
776 Ok(mut rx) => {
777 let watcher_clone = watcher.clone();
778 tokio::spawn(async move {
779 const DEBOUNCE_MS: u64 = 500;
781
782 while let Some(path) = rx.recv().await {
783 tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)).await;
785
786 while rx.try_recv().is_ok() {}
788
789 watcher_clone.handle_change(&path);
791 }
792 });
793
794 info!("Started geo database file watcher");
795 }
796 Err(e) => {
797 warn!(
798 error = %e,
799 "Failed to start geo database file watcher, auto-reload disabled"
800 );
801 }
802 }
803 }
804}