1mod context;
11mod handlers;
12mod http_trait;
13
14pub use context::RequestContext;
15
16use anyhow::{Context, Result};
17use pingora::http::ResponseHeader;
18use pingora::prelude::*;
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24use uuid::Uuid;
25
26use sentinel_common::Registry;
27
28use crate::agents::AgentManager;
29use crate::app::AppState;
30use crate::builtin_handlers::BuiltinHandlerState;
31use crate::errors::ErrorHandler;
32use crate::health::PassiveHealthChecker;
33use crate::http_helpers;
34use crate::logging::{LogManager, SharedLogManager};
35use crate::reload::{
36 ConfigManager, GracefulReloadCoordinator, ReloadEvent, RouteValidator, UpstreamValidator,
37};
38use crate::routing::RouteMatcher;
39use crate::static_files::StaticFileServer;
40use crate::upstream::UpstreamPool;
41use crate::validation::SchemaValidator;
42
43use sentinel_common::TraceIdFormat;
44use sentinel_config::Config;
45
46pub struct SentinelProxy {
48 pub config_manager: Arc<ConfigManager>,
50 pub(super) route_matcher: Arc<RwLock<RouteMatcher>>,
52 pub(super) upstream_pools: Registry<UpstreamPool>,
54 pub(super) agent_manager: Arc<AgentManager>,
56 pub(super) passive_health: Arc<PassiveHealthChecker>,
58 pub(super) metrics: Arc<sentinel_common::observability::RequestMetrics>,
60 pub(super) app_state: Arc<AppState>,
62 pub(super) reload_coordinator: Arc<GracefulReloadCoordinator>,
64 pub(super) error_handlers: Registry<ErrorHandler>,
66 pub(super) validators: Registry<SchemaValidator>,
68 pub(super) static_servers: Registry<StaticFileServer>,
70 pub(super) builtin_state: Arc<BuiltinHandlerState>,
72 pub(super) log_manager: SharedLogManager,
74 pub(super) trace_id_format: TraceIdFormat,
76}
77
78impl SentinelProxy {
79 pub async fn new(config_path: Option<&str>) -> Result<Self> {
84 info!("Starting Sentinel Proxy");
85
86 let (config, effective_config_path) = match config_path {
88 Some(path) => {
89 let cfg = Config::from_file(path).context("Failed to load configuration file")?;
90 (cfg, path.to_string())
91 }
92 None => {
93 let cfg = Config::default_embedded()
94 .context("Failed to load embedded default configuration")?;
95 (cfg, "_embedded_".to_string())
97 }
98 };
99
100 config
101 .validate()
102 .context("Initial configuration validation failed")?;
103
104 let config_manager =
106 Arc::new(ConfigManager::new(&effective_config_path, config.clone()).await?);
107
108 config_manager.add_validator(Box::new(RouteValidator)).await;
110 config_manager
111 .add_validator(Box::new(UpstreamValidator))
112 .await;
113
114 let route_matcher = Arc::new(RwLock::new(RouteMatcher::new(config.routes.clone(), None)?));
116
117 let mut pools = HashMap::new();
119 for (upstream_id, upstream_config) in &config.upstreams {
120 let mut config_with_id = upstream_config.clone();
121 config_with_id.id = upstream_id.clone();
122 let pool = Arc::new(UpstreamPool::new(config_with_id).await?);
123 pools.insert(upstream_id.clone(), pool);
124 }
125 let upstream_pools = Registry::from_map(pools);
126
127 let passive_health = Arc::new(PassiveHealthChecker::new(
129 0.5, 100, None, ));
133
134 let agent_manager = Arc::new(AgentManager::new(config.agents.clone(), 1000).await?);
136 agent_manager.initialize().await?;
137
138 let metrics = Arc::new(sentinel_common::observability::RequestMetrics::new()?);
140
141 let app_state = Arc::new(AppState::new(Uuid::new_v4().to_string()));
143
144 let reload_coordinator = Arc::new(GracefulReloadCoordinator::new(
146 Duration::from_secs(30), ));
148
149 Self::setup_reload_handler(
151 config_manager.clone(),
152 route_matcher.clone(),
153 upstream_pools.clone(),
154 )
155 .await;
156
157 let (error_handlers, validators, static_servers) =
159 Self::initialize_route_components(&config).await?;
160
161 let builtin_state = Arc::new(BuiltinHandlerState::new(
163 env!("CARGO_PKG_VERSION").to_string(),
164 app_state.instance_id.clone(),
165 ));
166
167 let log_manager = match LogManager::new(&config.observability.logging) {
169 Ok(manager) => {
170 if manager.access_log_enabled() {
171 info!("Access logging enabled");
172 }
173 if manager.error_log_enabled() {
174 info!("Error logging enabled");
175 }
176 if manager.audit_log_enabled() {
177 info!("Audit logging enabled");
178 }
179 Arc::new(manager)
180 }
181 Err(e) => {
182 warn!(
183 "Failed to initialize log manager, file logging disabled: {}",
184 e
185 );
186 Arc::new(LogManager::disabled())
187 }
188 };
189
190 app_state.set_ready(true);
192
193 let trace_id_format = config.server.trace_id_format;
195
196 Ok(Self {
197 config_manager,
198 route_matcher,
199 upstream_pools,
200 agent_manager,
201 passive_health,
202 metrics,
203 app_state,
204 reload_coordinator,
205 error_handlers,
206 validators,
207 static_servers,
208 builtin_state,
209 log_manager,
210 trace_id_format,
211 })
212 }
213
214 async fn setup_reload_handler(
216 config_manager: Arc<ConfigManager>,
217 route_matcher: Arc<RwLock<RouteMatcher>>,
218 upstream_pools: Registry<UpstreamPool>,
219 ) {
220 let mut reload_rx = config_manager.subscribe();
221 let config_manager_clone = config_manager.clone();
222
223 tokio::spawn(async move {
224 while let Ok(event) = reload_rx.recv().await {
225 match event {
226 ReloadEvent::Applied { .. } => {
227 let new_config = config_manager_clone.current();
229
230 if let Ok(new_matcher) = RouteMatcher::new(new_config.routes.clone(), None)
232 {
233 *route_matcher.write().await = new_matcher;
234 info!("Routes reloaded successfully");
235 }
236
237 let mut new_pools = HashMap::new();
239 for (upstream_id, upstream_config) in &new_config.upstreams {
240 let mut config_with_id = upstream_config.clone();
241 config_with_id.id = upstream_id.clone();
242 match UpstreamPool::new(config_with_id).await {
243 Ok(pool) => {
244 new_pools.insert(upstream_id.clone(), Arc::new(pool));
245 }
246 Err(e) => {
247 error!(
248 "Failed to create upstream pool {}: {}",
249 upstream_id, e
250 );
251 }
252 }
253 }
254
255 let old_pools = upstream_pools.replace(new_pools).await;
257
258 tokio::spawn(async move {
260 tokio::time::sleep(Duration::from_secs(60)).await;
261 for (name, pool) in old_pools {
262 info!("Shutting down old pool: {}", name);
263 pool.shutdown().await;
264 }
265 });
266 }
267 _ => {}
268 }
269 }
270 });
271 }
272
273 async fn initialize_route_components(
275 config: &Config,
276 ) -> Result<(
277 Registry<ErrorHandler>,
278 Registry<SchemaValidator>,
279 Registry<StaticFileServer>,
280 )> {
281 let mut error_handlers_map = HashMap::new();
282 let mut validators_map = HashMap::new();
283 let mut static_servers_map = HashMap::new();
284
285 for route in &config.routes {
286 info!(
287 "Initializing components for route: {} with service type: {:?}",
288 route.id, route.service_type
289 );
290
291 if let Some(ref error_config) = route.error_pages {
293 let handler =
294 ErrorHandler::new(route.service_type.clone(), Some(error_config.clone()));
295 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
296 debug!("Initialized error handler for route: {}", route.id);
297 } else {
298 let handler = ErrorHandler::new(route.service_type.clone(), None);
300 error_handlers_map.insert(route.id.clone(), Arc::new(handler));
301 }
302
303 if route.service_type == sentinel_config::ServiceType::Api {
305 if let Some(ref api_schema) = route.api_schema {
306 match SchemaValidator::new(api_schema.clone()) {
307 Ok(validator) => {
308 validators_map.insert(route.id.clone(), Arc::new(validator));
309 info!("Initialized schema validator for route: {}", route.id);
310 }
311 Err(e) => {
312 warn!(
313 "Failed to initialize schema validator for route {}: {}",
314 route.id, e
315 );
316 }
317 }
318 }
319 }
320
321 if route.service_type == sentinel_config::ServiceType::Static {
323 if let Some(ref static_config) = route.static_files {
324 let server = StaticFileServer::new(static_config.clone());
325 static_servers_map.insert(route.id.clone(), Arc::new(server));
326 info!("Initialized static file server for route: {}", route.id);
327 } else {
328 warn!(
329 "Static route {} has no static_files configuration",
330 route.id
331 );
332 }
333 }
334 }
335
336 Ok((
337 Registry::from_map(error_handlers_map),
338 Registry::from_map(validators_map),
339 Registry::from_map(static_servers_map),
340 ))
341 }
342
343 pub(super) fn get_trace_id(&self, session: &pingora::proxy::Session) -> String {
345 http_helpers::get_or_create_trace_id(session, self.trace_id_format)
346 }
347
348 pub(super) fn apply_security_headers(
350 &self,
351 header: &mut ResponseHeader,
352 ) -> Result<(), Box<Error>> {
353 header.insert_header("X-Content-Type-Options", "nosniff")?;
354 header.insert_header("X-Frame-Options", "DENY")?;
355 header.insert_header("X-XSS-Protection", "1; mode=block")?;
356 header.insert_header("Referrer-Policy", "strict-origin-when-cross-origin")?;
357 header.remove_header("Server");
358 header.remove_header("X-Powered-By");
359 Ok(())
360 }
361}