1use std::collections::{HashMap, HashSet, VecDeque};
15use std::net::{IpAddr, SocketAddr};
16use std::num::NonZeroU32;
17use std::path::PathBuf;
18use std::process::Command;
19use std::sync::Arc;
20use std::time::Instant;
21
22use crate::intelligence::{SignalCategory, SignalQueryOptions};
23use crate::signals::adapter::SignalAdapter;
24use governor::{state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
25use once_cell::sync::Lazy;
26use parking_lot::RwLock;
27use sysinfo::{Disks, Networks, System};
28
29type ProfilesGetter = Box<dyn Fn() -> Vec<crate::profiler::EndpointProfile> + Send + Sync>;
32type SchemasGetter = Box<dyn Fn() -> Vec<crate::profiler::JsonEndpointSchema> + Send + Sync>;
34
35#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
37pub struct IntegrationsConfig {
38 pub horizon_hub_url: String,
39 pub horizon_api_key: String,
40 pub tunnel_url: String,
41 pub tunnel_api_key: String,
42 pub apparatus_url: String,
43}
44
45type IntegrationsGetter = Box<dyn Fn() -> IntegrationsConfig + Send + Sync>;
47type IntegrationsSetter = Box<dyn Fn(IntegrationsConfig) + Send + Sync>;
48
49#[derive(Debug, Clone, serde::Serialize)]
51pub struct EvaluationResult {
52 pub blocked: bool,
53 pub risk_score: u16,
54 pub matched_rules: Vec<u32>,
55 pub block_reason: Option<String>,
56 pub detection_time_us: u64,
57}
58
59type EvaluateCallback = Box<
61 dyn Fn(&str, &str, &[(String, String)], Option<&[u8]>, &str) -> EvaluationResult + Send + Sync,
62>;
63
64static PROFILES_GETTER: Lazy<RwLock<Option<ProfilesGetter>>> = Lazy::new(|| RwLock::new(None));
66
67static SCHEMAS_GETTER: Lazy<RwLock<Option<SchemasGetter>>> = Lazy::new(|| RwLock::new(None));
69
70static INTEGRATIONS_GETTER: Lazy<RwLock<Option<IntegrationsGetter>>> =
72 Lazy::new(|| RwLock::new(None));
73static INTEGRATIONS_SETTER: Lazy<RwLock<Option<IntegrationsSetter>>> =
74 Lazy::new(|| RwLock::new(None));
75
76static EVALUATE_CALLBACK: Lazy<RwLock<Option<EvaluateCallback>>> = Lazy::new(|| RwLock::new(None));
78
79pub fn register_profiles_getter<F>(getter: F)
82where
83 F: Fn() -> Vec<crate::profiler::EndpointProfile> + Send + Sync + 'static,
84{
85 *PROFILES_GETTER.write() = Some(Box::new(getter));
86}
87
88pub fn register_schemas_getter<F>(getter: F)
91where
92 F: Fn() -> Vec<crate::profiler::JsonEndpointSchema> + Send + Sync + 'static,
93{
94 *SCHEMAS_GETTER.write() = Some(Box::new(getter));
95}
96
97pub fn register_integrations_callbacks<G, S>(getter: G, setter: S)
100where
101 G: Fn() -> IntegrationsConfig + Send + Sync + 'static,
102 S: Fn(IntegrationsConfig) + Send + Sync + 'static,
103{
104 *INTEGRATIONS_GETTER.write() = Some(Box::new(getter));
105 *INTEGRATIONS_SETTER.write() = Some(Box::new(setter));
106}
107
108pub fn register_evaluate_callback<F>(callback: F)
111where
112 F: Fn(&str, &str, &[(String, String)], Option<&[u8]>, &str) -> EvaluationResult
113 + Send
114 + Sync
115 + 'static,
116{
117 *EVALUATE_CALLBACK.write() = Some(Box::new(callback));
118}
119
120#[allow(dead_code)]
122fn run_evaluate(
123 method: &str,
124 uri: &str,
125 headers: &[(String, String)],
126 body: Option<&[u8]>,
127 client_ip: &str,
128) -> Option<EvaluationResult> {
129 EVALUATE_CALLBACK
130 .read()
131 .as_ref()
132 .map(|callback| callback(method, uri, headers, body, client_ip))
133}
134
135fn get_profiles() -> Vec<crate::profiler::EndpointProfile> {
137 PROFILES_GETTER
138 .read()
139 .as_ref()
140 .map(|getter| getter())
141 .unwrap_or_default()
142}
143
144fn get_schemas() -> Vec<crate::profiler::JsonEndpointSchema> {
146 SCHEMAS_GETTER
147 .read()
148 .as_ref()
149 .map(|getter| getter())
150 .unwrap_or_default()
151}
152
153#[derive(Clone, serde::Serialize)]
155struct MetricsPoint {
156 timestamp: String,
157 cpu: f32,
158 memory: f64,
159}
160
161static METRICS_HISTORY: Lazy<RwLock<VecDeque<MetricsPoint>>> =
163 Lazy::new(|| RwLock::new(VecDeque::with_capacity(60)));
164
165static ADMIN_CONSOLE_TEMPLATE: &str = include_str!("../assets/admin_console.html");
166const ADMIN_CONSOLE_CSP: &str = "default-src 'self'; base-uri 'none'; frame-ancestors 'none'; object-src 'none'; img-src 'self' data:; font-src 'self' https://fonts.gstatic.com data:; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; script-src 'self' 'unsafe-inline'; connect-src 'self'";
167
168static DEV_MODE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
170
171static DEMO_MODE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
173
174pub fn enable_dev_mode() {
176 DEV_MODE.store(true, std::sync::atomic::Ordering::SeqCst);
177 tracing::warn!("================================================");
178 tracing::warn!("!! DEV MODE ENABLED - AUTHENTICATION BYPASSED !!");
179 tracing::warn!("!! DO NOT USE IN PRODUCTION !!");
180 tracing::warn!("================================================");
181 tracing::info!("Dev mode enabled - admin console will be served from disk");
182}
183
184pub fn is_dev_mode() -> bool {
186 DEV_MODE.load(std::sync::atomic::Ordering::SeqCst)
187}
188
189pub fn enable_demo_mode() {
191 DEMO_MODE.store(true, std::sync::atomic::Ordering::SeqCst);
192 tracing::info!("Demo mode enabled - using pre-populated sample data");
193}
194
195pub fn is_demo_mode() -> bool {
197 DEMO_MODE.load(std::sync::atomic::Ordering::SeqCst)
198}
199
200fn demo_sites() -> serde_json::Value {
202 serde_json::json!({
203 "success": true,
204 "data": {
205 "sites": [
206 {
207 "hostname": "api.acme-corp.com",
208 "upstreams": ["10.0.1.10:8080", "10.0.1.11:8080"],
209 "waf": { "enabled": true, "threshold": 70 },
210 "rate_limit": { "requests_per_second": 1000 },
211 "tls": true
212 },
213 {
214 "hostname": "shop.acme-corp.com",
215 "upstreams": ["10.0.2.10:3000"],
216 "waf": { "enabled": true, "threshold": 60 },
217 "rate_limit": { "requests_per_second": 500 },
218 "tls": true
219 },
220 {
221 "hostname": "admin.acme-corp.com",
222 "upstreams": ["10.0.3.10:8443"],
223 "waf": { "enabled": true, "threshold": 50 },
224 "rate_limit": { "requests_per_second": 100 },
225 "tls": true
226 },
227 {
228 "hostname": "legacy.acme-corp.com",
229 "upstreams": ["192.168.1.50:80"],
230 "waf": { "enabled": false, "threshold": 70 },
231 "rate_limit": { "requests_per_second": 200 },
232 "tls": false
233 }
234 ]
235 }
236 })
237}
238
239fn demo_entities() -> serde_json::Value {
241 serde_json::json!({
242 "entities": [
243 { "id": "actor_8f3a2b1c", "ip": "185.220.101.42", "risk_score": 92, "request_count": 1847, "last_seen": "2026-02-02T05:45:00Z", "tags": ["tor_exit", "credential_stuffing"] },
244 { "id": "actor_7e2d4a9f", "ip": "45.155.205.233", "risk_score": 88, "request_count": 523, "last_seen": "2026-02-02T05:42:00Z", "tags": ["scanner", "sqli_attempts"] },
245 { "id": "actor_6c1b3e8d", "ip": "194.26.29.102", "risk_score": 85, "request_count": 2341, "last_seen": "2026-02-02T05:40:00Z", "tags": ["botnet", "distributed"] },
246 { "id": "actor_5a0c2f7e", "ip": "91.240.118.50", "risk_score": 78, "request_count": 892, "last_seen": "2026-02-02T05:38:00Z", "tags": ["scraper", "rate_limited"] },
247 { "id": "actor_4d9e1a6b", "ip": "23.129.64.142", "risk_score": 75, "request_count": 156, "last_seen": "2026-02-02T05:35:00Z", "tags": ["tor_exit"] },
248 { "id": "actor_3c8f0b5a", "ip": "104.244.76.13", "risk_score": 65, "request_count": 3201, "last_seen": "2026-02-02T05:44:00Z", "tags": ["aggressive_crawler"] },
249 { "id": "actor_2b7e9c4d", "ip": "167.99.182.44", "risk_score": 45, "request_count": 89, "last_seen": "2026-02-02T05:30:00Z", "tags": ["suspicious_ua"] },
250 { "id": "actor_1a6d8b3c", "ip": "203.0.113.50", "risk_score": 25, "request_count": 12, "last_seen": "2026-02-02T05:20:00Z", "tags": [] }
251 ]
252 })
253}
254
255fn demo_campaigns() -> serde_json::Value {
257 serde_json::json!({
258 "campaigns": [
259 {
260 "id": "campaign_001",
261 "name": "Credential Stuffing Wave",
262 "type": "credential_stuffing",
263 "status": "active",
264 "actor_count": 47,
265 "request_count": 28450,
266 "first_seen": "2026-02-01T14:30:00Z",
267 "last_seen": "2026-02-02T05:45:00Z",
268 "target_endpoints": ["/api/login", "/api/auth", "/oauth/token"],
269 "severity": "critical"
270 },
271 {
272 "id": "campaign_002",
273 "name": "API Enumeration Scan",
274 "type": "reconnaissance",
275 "status": "active",
276 "actor_count": 12,
277 "request_count": 5230,
278 "first_seen": "2026-02-02T02:15:00Z",
279 "last_seen": "2026-02-02T05:40:00Z",
280 "target_endpoints": ["/api/v1/*", "/api/v2/*", "/graphql"],
281 "severity": "high"
282 },
283 {
284 "id": "campaign_003",
285 "name": "SQLi Probe Campaign",
286 "type": "injection",
287 "status": "mitigated",
288 "actor_count": 8,
289 "request_count": 1840,
290 "first_seen": "2026-02-01T22:00:00Z",
291 "last_seen": "2026-02-02T01:30:00Z",
292 "target_endpoints": ["/api/search", "/api/products"],
293 "severity": "critical"
294 }
295 ]
296 })
297}
298
299fn demo_status() -> serde_json::Value {
301 serde_json::json!({
302 "sensorId": "synapse-demo",
303 "status": "running",
304 "mode": "proxy",
305 "uptime": 86420,
306 "requestRate": 2847.5,
307 "blockRate": 3.2,
308 "fallbackRate": 0.1,
309 "proxy": { "type": "pingora", "version": "0.1.0" },
310 "waf": {
311 "enabled": true,
312 "analyzed": 2458920,
313 "blocked": 78654
314 },
315 "dlp": {
316 "enabled": true,
317 "healthy": true,
318 "patternCount": 25,
319 "totalScans": 1245000,
320 "totalMatches": 342
321 }
322 })
323}
324
325fn record_metrics_sample() {
327 let mut sys = System::new_all();
328 sys.refresh_all();
329
330 let cpu = sys.global_cpu_usage();
331 let total_mem = sys.total_memory() as f64;
332 let used_mem = sys.used_memory() as f64;
333 let memory = if total_mem > 0.0 {
334 (used_mem / total_mem) * 100.0
335 } else {
336 0.0
337 };
338
339 let point = MetricsPoint {
340 timestamp: chrono::Utc::now().to_rfc3339(),
341 cpu,
342 memory,
343 };
344
345 let mut history = METRICS_HISTORY.write();
346 if history.len() >= 60 {
347 history.pop_front();
348 }
349 history.push_back(point);
350}
351
352#[derive(Clone, serde::Serialize)]
354struct LogEntry {
355 id: String,
356 timestamp: String,
357 level: String,
358 source: String, message: String,
360}
361
362#[derive(Clone, Copy, PartialEq, Eq)]
364pub enum LogSource {
365 Http,
366 Waf,
367 System,
368 Access,
369}
370
371impl LogSource {
372 fn as_str(&self) -> &'static str {
373 match self {
374 LogSource::Http => "http",
375 LogSource::Waf => "waf",
376 LogSource::System => "system",
377 LogSource::Access => "access",
378 }
379 }
380}
381
382static LOG_BUFFER: Lazy<RwLock<VecDeque<LogEntry>>> =
384 Lazy::new(|| RwLock::new(VecDeque::with_capacity(200)));
385
386pub fn record_log_with_source(level: &str, source: LogSource, message: String) {
388 let stream_source = match source {
389 LogSource::Http => "access",
390 LogSource::Waf => "waf",
391 LogSource::System => "system",
392 LogSource::Access => "access",
393 };
394 crate::tunnel::publish_internal_log(level, stream_source, message.clone());
395
396 let entry = LogEntry {
397 id: format!("{}", fastrand::u64(..)),
398 timestamp: chrono::Utc::now().to_rfc3339(),
399 level: level.to_string(),
400 source: source.as_str().to_string(),
401 message,
402 };
403
404 let mut logs = LOG_BUFFER.write();
405 if logs.len() >= 500 {
406 logs.pop_front();
407 }
408 logs.push_back(entry);
409}
410
411pub fn record_log(level: &str, message: String) {
413 record_log_with_source(level, LogSource::System, message);
414}
415
416use axum::{
417 body::Body,
418 extract::ws::{Message, WebSocket, WebSocketUpgrade},
419 extract::{Path, Query, Request, State},
420 http::{header, HeaderValue, Method, StatusCode},
421 middleware::{self, Next},
422 response::{Html, IntoResponse, Response},
423 routing::{delete, get, post, put},
424 Json, Router,
425};
426use futures_util::{SinkExt, StreamExt};
427use percent_encoding::percent_decode_str;
428use serde::{Deserialize, Serialize};
429use subtle::ConstantTimeEq;
430use tokio::sync::mpsc;
431use tower_http::cors::{Any, CorsLayer};
432use tracing::{info, warn};
433
434#[cfg(test)]
435use std::cell::Cell;
436
437use crate::api::{ApiHandler, ApiResponse};
438use crate::waf::{TraceEvent, TraceSink};
439
440pub mod scopes {
446 pub const ADMIN_READ: &str = "admin:read";
448 pub const ADMIN_WRITE: &str = "admin:write";
450 pub const CONFIG_WRITE: &str = "config:write";
452 pub const SERVICE_MANAGE: &str = "service:manage";
454 pub const SENSOR_READ: &str = "sensor:read";
456 pub const SENSOR_WRITE: &str = "sensor:write";
458
459 pub const ALL: &[&str] = &[
461 ADMIN_READ,
462 ADMIN_WRITE,
463 CONFIG_WRITE,
464 SERVICE_MANAGE,
465 SENSOR_READ,
466 SENSOR_WRITE,
467 ];
468}
469
470#[allow(dead_code)]
479mod error_codes {
480 pub const BAD_REQUEST: &str = "BAD_REQUEST";
481 pub const VALIDATION_ERROR: &str = "VALIDATION_ERROR";
482 pub const NOT_FOUND: &str = "NOT_FOUND";
483 pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
484 pub const SERVICE_UNAVAILABLE: &str = "SERVICE_UNAVAILABLE";
485 pub const RATE_LIMIT_EXCEEDED: &str = "RATE_LIMIT_EXCEEDED";
486 pub const UNAUTHORIZED: &str = "UNAUTHORIZED";
487 pub const FORBIDDEN: &str = "FORBIDDEN";
488 pub const INSUFFICIENT_SCOPE: &str = "INSUFFICIENT_SCOPE";
489}
490
491#[derive(Debug, serde::Serialize)]
493struct ProblemDetails {
494 #[serde(rename = "type")]
495 type_url: String,
496 title: String,
497 status: u16,
498 detail: String,
499 #[serde(skip_serializing_if = "Option::is_none")]
500 instance: Option<String>,
501 #[serde(skip_serializing_if = "Option::is_none")]
502 code: Option<String>,
503 #[serde(skip_serializing_if = "Option::is_none")]
504 required_scope: Option<String>,
505 #[serde(skip_serializing_if = "Option::is_none")]
506 retry_after_secs: Option<u64>,
507}
508
509impl ProblemDetails {
510 fn new(status: StatusCode, detail: impl Into<String>) -> Self {
511 Self {
512 type_url: "about:blank".to_string(),
513 title: status.canonical_reason().unwrap_or("Error").to_string(),
514 status: status.as_u16(),
515 detail: detail.into(),
516 instance: None,
517 code: None,
518 required_scope: None,
519 retry_after_secs: None,
520 }
521 }
522}
523
524fn sanitized_error(
529 status: StatusCode,
530 code: &str,
531 public_message: &str,
532 internal_error: Option<&dyn std::fmt::Display>,
533) -> Response {
534 if let Some(err) = internal_error {
536 tracing::warn!(
537 code = code,
538 status = %status,
539 internal_error = %err,
540 "Admin API error (sanitized for client)"
541 );
542 }
543
544 let mut problem = ProblemDetails::new(status, public_message.to_string());
545 problem.code = Some(code.to_string());
546
547 (status, Json(problem)).into_response()
548}
549
550fn validation_error(
552 public_message: &str,
553 internal_error: Option<&dyn std::fmt::Display>,
554) -> Response {
555 sanitized_error(
556 StatusCode::BAD_REQUEST,
557 error_codes::VALIDATION_ERROR,
558 public_message,
559 internal_error,
560 )
561}
562
563fn internal_error(
565 public_message: &str,
566 internal_error: Option<&dyn std::fmt::Display>,
567) -> Response {
568 sanitized_error(
569 StatusCode::INTERNAL_SERVER_ERROR,
570 error_codes::INTERNAL_ERROR,
571 public_message,
572 internal_error,
573 )
574}
575
576fn forbidden_error(public_message: &str) -> Response {
578 sanitized_error(
579 StatusCode::FORBIDDEN,
580 error_codes::FORBIDDEN,
581 public_message,
582 None,
583 )
584}
585
586fn rate_limit_error(retry_after_secs: u64) -> Response {
588 let mut problem = ProblemDetails::new(StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded");
589 problem.code = Some(error_codes::RATE_LIMIT_EXCEEDED.to_string());
590 problem.retry_after_secs = Some(retry_after_secs);
591
592 (
593 StatusCode::TOO_MANY_REQUESTS,
594 [(header::RETRY_AFTER, retry_after_secs.to_string())],
595 Json(problem),
596 )
597 .into_response()
598}
599
600#[allow(dead_code)]
602fn not_found_error(resource_type: &str, _resource_id: &str) -> Response {
603 sanitized_error(
605 StatusCode::NOT_FOUND,
606 error_codes::NOT_FOUND,
607 &format!("{} not found", resource_type),
608 None,
609 )
610}
611
612fn service_unavailable(service_name: &str) -> Response {
614 sanitized_error(
615 StatusCode::SERVICE_UNAVAILABLE,
616 error_codes::SERVICE_UNAVAILABLE,
617 &format!("{} is not available", service_name),
618 None,
619 )
620}
621
622use crate::config_manager::{
623 AccessListRequest, ConfigManagerError, CreateSiteRequest, CustomRuleAction,
624 CustomRuleCondition, CustomRuleInput, CustomRuleUpdate, RateLimitRequest, RuleView,
625 SiteWafRequest, StoredRule, UpdateSiteRequest,
626};
627
628async fn config_handler(State(state): State<AdminState>) -> impl IntoResponse {
630 let response = state.handler.handle_get_config();
631 wrap_response(response)
632}
633
634async fn update_config_handler(
636 State(state): State<AdminState>,
637 Json(config): Json<crate::config::ConfigFile>,
638) -> impl IntoResponse {
639 let response = state.handler.handle_update_config(config);
640 wrap_response(response)
641}
642
643type IpRateLimiter =
645 RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, governor::clock::DefaultClock>;
646type StringRateLimiter =
647 RateLimiter<String, DefaultKeyedStateStore<String>, governor::clock::DefaultClock>;
648
649#[derive(Clone)]
651pub struct AdminState {
652 pub handler: Arc<ApiHandler>,
653 pub admin_api_key: String,
655 pub admin_auth_disabled: bool,
657 pub admin_scopes: Vec<String>,
659 pub signal_permissions: Arc<SignalPermissions>,
661 pub admin_rate_limiter: Arc<IpRateLimiter>,
663 pub public_rate_limiter: Arc<IpRateLimiter>,
664 pub report_rate_limiter: Arc<StringRateLimiter>,
666 pub auth_failure_limiter: Arc<IpRateLimiter>,
671}
672
673#[derive(Clone, Debug)]
675pub struct SignalPermissions {
676 default_allowed: HashSet<String>,
677 per_sensor_allowed: HashMap<String, HashSet<String>>,
678}
679
680impl SignalPermissions {
681 fn default_allowed() -> HashSet<String> {
682 [
683 "honeypot_hit",
684 "trap_trigger",
685 "protocol_probe",
686 "dlp_match",
687 ]
688 .into_iter()
689 .map(|value| value.to_string())
690 .collect()
691 }
692
693 pub fn is_allowed(&self, sensor_id: &str, signal_type: &str) -> bool {
694 let normalized = signal_type.trim().to_lowercase();
695 if let Some(allowlist) = self.per_sensor_allowed.get(sensor_id) {
696 allowlist.contains(&normalized)
697 } else {
698 self.default_allowed.contains(&normalized)
699 }
700 }
701}
702
703impl Default for SignalPermissions {
704 fn default() -> Self {
705 Self {
706 default_allowed: Self::default_allowed(),
707 per_sensor_allowed: HashMap::new(),
708 }
709 }
710}
711
712async fn require_auth(
719 State(state): State<AdminState>,
720 request: Request,
721 next: Next,
722) -> Result<Response, Response> {
723 if state.admin_auth_disabled {
724 return Ok(next.run(request).await);
725 }
726 if is_dev_mode() {
728 return Ok(next.run(request).await);
729 }
730
731 let client_ip = extract_client_ip(&request);
732 let provided_key = request
733 .headers()
734 .get("X-Admin-Key")
735 .and_then(|v| v.to_str().ok());
736
737 match validate_admin_key(&state, client_ip, provided_key) {
738 Ok(()) => Ok(next.run(request).await),
739 Err(response) => Err(response),
740 }
741}
742
743fn record_auth_failure(state: &AdminState, client_ip: IpAddr) -> Result<(), std::time::Duration> {
748 match state.auth_failure_limiter.check_key(&client_ip) {
749 Ok(_) => Ok(()), Err(not_until) => {
751 warn!(
752 client_ip = %client_ip,
753 "Auth rate limit exceeded: too many failed authentication attempts"
754 );
755 Err(not_until.wait_time_from(governor::clock::Clock::now(
756 &governor::clock::DefaultClock::default(),
757 )))
758 }
759 }
760}
761
762fn validate_admin_key(
763 state: &AdminState,
764 client_ip: IpAddr,
765 provided_key: Option<&str>,
766) -> Result<(), Response> {
767 match provided_key {
768 Some(key) => {
769 let key_bytes = key.as_bytes();
770 let expected_bytes = state.admin_api_key.as_bytes();
771 let is_valid = key_bytes.len() == expected_bytes.len()
772 && bool::from(key_bytes.ct_eq(expected_bytes));
773
774 if is_valid {
775 Ok(())
776 } else {
777 match record_auth_failure(state, client_ip) {
778 Ok(()) => {
779 warn!(client_ip = %client_ip, "Admin auth failed: invalid API key");
780 let mut problem = ProblemDetails::new(
781 StatusCode::UNAUTHORIZED,
782 "Invalid X-Admin-Key value",
783 );
784 problem.code = Some(error_codes::UNAUTHORIZED.to_string());
785 Err((StatusCode::UNAUTHORIZED, Json(problem)).into_response())
786 }
787 Err(retry_after) => {
788 warn!(client_ip = %client_ip, "Admin auth failed: too many attempts");
789 let mut problem = ProblemDetails::new(
790 StatusCode::TOO_MANY_REQUESTS,
791 "Too many failed authentication attempts",
792 );
793 problem.code = Some(error_codes::RATE_LIMIT_EXCEEDED.to_string());
794 problem.retry_after_secs = Some(retry_after.as_secs());
795 Err((
796 StatusCode::TOO_MANY_REQUESTS,
797 [(header::RETRY_AFTER, retry_after.as_secs().to_string())],
798 Json(problem),
799 )
800 .into_response())
801 }
802 }
803 }
804 }
805 None => match record_auth_failure(state, client_ip) {
806 Ok(()) => {
807 warn!(client_ip = %client_ip, "Admin auth failed: missing X-Admin-Key header");
808 let mut problem =
809 ProblemDetails::new(StatusCode::UNAUTHORIZED, "Missing X-Admin-Key header");
810 problem.code = Some(error_codes::UNAUTHORIZED.to_string());
811 Err((StatusCode::UNAUTHORIZED, Json(problem)).into_response())
812 }
813 Err(retry_after) => {
814 warn!(client_ip = %client_ip, "Admin auth failed: too many attempts");
815 let mut problem = ProblemDetails::new(
816 StatusCode::TOO_MANY_REQUESTS,
817 "Too many failed authentication attempts",
818 );
819 problem.code = Some(error_codes::RATE_LIMIT_EXCEEDED.to_string());
820 problem.retry_after_secs = Some(retry_after.as_secs());
821 Err((
822 StatusCode::TOO_MANY_REQUESTS,
823 [(header::RETRY_AFTER, retry_after.as_secs().to_string())],
824 Json(problem),
825 )
826 .into_response())
827 }
828 },
829 }
830}
831
832fn extract_admin_key_from_query(request: &Request) -> Option<String> {
834 let query = request.uri().query()?;
835 for pair in query.split('&') {
836 let mut iter = pair.splitn(2, '=');
837 let key = iter.next().unwrap_or_default();
838 if key != "admin_key" {
839 continue;
840 }
841 let raw = iter.next().unwrap_or_default();
842 let decoded = percent_decode_str(raw).decode_utf8().ok()?;
843 return Some(decoded.into_owned());
844 }
845 None
846}
847
848async fn require_ws_auth(
850 State(state): State<AdminState>,
851 request: Request,
852 next: Next,
853) -> Result<Response, Response> {
854 if state.admin_auth_disabled {
855 return Ok(next.run(request).await);
856 }
857 if is_dev_mode() {
858 return Ok(next.run(request).await);
859 }
860
861 let client_ip = extract_client_ip(&request);
862 let header_key = request
863 .headers()
864 .get("X-Admin-Key")
865 .and_then(|v| v.to_str().ok());
866 let query_key = extract_admin_key_from_query(&request);
867 let provided_key = header_key.or(query_key.as_deref());
868
869 match validate_admin_key(&state, client_ip, provided_key) {
870 Ok(()) => Ok(next.run(request).await),
871 Err(response) => Err(response),
872 }
873}
874
875fn extract_client_ip(request: &Request) -> IpAddr {
877 if let Some(xff) = request.headers().get("X-Forwarded-For") {
879 if let Ok(xff_str) = xff.to_str() {
880 if let Some(first_ip) = xff_str.split(',').next() {
882 if let Ok(ip) = first_ip.trim().parse() {
883 return ip;
884 }
885 }
886 }
887 }
888
889 if let Some(real_ip) = request.headers().get("X-Real-IP") {
891 if let Ok(real_ip_str) = real_ip.to_str() {
892 if let Ok(ip) = real_ip_str.trim().parse() {
893 return ip;
894 }
895 }
896 }
897
898 IpAddr::from([127, 0, 0, 1])
901}
902
903async fn require_admin_write(
905 State(state): State<AdminState>,
906 request: Request,
907 next: Next,
908) -> Result<Response, Response> {
909 check_scope(&state, scopes::ADMIN_WRITE, request, next).await
910}
911
912async fn require_config_write(
914 State(state): State<AdminState>,
915 request: Request,
916 next: Next,
917) -> Result<Response, Response> {
918 check_scope(&state, scopes::CONFIG_WRITE, request, next).await
919}
920
921async fn require_service_manage(
923 State(state): State<AdminState>,
924 request: Request,
925 next: Next,
926) -> Result<Response, Response> {
927 check_scope(&state, scopes::SERVICE_MANAGE, request, next).await
928}
929
930async fn require_sensor_read(
932 State(state): State<AdminState>,
933 request: Request,
934 next: Next,
935) -> Result<Response, Response> {
936 check_scope(&state, scopes::SENSOR_READ, request, next).await
937}
938
939async fn require_sensor_write(
941 State(state): State<AdminState>,
942 request: Request,
943 next: Next,
944) -> Result<Response, Response> {
945 check_scope(&state, scopes::SENSOR_WRITE, request, next).await
946}
947
948async fn require_admin_read(
950 State(state): State<AdminState>,
951 request: Request,
952 next: Next,
953) -> Result<Response, Response> {
954 check_scope(&state, scopes::ADMIN_READ, request, next).await
955}
956
957async fn check_scope(
959 state: &AdminState,
960 required_scope: &str,
961 request: Request,
962 next: Next,
963) -> Result<Response, Response> {
964 if state.admin_auth_disabled {
965 return Ok(next.run(request).await);
966 }
967 if state
969 .admin_scopes
970 .iter()
971 .any(|s| s == required_scope || s == "*")
972 {
973 Ok(next.run(request).await)
974 } else {
975 warn!(
976 "Scope check failed: required '{}', granted: {:?}",
977 required_scope, state.admin_scopes
978 );
979 let mut problem = ProblemDetails::new(StatusCode::FORBIDDEN, "Insufficient scope");
980 problem.code = Some(error_codes::INSUFFICIENT_SCOPE.to_string());
981 problem.required_scope = Some(required_scope.to_string());
982 Err((StatusCode::FORBIDDEN, Json(problem)).into_response())
983 }
984}
985
986async fn rate_limit_admin(
989 State(state): State<AdminState>,
990 request: Request,
991 next: Next,
992) -> Result<Response, Response> {
993 if is_dev_mode() {
995 return Ok(next.run(request).await);
996 }
997
998 let client_ip = extract_client_ip(&request);
1000
1001 match state.admin_rate_limiter.check_key(&client_ip) {
1002 Ok(_) => Ok(next.run(request).await),
1003 Err(not_until) => {
1004 let retry_after = not_until.wait_time_from(governor::clock::Clock::now(
1005 &governor::clock::DefaultClock::default(),
1006 ));
1007 warn!("Admin rate limit exceeded for IP: {}", client_ip);
1008 let mut problem =
1009 ProblemDetails::new(StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded");
1010 problem.code = Some(error_codes::RATE_LIMIT_EXCEEDED.to_string());
1011 problem.retry_after_secs = Some(retry_after.as_secs());
1012 Err((
1013 StatusCode::TOO_MANY_REQUESTS,
1014 [(header::RETRY_AFTER, retry_after.as_secs().to_string())],
1015 Json(problem),
1016 )
1017 .into_response())
1018 }
1019 }
1020}
1021
1022async fn rate_limit_public(
1024 State(state): State<AdminState>,
1025 request: Request,
1026 next: Next,
1027) -> Result<Response, Response> {
1028 if is_dev_mode() {
1030 return Ok(next.run(request).await);
1031 }
1032
1033 let client_ip = extract_client_ip(&request);
1034
1035 match state.public_rate_limiter.check_key(&client_ip) {
1036 Ok(_) => Ok(next.run(request).await),
1037 Err(not_until) => {
1038 let retry_after = not_until.wait_time_from(governor::clock::Clock::now(
1039 &governor::clock::DefaultClock::default(),
1040 ));
1041 warn!("Public rate limit exceeded for IP: {}", client_ip);
1042 let mut problem =
1043 ProblemDetails::new(StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded");
1044 problem.code = Some(error_codes::RATE_LIMIT_EXCEEDED.to_string());
1045 problem.retry_after_secs = Some(retry_after.as_secs());
1046 Err((
1047 StatusCode::TOO_MANY_REQUESTS,
1048 [(header::RETRY_AFTER, retry_after.as_secs().to_string())],
1049 Json(problem),
1050 )
1051 .into_response())
1052 }
1053 }
1054}
1055
1056async fn audit_log(request: Request, next: Next) -> Response {
1058 let method = request.method().clone();
1059 let path = request.uri().path().to_string();
1060 let client_ip = extract_client_ip(&request);
1061 let start = Instant::now();
1062
1063 let response = next.run(request).await;
1064
1065 if matches!(
1066 method,
1067 Method::POST | Method::PUT | Method::DELETE | Method::PATCH
1068 ) {
1069 let status = response.status();
1070 let duration_ms = start.elapsed().as_millis();
1071 let actor = client_ip.to_string();
1072
1073 tracing::info!(
1074 target: "audit",
1075 actor = actor,
1076 method = %method,
1077 path = %path,
1078 status = status.as_u16(),
1079 client_ip = %client_ip,
1080 duration_ms = duration_ms,
1081 "admin_api_mutation"
1082 );
1083
1084 record_log_with_source(
1085 "info",
1086 LogSource::Access,
1087 format!(
1088 "audit actor={} method={} path={} status={} ip={} duration_ms={}",
1089 actor,
1090 method.as_str(),
1091 path,
1092 status.as_u16(),
1093 client_ip,
1094 duration_ms
1095 ),
1096 );
1097 }
1098
1099 response
1100}
1101
1102async fn security_headers(request: Request, next: Next) -> Response {
1105 let mut response = next.run(request).await;
1106 let headers = response.headers_mut();
1107
1108 headers.insert(
1110 header::X_CONTENT_TYPE_OPTIONS,
1111 HeaderValue::from_static("nosniff"),
1112 );
1113
1114 headers.insert(header::X_FRAME_OPTIONS, HeaderValue::from_static("DENY"));
1116
1117 headers.insert(
1119 header::REFERRER_POLICY,
1120 HeaderValue::from_static("strict-origin-when-cross-origin"),
1121 );
1122
1123 headers.insert(
1125 header::CACHE_CONTROL,
1126 HeaderValue::from_static("no-store, no-cache, must-revalidate"),
1127 );
1128
1129 if !headers.contains_key(header::CONTENT_SECURITY_POLICY) {
1132 headers.insert(
1133 header::CONTENT_SECURITY_POLICY,
1134 HeaderValue::from_static("default-src 'none'; frame-ancestors 'none'"),
1135 );
1136 }
1137
1138 headers.insert(
1140 http::header::HeaderName::from_static("permissions-policy"),
1141 HeaderValue::from_static(
1142 "accelerometer=(), camera=(), geolocation=(), gyroscope=(), magnetometer=(), microphone=(), payment=(), usb=()",
1143 ),
1144 );
1145
1146 response
1147}
1148
1149fn build_response(
1150 status: StatusCode,
1151 body: String,
1152 content_type: &'static str,
1153 disposition: Option<String>,
1154) -> Response {
1155 let mut response = Response::new(Body::from(body));
1156 *response.status_mut() = status;
1157 response
1158 .headers_mut()
1159 .insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type));
1160
1161 if let Some(disposition) = disposition {
1162 match HeaderValue::from_str(&disposition) {
1163 Ok(value) => {
1164 response
1165 .headers_mut()
1166 .insert(header::CONTENT_DISPOSITION, value);
1167 }
1168 Err(err) => {
1169 warn!("Invalid Content-Disposition header value: {}", err);
1170 }
1171 }
1172 }
1173
1174 response
1175}
1176
1177pub async fn start_admin_server(
1185 addr: SocketAddr,
1186 handler: Arc<ApiHandler>,
1187 admin_api_key: String,
1188 admin_auth_disabled: bool,
1189) -> std::io::Result<()> {
1190 let admin_scopes: Vec<String> = scopes::ALL.iter().map(|s| s.to_string()).collect();
1192 let admin_rate_limiter = Arc::new(RateLimiter::keyed(Quota::per_minute(
1197 NonZeroU32::new(100).unwrap_or(NonZeroU32::MIN),
1198 )));
1199 let public_rate_limiter = Arc::new(RateLimiter::keyed(Quota::per_minute(
1200 NonZeroU32::new(1000).unwrap_or(NonZeroU32::MIN),
1201 )));
1202 let auth_failure_limiter = Arc::new(RateLimiter::keyed(Quota::per_minute(
1204 NonZeroU32::new(5).unwrap_or(NonZeroU32::MIN),
1205 )));
1206 let report_rate_limiter = Arc::new(RateLimiter::keyed(Quota::per_minute(
1208 NonZeroU32::new(100).unwrap_or(NonZeroU32::MIN),
1209 )));
1210
1211 let state = AdminState {
1212 handler,
1213 admin_api_key,
1214 admin_auth_disabled,
1215 admin_scopes,
1216 signal_permissions: Arc::new(SignalPermissions::default()),
1217 admin_rate_limiter,
1218 public_rate_limiter,
1219 report_rate_limiter,
1220 auth_failure_limiter,
1221 };
1222
1223 record_metrics_sample();
1225
1226 if let Err(err) = recover_sysctl_wal() {
1227 warn!("Failed to recover sysctl WAL: {}", err);
1228 }
1229
1230 record_log(
1232 "info",
1233 format!("Synapse-Pingora admin server starting on {}", addr),
1234 );
1235 record_log(
1236 "info",
1237 "WAF engine initialized with 237 detection rules".to_string(),
1238 );
1239 record_log(
1240 "info",
1241 format!(
1242 "Platform: {} {}",
1243 std::env::consts::OS,
1244 std::env::consts::ARCH
1245 ),
1246 );
1247
1248 let cors = CorsLayer::new()
1250 .allow_origin(Any)
1251 .allow_methods([
1252 Method::GET,
1253 Method::POST,
1254 Method::PUT,
1255 Method::DELETE,
1256 Method::OPTIONS,
1257 ])
1258 .allow_headers([
1259 header::CONTENT_TYPE,
1260 header::ACCEPT,
1261 header::AUTHORIZATION,
1262 header::HeaderName::from_static("x-admin-key"),
1263 ]);
1264
1265 let admin_write_routes = Router::new()
1267 .route("/reload", post(reload_handler))
1268 .route("/test", post(test_handler))
1269 .route("/config", post(update_config_handler))
1270 .route_layer(middleware::from_fn_with_state(
1271 state.clone(),
1272 require_admin_write,
1273 ))
1274 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1275 .route_layer(middleware::from_fn_with_state(
1276 state.clone(),
1277 rate_limit_admin,
1278 ));
1279
1280 let config_write_routes = Router::new()
1282 .route("/sites", post(create_site_handler))
1283 .route(
1284 "/sites/{hostname}",
1285 put(update_site_handler).delete(delete_site_handler),
1286 )
1287 .route("/sites/{hostname}/waf", put(update_site_waf_handler))
1288 .route(
1289 "/sites/{hostname}/rate-limit",
1290 put(update_site_rate_limit_handler),
1291 )
1292 .route(
1293 "/sites/{hostname}/access-list",
1294 put(update_site_access_list_handler),
1295 )
1296 .route("/sites/{hostname}/shadow", put(update_site_shadow_handler))
1297 .route("/debug/profiles/save", post(save_profiles_handler))
1298 .route("/config", get(config_handler))
1300 .route_layer(middleware::from_fn_with_state(
1301 state.clone(),
1302 require_config_write,
1303 ))
1304 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1305 .route_layer(middleware::from_fn_with_state(
1306 state.clone(),
1307 rate_limit_admin,
1308 ));
1309
1310 let service_manage_routes = Router::new()
1312 .route("/restart", post(restart_handler))
1313 .route("/api/profiles/reset", post(api_profiles_reset_handler))
1314 .route("/api/schemas/reset", post(api_schemas_reset_handler))
1315 .route_layer(middleware::from_fn_with_state(
1316 state.clone(),
1317 require_service_manage,
1318 ))
1319 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1320 .route_layer(middleware::from_fn_with_state(
1321 state.clone(),
1322 rate_limit_admin,
1323 ));
1324
1325 let sensor_read_routes = Router::new()
1328 .route(
1329 "/_sensor/campaigns/:id/graph",
1330 get(sensor_campaign_graph_handler),
1331 )
1332 .route(
1333 "/_sensor/campaigns/:id/actors",
1334 get(sensor_campaign_actors_handler),
1335 )
1336 .route(
1337 "/_sensor/campaigns/:id/timeline",
1338 get(sensor_campaign_timeline_handler),
1339 )
1340 .route("/_sensor/dlp/stats", get(sensor_dlp_stats_handler))
1341 .route(
1342 "/_sensor/dlp/violations",
1343 get(sensor_dlp_violations_handler),
1344 )
1345 .route(
1346 "/_sensor/actors/:actor_id",
1347 get(sensor_actor_detail_handler),
1348 )
1349 .route(
1350 "/_sensor/actors/:actor_id/timeline",
1351 get(sensor_actor_timeline_handler),
1352 )
1353 .route(
1354 "/_sensor/sessions/:session_id",
1355 get(sensor_session_detail_handler),
1356 )
1357 .route("/_sensor/entities", get(sensor_entities_handler))
1358 .route("/_sensor/status", get(sensor_status_handler))
1359 .route("/_sensor/config", get(sensor_config_handler))
1360 .route_layer(middleware::from_fn_with_state(
1361 state.clone(),
1362 require_sensor_read,
1363 ))
1364 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1365 .route_layer(middleware::from_fn_with_state(
1366 state.clone(),
1367 rate_limit_admin,
1368 ));
1369
1370 let sensor_write_routes = Router::new()
1372 .route("/_sensor/report", post(sensor_report_handler))
1373 .route_layer(middleware::from_fn_with_state(
1374 state.clone(),
1375 require_sensor_write,
1376 ))
1377 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1378 .route_layer(middleware::from_fn_with_state(
1379 state.clone(),
1380 rate_limit_admin,
1381 ));
1382
1383 let rules_read_routes = Router::new()
1385 .route("/_sensor/rules", get(sensor_rules_handler))
1386 .route_layer(middleware::from_fn_with_state(
1387 state.clone(),
1388 require_admin_read,
1389 ))
1390 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1391 .route_layer(middleware::from_fn_with_state(
1392 state.clone(),
1393 rate_limit_admin,
1394 ));
1395
1396 let rules_write_routes = Router::new()
1397 .route("/_sensor/rules", post(sensor_rules_create_handler))
1398 .route(
1399 "/_sensor/rules/:rule_id",
1400 put(sensor_rules_update_handler).delete(sensor_rules_delete_handler),
1401 )
1402 .route_layer(middleware::from_fn_with_state(
1403 state.clone(),
1404 require_admin_write,
1405 ))
1406 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1407 .route_layer(middleware::from_fn_with_state(
1408 state.clone(),
1409 rate_limit_admin,
1410 ));
1411
1412 let kernel_config_read_routes = Router::new()
1413 .route("/_sensor/config/kernel", get(config_kernel_get_handler))
1414 .route_layer(middleware::from_fn_with_state(
1415 state.clone(),
1416 require_admin_read,
1417 ))
1418 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1419 .route_layer(middleware::from_fn_with_state(
1420 state.clone(),
1421 rate_limit_admin,
1422 ));
1423
1424 let kernel_config_write_routes = Router::new()
1425 .route("/_sensor/config/kernel", put(config_kernel_put_handler))
1426 .route_layer(middleware::from_fn_with_state(
1427 state.clone(),
1428 require_admin_write,
1429 ))
1430 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1431 .route_layer(middleware::from_fn_with_state(
1432 state.clone(),
1433 rate_limit_admin,
1434 ));
1435
1436 let admin_read_routes = Router::new()
1438 .route("/console", get(admin_console_handler))
1439 .route("/_sensor/system/logs", get(sensor_system_logs_handler))
1440 .route("/_sensor/logs", get(logs_handler))
1441 .route("/_sensor/logs/:source", get(logs_by_source_handler))
1442 .route_layer(middleware::from_fn_with_state(
1443 state.clone(),
1444 require_admin_read,
1445 ))
1446 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1447 .route_layer(middleware::from_fn_with_state(
1448 state.clone(),
1449 rate_limit_public,
1450 ));
1451
1452 let public_routes = Router::new()
1454 .route("/health", get(health_handler))
1455 .route("/_sensor/demo", get(sensor_demo_get_handler))
1457 .route(
1458 "/_sensor/demo/toggle",
1459 post(sensor_demo_toggle_handler).get(sensor_demo_toggle_handler),
1460 )
1461 .route("/_sensor/access-lists", get(sensor_access_lists_handler))
1462 .route("/_sensor/certificates", get(sensor_certificates_handler))
1463 .route(
1464 "/_sensor/bot-indicators",
1465 get(sensor_bot_indicators_handler),
1466 )
1467 .route(
1468 "/_sensor/header-profiles",
1469 get(sensor_header_profiles_handler),
1470 );
1471
1472 let debugger_routes = Router::new()
1474 .route("/_sensor/debugger/ws", get(waf_debugger_ws_handler))
1475 .route_layer(middleware::from_fn_with_state(
1476 state.clone(),
1477 require_ws_auth,
1478 ))
1479 .route_layer(middleware::from_fn_with_state(
1480 state.clone(),
1481 rate_limit_public,
1482 ));
1483
1484 let authenticated_routes = Router::new()
1486 .route("/metrics", get(metrics_handler))
1487 .route("/sites", get(sites_handler))
1488 .route("/sites/{hostname}", get(get_site_handler))
1489 .route("/stats", get(stats_handler))
1490 .route("/waf/stats", get(waf_stats_handler))
1491 .route("/debug/profiles", get(profiles_handler))
1492 .route("/_sensor/health", get(health_handler))
1494 .route(
1495 "/_sensor/entities/release-all",
1496 post(sensor_release_all_handler),
1497 )
1498 .route(
1499 "/_sensor/entities/:ip",
1500 delete(sensor_release_entity_handler),
1501 )
1502 .route("/_sensor/metrics/reset", post(sensor_metrics_reset_handler))
1503 .route("/_sensor/blocks", get(sensor_blocks_handler))
1504 .route("/_sensor/trends", get(sensor_trends_handler))
1505 .route("/_sensor/signals", get(sensor_signals_handler))
1506 .route("/_sensor/anomalies", get(sensor_anomalies_handler))
1507 .route("/_sensor/campaigns", get(sensor_campaigns_handler))
1508 .route(
1509 "/_sensor/campaigns/:id",
1510 get(sensor_campaign_detail_handler),
1511 )
1512 .route("/_sensor/payload/bandwidth", get(sensor_bandwidth_handler))
1514 .route("/_sensor/actors", get(sensor_actors_handler))
1515 .route("/_sensor/sessions", get(sensor_sessions_handler))
1517 .route("/_sensor/stuffing", get(sensor_stuffing_handler))
1519 .route("/_sensor/system/config", get(sensor_system_config_handler))
1520 .route(
1521 "/_sensor/system/overview",
1522 get(sensor_system_overview_handler),
1523 )
1524 .route(
1525 "/_sensor/system/performance",
1526 get(sensor_system_performance_handler),
1527 )
1528 .route(
1529 "/_sensor/system/network",
1530 get(sensor_system_network_handler),
1531 )
1532 .route(
1533 "/_sensor/system/processes",
1534 get(sensor_system_processes_handler),
1535 )
1536 .route(
1539 "/_sensor/profiling/templates",
1540 get(profiling_templates_handler),
1541 )
1542 .route(
1543 "/_sensor/profiling/baselines",
1544 get(profiling_baselines_handler),
1545 )
1546 .route("/_sensor/profiling/schemas", get(profiling_schemas_handler))
1547 .route(
1548 "/_sensor/profiling/schema/discovery",
1549 get(profiling_discovery_handler),
1550 )
1551 .route(
1552 "/_sensor/profiling/anomalies",
1553 get(profiling_anomalies_handler),
1554 )
1555 .route("/api/profiles", get(api_profiles_list_handler))
1557 .route("/api/profiles/:template", get(api_profiles_detail_handler))
1558 .route("/api/schemas", get(api_schemas_list_handler))
1559 .route("/api/schemas/:template", get(api_schemas_detail_handler))
1560 .route("/_sensor/shadow/status", get(sensor_shadow_status_handler))
1562 .route("/sites/{hostname}/shadow", get(get_site_shadow_handler))
1563 .route("/_sensor/evaluate", post(sensor_evaluate_handler))
1565 .route(
1567 "/_sensor/config/dlp",
1568 get(config_dlp_get_handler).put(config_dlp_put_handler),
1569 )
1570 .route(
1571 "/_sensor/config/block-page",
1572 get(config_block_page_get_handler).put(config_block_page_put_handler),
1573 )
1574 .route(
1575 "/_sensor/config/crawler",
1576 get(config_crawler_get_handler).put(config_crawler_put_handler),
1577 )
1578 .route(
1579 "/_sensor/config/tarpit",
1580 get(config_tarpit_get_handler).put(config_tarpit_put_handler),
1581 )
1582 .route(
1583 "/_sensor/config/travel",
1584 get(config_travel_get_handler).put(config_travel_put_handler),
1585 )
1586 .route(
1587 "/_sensor/config/entity",
1588 get(config_entity_get_handler).put(config_entity_put_handler),
1589 )
1590 .route(
1591 "/_sensor/config/integrations",
1592 get(config_integrations_get_handler).put(config_integrations_put_handler),
1593 )
1594 .route("/_sensor/diagnostic-bundle", get(diagnostic_bundle_handler))
1597 .route("/_sensor/config/export", get(config_export_handler))
1599 .route("/_sensor/config/import", post(config_import_handler))
1600 .route("/", get(root_handler))
1601 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
1602 .route_layer(middleware::from_fn_with_state(
1604 state.clone(),
1605 rate_limit_public,
1606 ));
1607
1608 let app = Router::new()
1609 .merge(admin_write_routes)
1610 .merge(config_write_routes)
1611 .merge(service_manage_routes)
1612 .merge(sensor_read_routes)
1613 .merge(sensor_write_routes)
1614 .merge(rules_read_routes)
1615 .merge(rules_write_routes)
1616 .merge(kernel_config_read_routes)
1617 .merge(kernel_config_write_routes)
1618 .merge(admin_read_routes)
1619 .merge(debugger_routes)
1620 .merge(authenticated_routes)
1621 .merge(public_routes)
1622 .layer(middleware::from_fn(security_headers))
1623 .layer(middleware::from_fn(audit_log))
1624 .layer(cors)
1625 .with_state(state);
1626
1627 info!("Admin HTTP server listening on {}", addr);
1628
1629 let listener = tokio::net::TcpListener::bind(addr).await?;
1630 axum::serve(listener, app)
1631 .await
1632 .map_err(std::io::Error::other)
1633}
1634
1635async fn root_handler() -> impl IntoResponse {
1637 Json(serde_json::json!({
1638 "service": "synapse-pingora",
1639 "version": env!("CARGO_PKG_VERSION"),
1640 "endpoints": [
1641 { "method": "GET", "path": "/console", "description": "Admin console UI" },
1642 { "method": "GET", "path": "/health", "description": "Health check" },
1643 { "method": "GET", "path": "/metrics", "description": "Prometheus metrics" },
1644 { "method": "POST", "path": "/reload", "description": "Reload configuration" },
1645 { "method": "POST", "path": "/test", "description": "Test configuration" },
1646 { "method": "POST", "path": "/restart", "description": "Restart service" },
1647 { "method": "GET", "path": "/sites", "description": "List sites" },
1648 { "method": "GET", "path": "/stats", "description": "Runtime statistics" },
1649 { "method": "GET", "path": "/waf/stats", "description": "WAF statistics" }
1650 ]
1651 }))
1652}
1653
1654async fn admin_console_handler() -> impl IntoResponse {
1656 let content = if is_dev_mode() {
1657 match std::fs::read_to_string("assets/admin_console.html") {
1659 Ok(content) => content,
1660 Err(e) => {
1661 tracing::warn!(
1662 "Failed to read admin console from disk: {}, falling back to embedded",
1663 e
1664 );
1665 ADMIN_CONSOLE_TEMPLATE.to_string()
1666 }
1667 }
1668 } else {
1669 ADMIN_CONSOLE_TEMPLATE.to_string()
1670 };
1671
1672 (
1673 StatusCode::OK,
1674 [
1675 (header::CONTENT_TYPE, "text/html; charset=utf-8"),
1676 (header::CONTENT_SECURITY_POLICY, ADMIN_CONSOLE_CSP),
1677 ],
1678 Html(content),
1679 )
1680}
1681
1682async fn health_handler(State(state): State<AdminState>) -> impl IntoResponse {
1684 let response = state.handler.handle_health();
1685 wrap_response(response)
1686}
1687
1688async fn metrics_handler(State(state): State<AdminState>) -> impl IntoResponse {
1690 let metrics = state.handler.handle_metrics();
1691 (
1692 StatusCode::OK,
1693 [(header::CONTENT_TYPE, "text/plain; charset=utf-8")],
1694 metrics,
1695 )
1696}
1697
1698async fn reload_handler(State(state): State<AdminState>) -> impl IntoResponse {
1700 let response = state.handler.handle_reload();
1701 wrap_response(response)
1702}
1703
1704async fn test_handler(State(_state): State<AdminState>) -> impl IntoResponse {
1706 let response: ApiResponse<TestResult> = ApiResponse::ok(TestResult {
1709 success: true,
1710 message: "Configuration syntax OK".to_string(),
1711 });
1712 wrap_response(response)
1713}
1714
1715async fn restart_handler() -> impl IntoResponse {
1717 let response: ApiResponse<RestartResult> = ApiResponse::ok(RestartResult {
1720 success: true,
1721 message: "Restart signaled (hot-reload applied)".to_string(),
1722 });
1723 wrap_response(response)
1724}
1725
1726async fn sites_handler(State(state): State<AdminState>) -> impl IntoResponse {
1728 if is_demo_mode() {
1730 return (StatusCode::OK, Json(demo_sites())).into_response();
1731 }
1732
1733 let response = state.handler.handle_list_sites();
1734 wrap_response(response)
1735}
1736
1737async fn create_site_handler(
1743 State(state): State<AdminState>,
1744 Json(request): Json<CreateSiteRequest>,
1745) -> impl IntoResponse {
1746 let response = state.handler.handle_create_site(request);
1747 wrap_response(response)
1748}
1749
1750async fn get_site_handler(
1752 State(state): State<AdminState>,
1753 Path(hostname): Path<String>,
1754) -> impl IntoResponse {
1755 let response = state.handler.handle_get_site(&hostname);
1756 wrap_response(response)
1757}
1758
1759async fn update_site_handler(
1761 State(state): State<AdminState>,
1762 Path(hostname): Path<String>,
1763 Json(request): Json<UpdateSiteRequest>,
1764) -> impl IntoResponse {
1765 let response = state.handler.handle_update_site(&hostname, request);
1766 wrap_response(response)
1767}
1768
1769async fn delete_site_handler(
1771 State(state): State<AdminState>,
1772 Path(hostname): Path<String>,
1773) -> impl IntoResponse {
1774 let response = state.handler.handle_delete_site(&hostname);
1775 wrap_response(response)
1776}
1777
1778async fn update_site_waf_handler(
1780 State(state): State<AdminState>,
1781 Path(hostname): Path<String>,
1782 Json(request): Json<SiteWafRequest>,
1783) -> impl IntoResponse {
1784 let response = state.handler.handle_update_site_waf(&hostname, request);
1785 wrap_response(response)
1786}
1787
1788async fn update_site_rate_limit_handler(
1790 State(state): State<AdminState>,
1791 Path(hostname): Path<String>,
1792 Json(request): Json<RateLimitRequest>,
1793) -> impl IntoResponse {
1794 let response = state
1795 .handler
1796 .handle_update_site_rate_limit(&hostname, request);
1797 wrap_response(response)
1798}
1799
1800async fn update_site_access_list_handler(
1802 State(state): State<AdminState>,
1803 Path(hostname): Path<String>,
1804 Json(request): Json<AccessListRequest>,
1805) -> impl IntoResponse {
1806 let response = state
1807 .handler
1808 .handle_update_site_access_list(&hostname, request);
1809 wrap_response(response)
1810}
1811
1812#[derive(serde::Serialize)]
1818struct ShadowStatusResponse {
1819 enabled: bool,
1820 sites_with_shadow: usize,
1821 total_mirrored: u64,
1822 total_rate_limited: u64,
1823 total_failed: u64,
1824}
1825
1826#[derive(serde::Deserialize)]
1828struct ShadowConfigRequest {
1829 enabled: Option<bool>,
1830 min_risk_score: Option<f32>,
1831 max_risk_score: Option<f32>,
1832 honeypot_urls: Option<Vec<String>>,
1833 sampling_rate: Option<f32>,
1834 per_ip_rate_limit: Option<u32>,
1835 timeout_secs: Option<u64>,
1836 include_body: Option<bool>,
1837 max_body_size: Option<usize>,
1838}
1839
1840async fn sensor_shadow_status_handler(State(state): State<AdminState>) -> impl IntoResponse {
1842 let sites_with_shadow = if let Some(config_mgr) = state.handler.config_manager() {
1844 let hostnames = config_mgr.list_sites();
1845 hostnames
1846 .iter()
1847 .filter(|hostname| {
1848 config_mgr
1849 .get_site(hostname)
1850 .ok()
1851 .and_then(|site| site.shadow_mirror)
1852 .map(|sm| sm.enabled)
1853 .unwrap_or(false)
1854 })
1855 .count()
1856 } else {
1857 0
1858 };
1859
1860 let metrics = state.handler.metrics();
1861 let response = ShadowStatusResponse {
1862 enabled: sites_with_shadow > 0,
1863 sites_with_shadow,
1864 total_mirrored: metrics.shadow_mirrored_total(),
1865 total_rate_limited: metrics.shadow_rate_limited_total(),
1866 total_failed: metrics.shadow_failed_total(),
1867 };
1868
1869 (
1870 StatusCode::OK,
1871 Json(serde_json::json!({
1872 "success": true,
1873 "data": response
1874 })),
1875 )
1876}
1877
1878async fn get_site_shadow_handler(
1880 State(state): State<AdminState>,
1881 Path(hostname): Path<String>,
1882) -> impl IntoResponse {
1883 if let Some(config_mgr) = state.handler.config_manager() {
1884 if let Ok(site) = config_mgr.get_site(&hostname) {
1885 let shadow_config = site.shadow_mirror.clone();
1886 return (
1887 StatusCode::OK,
1888 Json(serde_json::json!({
1889 "success": true,
1890 "data": {
1891 "hostname": hostname,
1892 "shadow_mirror": shadow_config
1893 }
1894 })),
1895 )
1896 .into_response();
1897 }
1898 }
1899
1900 not_found_error("Site", &hostname)
1901}
1902
1903async fn update_site_shadow_handler(
1905 State(state): State<AdminState>,
1906 Path(hostname): Path<String>,
1907 Json(request): Json<ShadowConfigRequest>,
1908) -> impl IntoResponse {
1909 use crate::config_manager::UpdateSiteRequest;
1910
1911 if let Some(config_mgr) = state.handler.config_manager() {
1912 let existing_shadow = match config_mgr.get_site(&hostname) {
1914 Ok(site) => site.shadow_mirror,
1915 Err(_) => {
1916 return not_found_error("Site", &hostname);
1917 }
1918 };
1919
1920 let mut shadow_config = existing_shadow.unwrap_or_default();
1922
1923 if let Some(enabled) = request.enabled {
1925 shadow_config.enabled = enabled;
1926 }
1927 if let Some(min) = request.min_risk_score {
1928 shadow_config.min_risk_score = min;
1929 }
1930 if let Some(max) = request.max_risk_score {
1931 shadow_config.max_risk_score = max;
1932 }
1933 if let Some(urls) = request.honeypot_urls {
1934 shadow_config.honeypot_urls = urls;
1935 }
1936 if let Some(rate) = request.sampling_rate {
1937 shadow_config.sampling_rate = rate;
1938 }
1939 if let Some(limit) = request.per_ip_rate_limit {
1940 shadow_config.per_ip_rate_limit = limit;
1941 }
1942 if let Some(timeout) = request.timeout_secs {
1943 shadow_config.timeout_secs = timeout;
1944 }
1945 if let Some(include) = request.include_body {
1946 shadow_config.include_body = include;
1947 }
1948 if let Some(max_size) = request.max_body_size {
1949 shadow_config.max_body_size = max_size;
1950 }
1951
1952 if let Err(e) = shadow_config.validate() {
1954 return validation_error("Invalid shadow mirror configuration", Some(&e));
1956 }
1957
1958 let update_request = UpdateSiteRequest {
1960 shadow_mirror: Some(shadow_config.clone()),
1961 ..Default::default()
1962 };
1963
1964 if let Err(e) = config_mgr.update_site(&hostname, update_request) {
1966 return internal_error("Failed to update site configuration", Some(&e));
1968 }
1969
1970 return (
1971 StatusCode::OK,
1972 Json(serde_json::json!({
1973 "success": true,
1974 "data": {
1975 "hostname": hostname,
1976 "shadow_mirror": shadow_config
1977 }
1978 })),
1979 )
1980 .into_response();
1981 }
1982
1983 service_unavailable("Configuration service")
1985}
1986
1987async fn stats_handler(State(state): State<AdminState>) -> impl IntoResponse {
1989 let response = state.handler.handle_stats();
1990 wrap_response(response)
1991}
1992
1993async fn waf_stats_handler(State(state): State<AdminState>) -> impl IntoResponse {
1995 let response = state.handler.handle_waf_stats();
1996 wrap_response(response)
1997}
1998
1999async fn sensor_demo_get_handler() -> impl IntoResponse {
2005 let is_demo = is_demo_mode();
2006 (
2007 StatusCode::OK,
2008 Json(serde_json::json!({ "success": true, "demo_mode": is_demo })),
2009 )
2010}
2011
2012async fn sensor_demo_toggle_handler() -> impl IntoResponse {
2014 let current = is_demo_mode();
2015 if current {
2016 DEMO_MODE.store(false, std::sync::atomic::Ordering::SeqCst);
2017 } else {
2018 enable_demo_mode();
2019 }
2020 let new_mode = is_demo_mode();
2021 info!("Demo mode toggled to: {}", new_mode);
2022 (
2023 StatusCode::OK,
2024 Json(serde_json::json!({ "success": true, "demo_mode": new_mode })),
2025 )
2026}
2027
2028async fn sensor_access_lists_handler(State(state): State<AdminState>) -> impl IntoResponse {
2030 let access_lists = state.handler.access_lists();
2031 let lock = access_lists.read();
2032 let _global = lock.global_list();
2033
2034 let mut allow = Vec::new();
2035 let mut deny = Vec::new();
2036
2037 if is_demo_mode() {
2041 allow.push("192.168.1.0/24".to_string());
2042 deny.push("10.0.0.0/8".to_string());
2043 }
2044
2045 (
2046 StatusCode::OK,
2047 Json(serde_json::json!({
2048 "success": true,
2049 "allow": allow,
2050 "deny": deny
2051 })),
2052 )
2053}
2054
2055async fn sensor_certificates_handler() -> impl IntoResponse {
2057 let mut certs = Vec::new();
2058 if is_demo_mode() {
2059 certs.push(serde_json::json!({
2060 "id": "cert-1",
2061 "name": "Acme wildcard",
2062 "domains": ["*.acme-corp.com"],
2063 "expiresAt": "2027-01-01T00:00:00Z",
2064 "certificatePath": "/etc/ssl/acme.crt",
2065 "keyPath": "/etc/ssl/acme.key"
2066 }));
2067 }
2068 (
2069 StatusCode::OK,
2070 Json(serde_json::json!({ "success": true, "certificates": certs })),
2071 )
2072}
2073
2074async fn sensor_bot_indicators_handler() -> impl IntoResponse {
2076 let indicators = serde_json::json!({
2078 "noJsExecution": 12,
2079 "consistentTiming": 45,
2080 "rapidRequests": 89,
2081 "fingerprintAnomaly": 5,
2082 "missingHeaders": 234,
2083 "suspiciousUserAgent": 67,
2084 "automatedBehavior": 156,
2085 "sessionAnomaly": 8
2086 });
2087 (
2088 StatusCode::OK,
2089 Json(serde_json::json!({ "success": true, "indicators": indicators })),
2090 )
2091}
2092
2093async fn sensor_header_profiles_handler() -> impl IntoResponse {
2095 let stats = serde_json::json!({
2097 "endpointsProfiled": 142,
2098 "anomaliesLast24h": 12,
2099 "topAnomalyTypes": {
2100 "injection_suspected": 5,
2101 "missing_required_header": 3,
2102 "unexpected_header": 2,
2103 "length_anomaly": 2
2104 }
2105 });
2106 (
2107 StatusCode::OK,
2108 Json(serde_json::json!({ "success": true, "data": stats })),
2109 )
2110}
2111
2112async fn sensor_status_handler(State(state): State<AdminState>) -> impl IntoResponse {
2115 if is_demo_mode() {
2117 return (StatusCode::OK, Json(demo_status()));
2118 }
2119
2120 let health = state.handler.handle_health();
2121 let stats = state.handler.handle_stats();
2122 let waf = state.handler.handle_waf_stats();
2123
2124 let dlp_status = state.handler.dlp_scanner().map(|scanner| {
2126 let scanner_stats = scanner.stats();
2127 serde_json::json!({
2128 "enabled": scanner.is_enabled(),
2129 "healthy": true, "totalScans": scanner_stats.total_scans,
2131 "totalMatches": scanner_stats.total_matches,
2132 "patternCount": scanner.pattern_count()
2133 })
2134 });
2135
2136 let response = serde_json::json!({
2138 "status": "running",
2139 "sensorId": "synapse-pingora",
2140 "mode": "proxy",
2141 "uptime": stats.data.as_ref().map(|s| s.uptime_secs).unwrap_or(0),
2142 "requestRate": 0,
2143 "blockRate": waf.data.as_ref().map(|w| w.block_rate_percent).unwrap_or(0.0),
2144 "fallbackRate": 0,
2145 "waf": health.data.as_ref().map(|h| {
2146 serde_json::json!({
2147 "enabled": h.waf.enabled,
2148 "analyzed": h.waf.analyzed,
2149 "blocked": h.waf.blocked
2150 })
2151 }),
2152 "dlp": dlp_status, "proxy": {
2154 "type": "pingora",
2155 "version": env!("CARGO_PKG_VERSION")
2156 }
2157 });
2158
2159 (StatusCode::OK, Json(response))
2160}
2161
2162async fn sensor_config_handler(State(state): State<AdminState>) -> impl IntoResponse {
2165 let sites = state.handler.handle_list_sites();
2166
2167 let response = serde_json::json!({
2168 "success": true,
2169 "data": {
2170 "general": {
2171 "port": 6190,
2172 "sensorId": "synapse-pingora",
2173 "sensorMode": "proxy"
2174 },
2175 "features": {
2176 "atlasCrewMode": false,
2177 "waf": true,
2178 "rateLimit": true,
2179 "accessLists": true
2180 },
2181 "sites": sites.data.map(|s| s.sites).unwrap_or_default()
2182 }
2183 });
2184
2185 (StatusCode::OK, Json(response))
2186}
2187
2188#[derive(Debug, Deserialize)]
2190struct EntitiesQuery {
2191 limit: Option<usize>,
2192}
2193
2194async fn sensor_entities_handler(
2196 Query(params): Query<EntitiesQuery>,
2197 State(state): State<AdminState>,
2198) -> impl IntoResponse {
2199 if is_demo_mode() {
2201 return (StatusCode::OK, Json(demo_entities()));
2202 }
2203
2204 let limit = params.limit.unwrap_or(100);
2205 let entities = state.handler.handle_list_entities(limit);
2206 (
2207 StatusCode::OK,
2208 Json(serde_json::json!({ "entities": entities })),
2209 )
2210}
2211
2212async fn sensor_release_entity_handler(
2214 Path(ip): Path<String>,
2215 State(state): State<AdminState>,
2216) -> impl IntoResponse {
2217 if let Some(entity_manager) = state.handler.entity_manager() {
2218 let released = entity_manager.release_entity(&ip);
2219 if released {
2220 info!("Released entity: {}", ip);
2221 (
2222 StatusCode::OK,
2223 Json(serde_json::json!({
2224 "success": true,
2225 "message": format!("Entity {} released", ip)
2226 })),
2227 )
2228 .into_response()
2229 } else {
2230 not_found_error("Entity", &ip)
2231 }
2232 } else {
2233 service_unavailable("Entity tracking")
2234 }
2235}
2236
2237async fn sensor_release_all_handler(State(state): State<AdminState>) -> impl IntoResponse {
2239 if let Some(entity_manager) = state.handler.entity_manager() {
2240 let count = entity_manager.release_all();
2241 info!("Released {} entities", count);
2242 (
2243 StatusCode::OK,
2244 Json(serde_json::json!({
2245 "success": true,
2246 "released": count,
2247 "message": format!("Released {} entities", count)
2248 })),
2249 )
2250 .into_response()
2251 } else {
2252 service_unavailable("Entity tracking")
2253 }
2254}
2255
2256async fn sensor_metrics_reset_handler(State(state): State<AdminState>) -> impl IntoResponse {
2258 let metrics = state.handler.metrics();
2259 metrics.reset();
2260 info!("Metrics reset");
2261 (
2262 StatusCode::OK,
2263 Json(serde_json::json!({
2264 "success": true,
2265 "message": "All metrics reset to zero"
2266 })),
2267 )
2268}
2269
2270#[derive(Debug, Deserialize)]
2272struct BlocksQuery {
2273 limit: Option<usize>,
2274}
2275
2276#[derive(Debug, Deserialize)]
2278struct RulesQuery {
2279 enabled: Option<bool>,
2280 #[serde(rename = "type")]
2281 rule_type: Option<String>,
2282 limit: Option<usize>,
2283 offset: Option<usize>,
2284}
2285
2286fn default_rule_enabled() -> bool {
2287 true
2288}
2289
2290fn default_rule_priority() -> u32 {
2291 100
2292}
2293
2294#[derive(Debug, Deserialize)]
2295struct RuleCreateRequest {
2296 name: String,
2297 #[serde(rename = "type")]
2298 rule_type: String,
2299 #[serde(default = "default_rule_enabled")]
2300 enabled: bool,
2301 #[serde(default = "default_rule_priority")]
2302 priority: u32,
2303 #[serde(default)]
2304 conditions: Vec<CustomRuleCondition>,
2305 #[serde(default)]
2306 actions: Vec<CustomRuleAction>,
2307 #[serde(default)]
2308 ttl: Option<u64>,
2309}
2310
2311#[derive(Debug, Deserialize, Default)]
2312struct RuleUpdateRequest {
2313 name: Option<String>,
2314 #[serde(rename = "type")]
2315 rule_type: Option<String>,
2316 enabled: Option<bool>,
2317 priority: Option<u32>,
2318 conditions: Option<Vec<CustomRuleCondition>>,
2319 actions: Option<Vec<CustomRuleAction>>,
2320 ttl: Option<u64>,
2321}
2322
2323#[derive(Debug, Deserialize)]
2325struct SignalsQuery {
2326 limit: Option<usize>,
2327 category: Option<String>,
2328 since_ms: Option<u64>,
2329}
2330
2331async fn sensor_blocks_handler(
2333 Query(params): Query<BlocksQuery>,
2334 State(state): State<AdminState>,
2335) -> impl IntoResponse {
2336 let limit = params.limit.unwrap_or(100);
2337 let blocks = state.handler.handle_list_blocks(limit);
2338 (
2339 StatusCode::OK,
2340 Json(serde_json::json!({ "blocks": blocks })),
2341 )
2342}
2343
2344async fn sensor_rules_handler(
2346 Query(params): Query<RulesQuery>,
2347 State(state): State<AdminState>,
2348) -> Response {
2349 let Some(config_mgr) = state.handler.config_manager() else {
2350 return service_unavailable("ConfigManager");
2351 };
2352
2353 let rules = config_mgr.list_rules();
2354 let mut views: Vec<RuleView> = rules.iter().map(RuleView::from_stored).collect();
2355
2356 if let Some(enabled) = params.enabled {
2357 views.retain(|rule| rule.enabled == enabled);
2358 }
2359 if let Some(rule_type) = params.rule_type {
2360 let target = rule_type.to_ascii_uppercase();
2361 views.retain(|rule| rule.rule_type.eq_ignore_ascii_case(&target));
2362 }
2363
2364 let total = views.len();
2365 let offset = params.offset.unwrap_or(0);
2366 let limit = params.limit.unwrap_or(100);
2367 let rules_page: Vec<RuleView> = views.into_iter().skip(offset).take(limit).collect();
2368
2369 (
2370 StatusCode::OK,
2371 Json(serde_json::json!({
2372 "rules": rules_page,
2373 "total": total
2374 })),
2375 )
2376 .into_response()
2377}
2378
2379async fn sensor_rules_create_handler(
2381 State(state): State<AdminState>,
2382 Json(payload): Json<RuleCreateRequest>,
2383) -> impl IntoResponse {
2384 if payload.conditions.is_empty() {
2385 return validation_error("Rule must include at least one condition", None);
2386 }
2387 if payload.actions.is_empty() {
2388 return validation_error("Rule must include at least one action", None);
2389 }
2390
2391 let Some(config_mgr) = state.handler.config_manager() else {
2392 return service_unavailable("ConfigManager");
2393 };
2394
2395 let rule_id = format!("rule_{}", uuid::Uuid::new_v4());
2396 let custom = CustomRuleInput {
2397 id: rule_id,
2398 name: payload.name,
2399 rule_type: payload.rule_type.to_ascii_uppercase(),
2400 enabled: payload.enabled,
2401 priority: payload.priority,
2402 conditions: payload.conditions,
2403 actions: payload.actions,
2404 ttl: payload.ttl,
2405 };
2406
2407 let stored = match StoredRule::from_custom(custom) {
2408 Ok(rule) => rule,
2409 Err(err) => return validation_error("Invalid rule definition", Some(&err)),
2410 };
2411
2412 match config_mgr.create_rule(stored) {
2413 Ok(created) => (StatusCode::CREATED, Json(RuleView::from_stored(&created))).into_response(),
2414 Err(ConfigManagerError::RuleExists(_)) => validation_error("Rule already exists", None),
2415 Err(err) => internal_error("Failed to create rule", Some(&err)),
2416 }
2417}
2418
2419async fn sensor_rules_update_handler(
2421 Path(rule_id): Path<String>,
2422 State(state): State<AdminState>,
2423 Json(payload): Json<RuleUpdateRequest>,
2424) -> impl IntoResponse {
2425 let Some(config_mgr) = state.handler.config_manager() else {
2426 return service_unavailable("ConfigManager");
2427 };
2428
2429 let update = CustomRuleUpdate {
2430 name: payload.name,
2431 rule_type: payload.rule_type.map(|value| value.to_ascii_uppercase()),
2432 enabled: payload.enabled,
2433 priority: payload.priority,
2434 conditions: payload.conditions,
2435 actions: payload.actions,
2436 ttl: payload.ttl,
2437 };
2438
2439 match config_mgr.update_rule(&rule_id, update) {
2440 Ok(updated) => (StatusCode::OK, Json(RuleView::from_stored(&updated))).into_response(),
2441 Err(ConfigManagerError::RuleNotFound(_)) => not_found_error("Rule", &rule_id),
2442 Err(err) => internal_error("Failed to update rule", Some(&err)),
2443 }
2444}
2445
2446async fn sensor_rules_delete_handler(
2448 Path(rule_id): Path<String>,
2449 State(state): State<AdminState>,
2450) -> impl IntoResponse {
2451 let Some(config_mgr) = state.handler.config_manager() else {
2452 return service_unavailable("ConfigManager");
2453 };
2454
2455 match config_mgr.delete_rule(&rule_id) {
2456 Ok(()) => (
2457 StatusCode::OK,
2458 Json(serde_json::json!({
2459 "success": true,
2460 "message": "Rule deleted"
2461 })),
2462 )
2463 .into_response(),
2464 Err(ConfigManagerError::RuleNotFound(_)) => not_found_error("Rule", &rule_id),
2465 Err(err) => internal_error("Failed to delete rule", Some(&err)),
2466 }
2467}
2468
2469async fn sensor_trends_handler(State(state): State<AdminState>) -> impl IntoResponse {
2471 let response = state.handler.handle_trends_summary();
2472 if response.success {
2473 if let Some(data) = response.data {
2474 return (
2475 StatusCode::OK,
2476 Json(serde_json::json!({
2477 "signalCounts": data.signal_counts,
2478 "totalSignals": data.total_signals,
2479 "topSignals": data.top_signal_types,
2480 "timeRange": data.time_range,
2481 "anomalyCount": data.anomaly_count
2482 })),
2483 );
2484 }
2485 }
2486 if let Some(err) = response.error {
2488 log::warn!("TrendsManager not available: {}", err);
2489 }
2490 (
2491 StatusCode::OK,
2492 Json(serde_json::json!({
2493 "signalCounts": {},
2494 "timeline": [],
2495 "topSignals": []
2496 })),
2497 )
2498}
2499
2500async fn sensor_signals_handler(
2502 Query(params): Query<SignalsQuery>,
2503 State(state): State<AdminState>,
2504) -> impl IntoResponse {
2505 let limit = params.limit.unwrap_or(100).min(500);
2506
2507 let category = params
2508 .category
2509 .as_deref()
2510 .and_then(|raw| match raw.to_lowercase().as_str() {
2511 "attack" => Some(SignalCategory::Attack),
2512 "anomaly" => Some(SignalCategory::Anomaly),
2513 "behavior" => Some(SignalCategory::Behavior),
2514 "intelligence" => Some(SignalCategory::Intelligence),
2515 _ => None,
2516 });
2517
2518 let response = state.handler.handle_signals(SignalQueryOptions {
2519 category,
2520 limit: Some(limit),
2521 since_ms: params.since_ms,
2522 });
2523
2524 if response.success {
2525 if let Some(data) = response.data {
2526 return (
2527 StatusCode::OK,
2528 Json(serde_json::json!({
2529 "signals": data.signals,
2530 "summary": data.summary
2531 })),
2532 );
2533 }
2534 }
2535
2536 if let Some(err) = response.error {
2537 log::warn!("SignalManager not available: {}", err);
2538 }
2539
2540 (
2541 StatusCode::OK,
2542 Json(serde_json::json!({
2543 "signals": [],
2544 "summary": {
2545 "total_signals": 0,
2546 "by_category": {},
2547 "top_signal_types": []
2548 }
2549 })),
2550 )
2551}
2552
2553#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
2558pub struct ApparatusReport {
2559 #[serde(rename = "sensorId")]
2561 pub sensor_id: String,
2562 pub timestamp: String,
2564 pub version: Option<String>,
2566 pub actor: ApparatusActor,
2568 pub signal: ApparatusSignal,
2570 pub request: Option<ApparatusRequest>,
2572}
2573
2574#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
2576pub struct ApparatusActor {
2577 pub ip: String,
2579 pub fingerprint: Option<String>,
2581 #[serde(rename = "sessionId")]
2583 pub session_id: Option<String>,
2584}
2585
2586#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
2588pub struct ApparatusSignal {
2589 #[serde(rename = "type")]
2592 pub signal_type: String,
2593 pub severity: String,
2595 pub details: serde_json::Value,
2597}
2598
2599#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
2601pub struct ApparatusRequest {
2602 pub method: String,
2604 pub path: String,
2606 pub headers: Option<HashMap<String, String>>,
2608}
2609
2610impl ApparatusReport {
2611 pub fn validate(&self) -> Result<(), String> {
2612 if self.sensor_id.is_empty() || self.sensor_id.len() > 64 {
2614 return Err("Invalid sensorId length".to_string());
2615 }
2616 if !self
2617 .sensor_id
2618 .chars()
2619 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
2620 {
2621 return Err("Invalid characters in sensorId".to_string());
2622 }
2623
2624 if self.timestamp.len() > 64 {
2626 return Err("Invalid timestamp length".to_string());
2627 }
2628
2629 self.actor.validate()?;
2631
2632 self.signal.validate()?;
2634
2635 if let Some(ref req) = self.request {
2637 req.validate()?;
2638 }
2639
2640 Ok(())
2641 }
2642}
2643
2644impl ApparatusActor {
2645 pub fn validate(&self) -> Result<(), String> {
2646 if self.ip.parse::<IpAddr>().is_err() {
2648 return Err(format!("Invalid IP address: {}", self.ip));
2649 }
2650
2651 if let Some(ref fp) = self.fingerprint {
2653 if fp.len() > 256 {
2654 return Err("Fingerprint too long".to_string());
2655 }
2656 }
2657
2658 if let Some(ref sid) = self.session_id {
2660 if sid.len() > 128 {
2661 return Err("SessionId too long".to_string());
2662 }
2663 }
2664
2665 Ok(())
2666 }
2667}
2668
2669impl ApparatusSignal {
2670 pub fn validate(&self) -> Result<(), String> {
2671 if self.signal_type.is_empty() || self.signal_type.len() > 64 {
2673 return Err("Invalid signal type length".to_string());
2674 }
2675 if !self
2676 .signal_type
2677 .chars()
2678 .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
2679 {
2680 return Err("Invalid characters in signal type".to_string());
2681 }
2682
2683 match self.severity.to_lowercase().as_str() {
2685 "low" | "medium" | "high" | "critical" => {}
2686 _ => return Err(format!("Invalid severity: {}", self.severity)),
2687 }
2688
2689 let serialized = serde_json::to_string(&self.details).unwrap_or_default();
2692 if serialized.len() > 1024 * 10 {
2693 return Err("Details payload too large".to_string());
2695 }
2696
2697 Ok(())
2698 }
2699}
2700
2701impl ApparatusRequest {
2702 pub fn validate(&self) -> Result<(), String> {
2703 if self.method.len() > 10 {
2704 return Err("Invalid HTTP method length".to_string());
2705 }
2706 if self.path.len() > 2048 {
2707 return Err("Path too long".to_string());
2708 }
2709 if let Some(ref headers) = self.headers {
2711 if headers.len() > 50 {
2712 return Err("Too many headers".to_string());
2713 }
2714 }
2715 Ok(())
2716 }
2717}
2718
2719#[cfg(test)]
2720thread_local! {
2721 static FORCE_METADATA_SERIALIZE_ERROR: std::cell::Cell<bool> = std::cell::Cell::new(false);
2722}
2723
2724fn serialize_report_metadata(report: &ApparatusReport) -> serde_json::Value {
2725 #[cfg(test)]
2726 if FORCE_METADATA_SERIALIZE_ERROR.with(|flag| flag.get()) {
2727 warn!(
2728 sensor_id = %report.sensor_id,
2729 "Forced metadata serialization error for testing"
2730 );
2731 return serde_json::json!({});
2732 }
2733
2734 serde_json::to_value(report).unwrap_or_else(|err| {
2735 warn!(
2736 sensor_id = %report.sensor_id,
2737 error = %err,
2738 "Failed to serialize external report metadata"
2739 );
2740 serde_json::json!({})
2741 })
2742}
2743
2744#[tracing::instrument(skip(state, report), fields(sensor_id = %report.sensor_id, signal_type = %report.signal.signal_type))]
2762async fn sensor_report_handler(
2763 State(state): State<AdminState>,
2764 Json(report): Json<ApparatusReport>,
2765) -> Response {
2766 use crate::horizon::ThreatSignal;
2767
2768 if let Err(err) = report.validate() {
2770 warn!(
2771 sensor_id = %report.sensor_id,
2772 error = %err,
2773 "Validation failed for external threat report"
2774 );
2775 return validation_error(&err, None);
2776 }
2777
2778 let signal_type_raw = report.signal.signal_type.trim().to_lowercase();
2779 if !state
2780 .signal_permissions
2781 .is_allowed(&report.sensor_id, &signal_type_raw)
2782 {
2783 warn!(
2784 sensor_id = %report.sensor_id,
2785 signal_type = %signal_type_raw,
2786 "Rejected external signal type"
2787 );
2788 return forbidden_error("Signal type not permitted for sensor");
2789 }
2790
2791 if let Err(not_until) = state.report_rate_limiter.check_key(&report.sensor_id) {
2793 let retry_after = not_until.wait_time_from(governor::clock::Clock::now(
2794 &governor::clock::DefaultClock::default(),
2795 ));
2796 warn!(
2797 sensor_id = %report.sensor_id,
2798 "Signal ingestion rate limit exceeded"
2799 );
2800 return rate_limit_error(retry_after.as_secs().max(1));
2801 }
2802
2803 let signal_type = SignalAdapter::map_type(&report);
2805 let severity = SignalAdapter::map_severity(&report.signal.severity);
2806
2807 let mut signal = ThreatSignal::new(signal_type, severity)
2809 .with_source_ip(&report.actor.ip)
2810 .with_confidence(1.0); if let Some(ref fp) = report.actor.fingerprint {
2813 signal = signal.with_fingerprint(fp);
2814 }
2815
2816 let metadata = serialize_report_metadata(&report);
2818 signal = signal.with_metadata(metadata.clone());
2819
2820 match state
2822 .handler
2823 .signal_dispatcher()
2824 .dispatch(
2825 signal,
2826 SignalCategory::Intelligence,
2827 &report.sensor_id,
2828 Some(format!("External threat reported by {}", report.sensor_id)),
2829 metadata,
2830 )
2831 .await
2832 {
2833 Ok(_) => (
2834 StatusCode::OK,
2835 Json(serde_json::json!({
2836 "success": true,
2837 "id": uuid::Uuid::new_v4().to_string()
2838 })),
2839 )
2840 .into_response(),
2841 Err(err) => {
2842 warn!(
2843 sensor_id = %report.sensor_id,
2844 error = %err,
2845 "Signal dispatch failed"
2846 );
2847 (
2848 StatusCode::SERVICE_UNAVAILABLE,
2849 Json(serde_json::json!({
2850 "success": false,
2851 "error": err
2852 })),
2853 )
2854 .into_response()
2855 }
2856 }
2857}
2858
2859async fn sensor_anomalies_handler(State(state): State<AdminState>) -> impl IntoResponse {
2861 let response = state.handler.handle_trends_anomalies(50);
2862 if response.success {
2863 if let Some(anomalies) = response.data {
2864 let data: Vec<serde_json::Value> = anomalies
2865 .into_iter()
2866 .map(|a| {
2867 serde_json::json!({
2868 "id": format!("anom-{}", &a.detected_at_ms.to_string()[..6]),
2869 "type": a.anomaly_type.to_lowercase().replace("_", "-"),
2870 "severity": a.severity.to_lowercase(),
2871 "description": a.description,
2872 "entityId": a.entities.first().unwrap_or(&"unknown".to_string()),
2873 "riskApplied": 0, "timestamp": chrono::DateTime::from_timestamp_millis(a.detected_at_ms)
2875 .map(|dt| dt.to_rfc3339())
2876 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339())
2877 })
2878 })
2879 .collect();
2880 return (StatusCode::OK, Json(serde_json::json!({ "data": data })));
2881 }
2882 }
2883 if let Some(err) = response.error {
2885 log::warn!("TrendsManager not available for anomalies: {}", err);
2886 }
2887 (StatusCode::OK, Json(serde_json::json!({ "data": [] })))
2888}
2889
2890async fn sensor_campaigns_handler(State(state): State<AdminState>) -> impl IntoResponse {
2892 if is_demo_mode() {
2894 return (StatusCode::OK, Json(demo_campaigns()));
2895 }
2896
2897 let campaigns = match state.handler.campaign_manager() {
2898 Some(manager) => manager
2899 .get_campaigns()
2900 .into_iter()
2901 .map(|c| {
2902 serde_json::json!({
2903 "id": c.id,
2904 "status": format!("{:?}", c.status).to_lowercase(),
2905 "actorCount": c.actor_count,
2906 "confidence": (c.confidence * 100.0) as u8,
2907 "attackTypes": c.correlation_reasons.iter()
2908 .map(|r| format!("{:?}", r.correlation_type).to_lowercase())
2909 .collect::<Vec<_>>(),
2910 "firstSeen": c.first_seen.to_rfc3339(),
2911 "lastActivity": c.last_activity.to_rfc3339(),
2912 "totalRequests": c.total_requests,
2913 "blockedRequests": c.blocked_requests,
2914 "rulesTriggered": c.rules_triggered,
2915 "riskScore": c.risk_score as u8
2916 })
2917 })
2918 .collect::<Vec<_>>(),
2919 None => vec![],
2920 };
2921
2922 (
2923 StatusCode::OK,
2924 Json(serde_json::json!({ "data": campaigns })),
2925 )
2926}
2927
2928async fn sensor_campaign_detail_handler(
2930 State(state): State<AdminState>,
2931 Path(id): Path<String>,
2932) -> impl IntoResponse {
2933 match state.handler.campaign_manager() {
2934 Some(manager) => match manager.get_campaign(&id) {
2935 Some(c) => {
2936 let data = serde_json::json!({
2937 "id": c.id,
2938 "status": format!("{:?}", c.status).to_lowercase(),
2939 "actorCount": c.actor_count,
2940 "confidence": (c.confidence * 100.0) as u8,
2941 "attackTypes": c.correlation_reasons.iter()
2942 .map(|r| format!("{:?}", r.correlation_type).to_lowercase())
2943 .collect::<Vec<_>>(),
2944 "firstSeen": c.first_seen.to_rfc3339(),
2945 "lastActivity": c.last_activity.to_rfc3339(),
2946 "totalRequests": c.total_requests,
2947 "blockedRequests": c.blocked_requests,
2948 "rulesTriggered": c.rules_triggered,
2949 "riskScore": c.risk_score as u8,
2950 "correlationReasons": c.correlation_reasons.iter().map(|r| {
2951 serde_json::json!({
2952 "type": format!("{:?}", r.correlation_type).to_lowercase(),
2953 "confidence": (r.confidence * 100.0) as u8,
2954 "description": r.description
2955 })
2956 }).collect::<Vec<_>>()
2957 });
2958 (StatusCode::OK, Json(serde_json::json!({ "data": data }))).into_response()
2959 }
2960 None => not_found_error("Campaign", &id),
2961 },
2962 None => service_unavailable("Campaign correlation"),
2963 }
2964}
2965
2966#[allow(dead_code)]
2968async fn _sensor_campaign_detail_handler_mock(Path(id): Path<String>) -> impl IntoResponse {
2969 let now = chrono::Utc::now();
2970
2971 let campaign_data = match id.as_str() {
2973 "camp-001" => Some(serde_json::json!({
2974 "id": "camp-001",
2975 "status": "active",
2976 "actorCount": 12,
2977 "confidence": 87,
2978 "attackTypes": ["credential_stuffing", "rate_abuse"],
2979 "firstSeen": (now - chrono::Duration::hours(4)).to_rfc3339(),
2980 "lastActivity": (now - chrono::Duration::minutes(8)).to_rfc3339(),
2981 "totalRequests": 2450,
2982 "blockedRequests": 1890,
2983 "rulesTriggered": 156,
2984 "riskScore": 78,
2985 "correlationReasons": [
2986 {
2987 "type": "shared_fingerprint",
2988 "confidence": 92,
2989 "description": "12 actors sharing identical browser fingerprint despite different IPs",
2990 "actors": ["192.168.1.100", "192.168.1.101", "192.168.1.102", "10.0.0.50", "10.0.0.51"]
2991 },
2992 {
2993 "type": "timing_correlation",
2994 "confidence": 85,
2995 "description": "Request patterns show coordinated timing within 50ms windows",
2996 "actors": ["192.168.1.100", "192.168.1.101", "192.168.1.102"]
2997 },
2998 {
2999 "type": "behavioral_similarity",
3000 "confidence": 78,
3001 "description": "Identical request sequences targeting /api/auth endpoints",
3002 "actors": ["192.168.1.100", "10.0.0.50", "10.0.0.51", "172.16.0.10"]
3003 }
3004 ]
3005 })),
3006 "camp-002" => Some(serde_json::json!({
3007 "id": "camp-002",
3008 "status": "active",
3009 "actorCount": 5,
3010 "confidence": 72,
3011 "attackTypes": ["sql_injection", "path_traversal"],
3012 "firstSeen": (now - chrono::Duration::hours(2)).to_rfc3339(),
3013 "lastActivity": (now - chrono::Duration::minutes(15)).to_rfc3339(),
3014 "totalRequests": 380,
3015 "blockedRequests": 342,
3016 "rulesTriggered": 89,
3017 "riskScore": 85,
3018 "correlationReasons": [
3019 {
3020 "type": "attack_sequence",
3021 "confidence": 88,
3022 "description": "Sequential SQLi probes followed by path traversal attempts",
3023 "actors": ["203.0.113.10", "203.0.113.11", "203.0.113.12"]
3024 },
3025 {
3026 "type": "shared_user_agent",
3027 "confidence": 65,
3028 "description": "Uncommon user agent string shared across all actors",
3029 "actors": ["203.0.113.10", "203.0.113.11", "203.0.113.12", "203.0.113.13", "203.0.113.14"]
3030 }
3031 ]
3032 })),
3033 "camp-003" => Some(serde_json::json!({
3034 "id": "camp-003",
3035 "status": "detected",
3036 "actorCount": 3,
3037 "confidence": 65,
3038 "attackTypes": ["enumeration", "scraping"],
3039 "firstSeen": (now - chrono::Duration::hours(6)).to_rfc3339(),
3040 "lastActivity": (now - chrono::Duration::minutes(45)).to_rfc3339(),
3041 "totalRequests": 8500,
3042 "blockedRequests": 2100,
3043 "rulesTriggered": 42,
3044 "riskScore": 45,
3045 "correlationReasons": [
3046 {
3047 "type": "behavioral_similarity",
3048 "confidence": 70,
3049 "description": "Systematic enumeration of /api/users/* endpoints",
3050 "actors": ["198.51.100.1", "198.51.100.2", "198.51.100.3"]
3051 }
3052 ]
3053 })),
3054 "camp-004" => Some(serde_json::json!({
3055 "id": "camp-004",
3056 "status": "resolved",
3057 "actorCount": 8,
3058 "confidence": 91,
3059 "attackTypes": ["xss", "csrf"],
3060 "firstSeen": (now - chrono::Duration::hours(12)).to_rfc3339(),
3061 "lastActivity": (now - chrono::Duration::hours(3)).to_rfc3339(),
3062 "totalRequests": 1200,
3063 "blockedRequests": 1180,
3064 "rulesTriggered": 234,
3065 "riskScore": 92,
3066 "resolvedAt": (now - chrono::Duration::hours(2)).to_rfc3339(),
3067 "resolvedReason": "All actors blocked and added to blocklist",
3068 "correlationReasons": [
3069 {
3070 "type": "shared_fingerprint",
3071 "confidence": 95,
3072 "description": "All actors using identical headless browser configuration",
3073 "actors": ["45.33.32.1", "45.33.32.2", "45.33.32.3", "45.33.32.4"]
3074 },
3075 {
3076 "type": "network_proximity",
3077 "confidence": 88,
3078 "description": "All IPs from same AS (AS12345 - Known bad actor network)",
3079 "actors": ["45.33.32.1", "45.33.32.2", "45.33.32.3", "45.33.32.4", "45.33.32.5", "45.33.32.6", "45.33.32.7", "45.33.32.8"]
3080 }
3081 ]
3082 })),
3083 _ => None,
3084 };
3085
3086 match campaign_data {
3087 Some(data) => (StatusCode::OK, Json(serde_json::json!({ "data": data }))).into_response(),
3088 None => not_found_error("Campaign", &id),
3089 }
3090}
3091
3092async fn sensor_campaign_actors_handler(
3094 State(state): State<AdminState>,
3095 Path(id): Path<String>,
3096) -> impl IntoResponse {
3097 match state.handler.campaign_manager() {
3098 Some(manager) => {
3099 let actors = manager.get_campaign_actors(&id);
3100 let actor_data: Vec<serde_json::Value> = actors
3101 .into_iter()
3102 .map(|ip| {
3103 serde_json::json!({
3104 "ip": ip.to_string(),
3105 "risk": 50, "sessionCount": 1,
3107 "fingerprintCount": 1,
3108 "jsExecuted": false,
3109 "suspicious": true,
3110 "lastActivity": chrono::Utc::now().to_rfc3339(),
3111 "joinedAt": chrono::Utc::now().to_rfc3339(),
3112 "role": "member",
3113 "requestsInCampaign": 0,
3114 "blockedInCampaign": 0
3115 })
3116 })
3117 .collect();
3118
3119 (
3120 StatusCode::OK,
3121 Json(serde_json::json!({ "actors": actor_data })),
3122 )
3123 }
3124 None => (StatusCode::OK, Json(serde_json::json!({ "actors": [] }))),
3125 }
3126}
3127
3128#[derive(Debug, Deserialize)]
3130struct GraphQuery {
3131 #[serde(default = "default_graph_limit")]
3133 limit: usize,
3134 #[serde(default)]
3136 offset: usize,
3137 #[serde(default = "default_hash_identifiers")]
3139 hash_identifiers: bool,
3140}
3141
3142fn default_graph_limit() -> usize {
3143 500
3144}
3145fn default_hash_identifiers() -> bool {
3146 true
3147}
3148
3149async fn sensor_campaign_graph_handler(
3152 State(state): State<AdminState>,
3153 Path(id): Path<String>,
3154 Query(query): Query<GraphQuery>,
3155) -> impl IntoResponse {
3156 match state.handler.campaign_manager() {
3157 Some(manager) => {
3158 let graph = manager.get_campaign_graph_paginated(
3159 &id,
3160 Some(query.limit),
3161 Some(query.offset),
3162 query.hash_identifiers,
3163 );
3164 (
3165 StatusCode::OK,
3166 Json(serde_json::json!({
3167 "data": {
3168 "nodes": graph.nodes,
3169 "edges": graph.edges
3170 },
3171 "pagination": {
3172 "total": graph.total_nodes,
3173 "limit": query.limit,
3174 "offset": query.offset,
3175 "hasMore": graph.has_more
3176 },
3177 "snapshotVersion": graph.snapshot_version
3178 })),
3179 )
3180 .into_response()
3181 }
3182 None => service_unavailable("Campaign correlation"),
3183 }
3184}
3185
3186async fn sensor_campaign_timeline_handler(Path(id): Path<String>) -> impl IntoResponse {
3188 let now = chrono::Utc::now();
3189
3190 let events: Vec<serde_json::Value> = match id.as_str() {
3192 "camp-001" => vec![
3193 serde_json::json!({
3194 "timestamp": (now - chrono::Duration::hours(4)).to_rfc3339(),
3195 "type": "actor_joined",
3196 "actorIp": "192.168.1.100",
3197 "description": "First actor detected - credential stuffing pattern identified",
3198 "risk": 45
3199 }),
3200 serde_json::json!({
3201 "timestamp": (now - chrono::Duration::hours(3) - chrono::Duration::minutes(45)).to_rfc3339(),
3202 "type": "detection",
3203 "actorIp": "192.168.1.100",
3204 "description": "Campaign correlation triggered - shared fingerprint detected",
3205 "risk": 55
3206 }),
3207 serde_json::json!({
3208 "timestamp": (now - chrono::Duration::hours(3)).to_rfc3339(),
3209 "type": "actor_joined",
3210 "actorIp": "192.168.1.101",
3211 "description": "Second actor joined - same fingerprint cluster",
3212 "risk": 62
3213 }),
3214 serde_json::json!({
3215 "timestamp": (now - chrono::Duration::hours(3)).to_rfc3339(),
3216 "type": "actor_joined",
3217 "actorIp": "192.168.1.102",
3218 "description": "Third actor joined - timing correlation confirmed",
3219 "risk": 68
3220 }),
3221 serde_json::json!({
3222 "timestamp": (now - chrono::Duration::hours(2) - chrono::Duration::minutes(30)).to_rfc3339(),
3223 "type": "escalation",
3224 "actorIp": "192.168.1.100",
3225 "description": "Campaign escalated to high priority - rate abuse detected",
3226 "risk": 75,
3227 "ruleId": 941100
3228 }),
3229 serde_json::json!({
3230 "timestamp": (now - chrono::Duration::hours(2)).to_rfc3339(),
3231 "type": "block",
3232 "actorIp": "192.168.1.100",
3233 "description": "Actor blocked - risk threshold exceeded",
3234 "risk": 85,
3235 "ruleId": 941100
3236 }),
3237 serde_json::json!({
3238 "timestamp": (now - chrono::Duration::minutes(30)).to_rfc3339(),
3239 "type": "attack",
3240 "actorIp": "10.0.0.50",
3241 "description": "Continued attack from new IP in fingerprint cluster",
3242 "risk": 72,
3243 "ruleId": 942100
3244 }),
3245 ],
3246 "camp-002" => vec![
3247 serde_json::json!({
3248 "timestamp": (now - chrono::Duration::hours(2)).to_rfc3339(),
3249 "type": "actor_joined",
3250 "actorIp": "203.0.113.10",
3251 "description": "SQL injection probe detected from new actor",
3252 "risk": 65,
3253 "ruleId": 942100
3254 }),
3255 serde_json::json!({
3256 "timestamp": (now - chrono::Duration::hours(1) - chrono::Duration::minutes(45)).to_rfc3339(),
3257 "type": "attack",
3258 "actorIp": "203.0.113.10",
3259 "description": "Path traversal attempt following SQLi probe",
3260 "risk": 78,
3261 "ruleId": 930110
3262 }),
3263 serde_json::json!({
3264 "timestamp": (now - chrono::Duration::hours(1)).to_rfc3339(),
3265 "type": "actor_joined",
3266 "actorIp": "203.0.113.11",
3267 "description": "Second actor joined - same attack sequence detected",
3268 "risk": 72
3269 }),
3270 serde_json::json!({
3271 "timestamp": (now - chrono::Duration::minutes(30)).to_rfc3339(),
3272 "type": "block",
3273 "actorIp": "203.0.113.10",
3274 "description": "Actor blocked after repeated SQLi attempts",
3275 "risk": 92,
3276 "ruleId": 942100
3277 }),
3278 ],
3279 _ => vec![],
3280 };
3281
3282 (StatusCode::OK, Json(serde_json::json!({ "data": events })))
3283}
3284
3285async fn sensor_bandwidth_handler(State(state): State<AdminState>) -> impl IntoResponse {
3287 let metrics = state.handler.metrics();
3288 let stats = metrics.get_bandwidth_stats();
3289
3290 let timeline: Vec<serde_json::Value> = stats
3292 .timeline
3293 .iter()
3294 .filter(|p| p.timestamp > 0)
3295 .map(|p| {
3296 serde_json::json!({
3297 "timestamp": p.timestamp,
3298 "bytesIn": p.bytes_in,
3299 "bytesOut": p.bytes_out,
3300 "requestCount": p.request_count
3301 })
3302 })
3303 .collect();
3304
3305 (
3306 StatusCode::OK,
3307 Json(serde_json::json!({
3308 "totalBytes": stats.total_bytes,
3309 "totalBytesIn": stats.total_bytes_in,
3310 "totalBytesOut": stats.total_bytes_out,
3311 "avgBytesPerRequest": stats.avg_bytes_per_request,
3312 "maxRequestSize": stats.max_request_size,
3313 "maxResponseSize": stats.max_response_size,
3314 "requestCount": stats.request_count,
3315 "timeline": timeline
3316 })),
3317 )
3318}
3319
3320#[derive(Debug, Deserialize)]
3322struct ActorsQuery {
3323 limit: Option<usize>,
3324 ip: Option<String>,
3325 fingerprint: Option<String>,
3326 min_risk: Option<f64>,
3327}
3328
3329#[derive(Debug, Deserialize)]
3331struct ActorTimelineQuery {
3332 limit: Option<usize>,
3333}
3334
3335fn actor_to_json(actor: &crate::actor::ActorState) -> serde_json::Value {
3336 let mut ips: Vec<String> = actor.ips.iter().map(|ip| ip.to_string()).collect();
3337 ips.sort();
3338 let mut fingerprints: Vec<String> = actor.fingerprints.iter().cloned().collect();
3339 fingerprints.sort();
3340
3341 serde_json::json!({
3342 "actorId": actor.actor_id.clone(),
3343 "riskScore": actor.risk_score,
3344 "ruleMatches": actor.rule_matches.iter().map(|rm| {
3345 serde_json::json!({
3346 "ruleId": rm.rule_id.clone(),
3347 "timestamp": rm.timestamp,
3348 "riskContribution": rm.risk_contribution,
3349 "category": rm.category.clone()
3350 })
3351 }).collect::<Vec<_>>(),
3352 "anomalyCount": actor.anomaly_count,
3353 "sessionIds": actor.session_ids.clone(),
3354 "firstSeen": actor.first_seen,
3355 "lastSeen": actor.last_seen,
3356 "ips": ips,
3357 "fingerprints": fingerprints,
3358 "isBlocked": actor.is_blocked,
3359 "blockReason": actor.block_reason.clone(),
3360 "blockedSince": actor.blocked_since
3361 })
3362}
3363
3364fn session_to_json(
3365 session: &crate::session::SessionState,
3366 full_token_hash: bool,
3367) -> serde_json::Value {
3368 let token_hash = if full_token_hash {
3369 session.token_hash.clone()
3370 } else {
3371 session.token_hash.chars().take(8).collect::<String>()
3372 };
3373
3374 serde_json::json!({
3375 "sessionId": session.session_id.clone(),
3376 "tokenHash": token_hash,
3377 "actorId": session.actor_id.clone(),
3378 "creationTime": session.creation_time,
3379 "lastActivity": session.last_activity,
3380 "requestCount": session.request_count,
3381 "boundJa4": session.bound_ja4.clone(),
3382 "boundIp": session.bound_ip.map(|ip| ip.to_string()),
3383 "isSuspicious": session.is_suspicious,
3384 "hijackAlerts": session.hijack_alerts.iter().map(|alert| {
3385 serde_json::json!({
3386 "sessionId": alert.session_id.clone(),
3387 "alertType": format!("{:?}", alert.alert_type),
3388 "originalValue": alert.original_value.clone(),
3389 "newValue": alert.new_value.clone(),
3390 "timestamp": alert.timestamp,
3391 "confidence": alert.confidence
3392 })
3393 }).collect::<Vec<_>>()
3394 })
3395}
3396
3397async fn sensor_actors_handler(
3399 Query(params): Query<ActorsQuery>,
3400 State(state): State<AdminState>,
3401) -> impl IntoResponse {
3402 let limit = params.limit.unwrap_or(100);
3403
3404 match state.handler.actor_manager() {
3405 Some(manager) => {
3406 let actors = if let Some(ip_str) = params.ip.as_deref() {
3407 match ip_str.parse::<IpAddr>() {
3408 Ok(ip) => manager.get_actor_by_ip(ip).into_iter().collect(),
3409 Err(_) => {
3410 return validation_error("Invalid IP address", None);
3411 }
3412 }
3413 } else if let Some(fp) = params.fingerprint.as_deref() {
3414 manager.get_actor_by_fingerprint(fp).into_iter().collect()
3415 } else if let Some(min_risk) = params.min_risk {
3416 manager.list_by_min_risk(min_risk, limit, 0)
3417 } else {
3418 manager.list_actors(limit, 0)
3419 };
3420 let actor_data: Vec<serde_json::Value> = actors
3421 .into_iter()
3422 .map(|actor| actor_to_json(&actor))
3423 .collect();
3424
3425 let stats = manager.stats();
3427 (StatusCode::OK, Json(serde_json::json!({
3428 "actors": actor_data,
3429 "stats": {
3430 "totalActors": stats.total_actors.load(std::sync::atomic::Ordering::Relaxed),
3431 "blockedActors": stats.blocked_actors.load(std::sync::atomic::Ordering::Relaxed),
3432 "correlationsMade": stats.correlations_made.load(std::sync::atomic::Ordering::Relaxed),
3433 "evictions": stats.evictions.load(std::sync::atomic::Ordering::Relaxed),
3434 "totalCreated": stats.total_created.load(std::sync::atomic::Ordering::Relaxed),
3435 "totalRuleMatches": stats.total_rule_matches.load(std::sync::atomic::Ordering::Relaxed)
3436 }
3437 }))).into_response()
3438 }
3439 None => (
3440 StatusCode::OK,
3441 Json(serde_json::json!({ "actors": [], "stats": null })),
3442 )
3443 .into_response(),
3444 }
3445}
3446
3447async fn sensor_actor_detail_handler(
3449 State(state): State<AdminState>,
3450 Path(actor_id): Path<String>,
3451) -> impl IntoResponse {
3452 match state.handler.actor_manager() {
3453 Some(manager) => match manager.get_actor(&actor_id) {
3454 Some(actor) => (
3455 StatusCode::OK,
3456 Json(serde_json::json!({ "actor": actor_to_json(&actor) })),
3457 )
3458 .into_response(),
3459 None => not_found_error("Actor", &actor_id),
3460 },
3461 None => service_unavailable("Actor tracking"),
3462 }
3463}
3464
3465async fn sensor_actor_timeline_handler(
3467 Query(params): Query<ActorTimelineQuery>,
3468 State(state): State<AdminState>,
3469 Path(actor_id): Path<String>,
3470) -> impl IntoResponse {
3471 let limit = params.limit.unwrap_or(100);
3472
3473 let Some(manager) = state.handler.actor_manager() else {
3474 return service_unavailable("Actor tracking");
3475 };
3476
3477 let Some(actor) = manager.get_actor(&actor_id) else {
3478 return not_found_error("Actor", &actor_id);
3479 };
3480
3481 let mut events: Vec<serde_json::Value> = Vec::new();
3482
3483 for rule in &actor.rule_matches {
3484 events.push(serde_json::json!({
3485 "timestamp": rule.timestamp,
3486 "eventType": "rule_match",
3487 "ruleId": rule.rule_id.clone(),
3488 "category": rule.category.clone(),
3489 "riskDelta": rule.risk_contribution
3490 }));
3491 }
3492
3493 if let Some(blocked_since) = actor.blocked_since {
3494 events.push(serde_json::json!({
3495 "timestamp": blocked_since,
3496 "eventType": "actor_blocked",
3497 "reason": actor.block_reason.clone(),
3498 "riskScore": actor.risk_score
3499 }));
3500 }
3501
3502 if let Some(session_manager) = state.handler.session_manager() {
3503 for session_id in &actor.session_ids {
3504 if let Some(session) = session_manager.get_session_by_id(session_id) {
3505 events.push(serde_json::json!({
3506 "timestamp": session.creation_time,
3507 "eventType": "session_bind",
3508 "sessionId": session.session_id.clone(),
3509 "actorId": session.actor_id.clone(),
3510 "boundJa4": session.bound_ja4.clone(),
3511 "boundIp": session.bound_ip.map(|ip| ip.to_string())
3512 }));
3513
3514 if session.is_suspicious {
3515 for alert in &session.hijack_alerts {
3516 events.push(serde_json::json!({
3517 "timestamp": alert.timestamp,
3518 "eventType": "session_alert",
3519 "sessionId": alert.session_id.clone(),
3520 "alertType": format!("{:?}", alert.alert_type),
3521 "confidence": alert.confidence
3522 }));
3523 }
3524 }
3525 }
3526 }
3527 }
3528
3529 if let Some(block_log) = state.handler.block_log() {
3530 let actor_ips: Vec<String> = actor.ips.iter().map(|ip| ip.to_string()).collect();
3531 let actor_fps: Vec<String> = actor.fingerprints.iter().cloned().collect();
3532 for event in block_log.recent(1000) {
3533 let ip_match = actor_ips.iter().any(|ip| ip == &event.client_ip);
3534 let fp_match = event
3535 .fingerprint
3536 .as_ref()
3537 .map(|fp| actor_fps.contains(fp))
3538 .unwrap_or(false);
3539
3540 if ip_match || fp_match {
3541 events.push(serde_json::json!({
3542 "timestamp": event.timestamp,
3543 "eventType": "block",
3544 "clientIp": event.client_ip.clone(),
3545 "method": event.method.clone(),
3546 "path": event.path.clone(),
3547 "riskScore": event.risk_score,
3548 "matchedRules": event.matched_rules.clone(),
3549 "blockReason": event.block_reason.clone(),
3550 "fingerprint": event.fingerprint.clone()
3551 }));
3552 }
3553 }
3554 }
3555
3556 events.sort_by(|a, b| {
3557 let a_ts = a.get("timestamp").and_then(|v| v.as_u64()).unwrap_or(0);
3558 let b_ts = b.get("timestamp").and_then(|v| v.as_u64()).unwrap_or(0);
3559 b_ts.cmp(&a_ts)
3560 });
3561
3562 if events.len() > limit {
3563 events.truncate(limit);
3564 }
3565
3566 (
3567 StatusCode::OK,
3568 Json(serde_json::json!({
3569 "actorId": actor_id,
3570 "events": events
3571 })),
3572 )
3573 .into_response()
3574}
3575
3576#[derive(Debug, Deserialize)]
3578struct SessionsQuery {
3579 limit: Option<usize>,
3580 actor_id: Option<String>,
3581 suspicious: Option<bool>,
3582}
3583
3584async fn sensor_sessions_handler(
3586 Query(params): Query<SessionsQuery>,
3587 State(state): State<AdminState>,
3588) -> impl IntoResponse {
3589 let limit = params.limit.unwrap_or(100);
3590
3591 match state.handler.session_manager() {
3592 Some(manager) => {
3593 let mut sessions = if let Some(actor_id) = params.actor_id.as_deref() {
3594 let mut actor_sessions = manager.get_actor_sessions(actor_id);
3595 if params.suspicious.unwrap_or(false) {
3596 actor_sessions.retain(|session| session.is_suspicious);
3597 }
3598 actor_sessions
3599 } else if params.suspicious.unwrap_or(false) {
3600 manager.list_suspicious_sessions()
3601 } else {
3602 manager.list_sessions(limit, 0)
3603 };
3604
3605 if params.actor_id.is_some() || params.suspicious.unwrap_or(false) {
3606 sessions.sort_by_key(|s| std::cmp::Reverse(s.last_activity));
3607 sessions.truncate(limit);
3608 }
3609
3610 let session_data: Vec<serde_json::Value> = sessions
3611 .into_iter()
3612 .map(|session| session_to_json(&session, false))
3613 .collect();
3614
3615 let stats = manager.stats();
3617 (
3618 StatusCode::OK,
3619 Json(serde_json::json!({
3620 "sessions": session_data,
3621 "stats": {
3622 "totalSessions": stats.total_sessions.load(std::sync::atomic::Ordering::Relaxed),
3623 "activeSessions": stats.active_sessions.load(std::sync::atomic::Ordering::Relaxed),
3624 "suspiciousSessions": stats.suspicious_sessions.load(std::sync::atomic::Ordering::Relaxed),
3625 "expiredSessions": stats.expired_sessions.load(std::sync::atomic::Ordering::Relaxed),
3626 "hijackAlerts": stats.hijack_alerts.load(std::sync::atomic::Ordering::Relaxed),
3627 "evictions": stats.evictions.load(std::sync::atomic::Ordering::Relaxed),
3628 "totalCreated": stats.total_created.load(std::sync::atomic::Ordering::Relaxed),
3629 "totalInvalidated": stats.total_invalidated.load(std::sync::atomic::Ordering::Relaxed)
3630 }
3631 })),
3632 )
3633 }
3634 None => (
3635 StatusCode::OK,
3636 Json(serde_json::json!({ "sessions": [], "stats": null })),
3637 ),
3638 }
3639}
3640
3641async fn sensor_session_detail_handler(
3643 State(state): State<AdminState>,
3644 Path(session_id): Path<String>,
3645) -> impl IntoResponse {
3646 match state.handler.session_manager() {
3647 Some(manager) => match manager.get_session_by_id(&session_id) {
3648 Some(session) => (
3649 StatusCode::OK,
3650 Json(serde_json::json!({ "session": session_to_json(&session, true) })),
3651 )
3652 .into_response(),
3653 None => not_found_error("Session", &session_id),
3654 },
3655 None => service_unavailable("Session tracking"),
3656 }
3657}
3658
3659async fn sensor_stuffing_handler() -> impl IntoResponse {
3661 let now = chrono::Utc::now().timestamp_millis() as u64;
3662
3663 let stats = serde_json::json!({
3665 "entityCount": 156,
3666 "distributedAttackCount": 3,
3667 "takeoverAlertCount": 5,
3668 "eventCount": 42,
3669 "totalFailures": 1847,
3670 "totalSuccesses": 12453,
3671 "suspiciousEntities": 28
3672 });
3673
3674 let takeover_alerts = vec![
3676 serde_json::json!({
3677 "entityId": "192.168.1.105",
3678 "endpoint": "/api/auth/login",
3679 "priorFailures": 12,
3680 "failureWindowMs": 300000,
3681 "successAt": now - 180000,
3682 "severity": "critical"
3683 }),
3684 serde_json::json!({
3685 "entityId": "10.0.0.42",
3686 "endpoint": "/api/auth/login",
3687 "priorFailures": 8,
3688 "failureWindowMs": 300000,
3689 "successAt": now - 420000,
3690 "severity": "high"
3691 }),
3692 serde_json::json!({
3693 "entityId": "172.16.0.88",
3694 "endpoint": "/api/users/login",
3695 "priorFailures": 5,
3696 "failureWindowMs": 300000,
3697 "successAt": now - 900000,
3698 "severity": "medium"
3699 }),
3700 serde_json::json!({
3701 "entityId": "192.168.1.200",
3702 "endpoint": "/api/auth/login",
3703 "priorFailures": 15,
3704 "failureWindowMs": 300000,
3705 "successAt": now - 1800000,
3706 "severity": "critical"
3707 }),
3708 serde_json::json!({
3709 "entityId": "10.0.0.99",
3710 "endpoint": "/api/v2/auth",
3711 "priorFailures": 6,
3712 "failureWindowMs": 300000,
3713 "successAt": now - 3600000,
3714 "severity": "high"
3715 }),
3716 ];
3717
3718 let distributed_attacks = vec![
3720 serde_json::json!({
3721 "fingerprint": "fp_8a3b2c1d4e5f6789",
3722 "endpoint": "/api/auth/login",
3723 "entities": ["192.168.1.10", "192.168.1.11", "192.168.1.12", "192.168.1.13", "192.168.1.14"],
3724 "totalFailures": 245,
3725 "windowStart": now - 3600000,
3726 "lastActivity": now - 120000,
3727 "correlationScore": 0.92
3728 }),
3729 serde_json::json!({
3730 "fingerprint": "fp_2d3e4f5a6b7c8901",
3731 "endpoint": "/api/users/authenticate",
3732 "entities": ["10.0.0.50", "10.0.0.51", "10.0.0.52"],
3733 "totalFailures": 156,
3734 "windowStart": now - 7200000,
3735 "lastActivity": now - 600000,
3736 "correlationScore": 0.78
3737 }),
3738 serde_json::json!({
3739 "fingerprint": "fp_9f8e7d6c5b4a3210",
3740 "endpoint": "/api/auth/login",
3741 "entities": ["172.16.0.20", "172.16.0.21", "172.16.0.22", "172.16.0.23"],
3742 "totalFailures": 89,
3743 "windowStart": now - 1800000,
3744 "lastActivity": now - 300000,
3745 "correlationScore": 0.85
3746 }),
3747 ];
3748
3749 (
3750 StatusCode::OK,
3751 Json(serde_json::json!({
3752 "stats": stats,
3753 "takeoverAlerts": takeover_alerts,
3754 "distributedAttacks": distributed_attacks
3755 })),
3756 )
3757}
3758
3759async fn sensor_system_config_handler(State(state): State<AdminState>) -> impl IntoResponse {
3761 let sites = state.handler.handle_list_sites();
3762 let kernel_keys = parse_kernel_keys(None);
3763 let mut kernel_params = HashMap::new();
3764 let mut kernel_errors = HashMap::new();
3765 for key in kernel_keys {
3766 match read_sysctl_value(&key) {
3767 Ok(value) => {
3768 kernel_params.insert(key, value);
3769 }
3770 Err(err) => {
3771 kernel_errors.insert(key, err);
3772 }
3773 }
3774 }
3775
3776 (
3777 StatusCode::OK,
3778 Json(serde_json::json!({
3779 "success": true,
3780 "data": {
3781 "general": {
3782 "port": 6190,
3783 "sensorId": "synapse-pingora",
3784 "sensorMode": "proxy",
3785 "demoMode": false
3786 },
3787 "waf": {
3788 "enabled": true,
3789 "allowIpSpoofing": false,
3790 "trustedIpHeaders": ["X-Forwarded-For", "X-Real-IP"],
3791 "trustPrivateProxyRanges": true,
3792 "trustedProxyCidrs": []
3793 },
3794 "features": {
3795 "atlasCrewMode": false,
3796 "waf": true,
3797 "rateLimit": true,
3798 "accessLists": true,
3799 "campaigns": false,
3800 "actors": false,
3801 "anomalies": false
3802 },
3803 "kernel": {
3804 "parameters": kernel_params,
3805 "errors": kernel_errors
3806 },
3807 "runtimeConfig": {
3808 "risk": {
3809 "autoblockThreshold": 80,
3810 "riskDecayPerMinute": 5.0,
3811 "maxRiskHistory": 100
3812 },
3813 "state": {
3814 "maxBlockHistory": 500,
3815 "maxIpsTracked": 10000,
3816 "maxKeysPerIp": 50,
3817 "maxValuesPerKey": 500,
3818 "cleanupWindowMs": 300000
3819 },
3820 "session": {
3821 "enabled": true,
3822 "maxSessions": 10000,
3823 "expirationMs": 1800000,
3824 "cookieName": "synapse_session",
3825 "headerName": "X-Session-Id",
3826 "cleanupIntervalMs": 60000
3827 },
3828 "trends": {
3829 "enabled": true,
3830 "bucketSizeMs": 60000,
3831 "retentionHours": 24,
3832 "maxSignalsPerBucket": 5000,
3833 "anomalyCheckIntervalMs": 30000
3834 },
3835 "anomalyRisk": {
3836 "fingerprintChange": 25,
3837 "sessionSharing": 30,
3838 "tokenReuse": 20,
3839 "velocitySpike": 35,
3840 "rotationPattern": 40,
3841 "timingAnomaly": 15,
3842 "impossibleTravel": 50,
3843 "oversizedRequest": 20,
3844 "oversizedResponse": 25,
3845 "bandwidthSpike": 30,
3846 "exfiltrationPattern": 45,
3847 "uploadPattern": 35
3848 },
3849 "payload": {
3850 "enabled": true,
3851 "windowSizeMs": 60000,
3852 "retentionWindows": 60,
3853 "maxEndpoints": 1000,
3854 "maxEntities": 10000,
3855 "oversizeThreshold": 3.0,
3856 "spikeThreshold": 5.0,
3857 "warmupRequests": 50,
3858 "exfiltrationRatio": 100,
3859 "uploadRatio": 50,
3860 "minLargePayload": 100000
3861 },
3862 "credentialStuffing": {
3863 "enabled": true,
3864 "failureWindowMs": 300000,
3865 "failureThresholdSuspicious": 5,
3866 "failureThresholdHigh": 10,
3867 "failureThresholdBlock": 20,
3868 "distributedMinIps": 3,
3869 "distributedWindowMs": 600000,
3870 "takeoverWindowMs": 300000,
3871 "takeoverMinFailures": 3,
3872 "lowSlowMinHours": 6,
3873 "lowSlowMinPerHour": 2,
3874 "maxEntities": 10000,
3875 "broadcastIntervalMs": 5000
3876 },
3877 "ha": {
3878 "sensorMode": "standalone",
3879 "peerUrl": null,
3880 "syncIntervalMs": 100,
3881 "heartbeatIntervalMs": 5000,
3882 "reconnectBaseDelayMs": 1000,
3883 "reconnectMaxDelayMs": 30000,
3884 "maxQueueSize": 10000,
3885 "maxClockDriftMs": 300000,
3886 "maxMessageSize": 1000000,
3887 "messageRateLimit": 1000,
3888 "enableSplitBrainDetection": true,
3889 "heartbeatTimeoutMs": 15000,
3890 "primaryElectionMode": "manual"
3891 },
3892 "dashboard": {
3893 "pollIntervalMs": 1000,
3894 "wsHeartbeatIntervalMs": 30000,
3895 "wsMaxClients": 50
3896 },
3897 "nginx": {
3898 "listenPort": 6190,
3899 "statusPort": 6191,
3900 "statusAllow": ["127.0.0.1", "::1"],
3901 "proxyReadTimeoutMs": 60000,
3902 "proxySendTimeoutMs": 60000,
3903 "clientBodyBufferSizeKb": 128,
3904 "clientMaxBodySizeMb": 10,
3905 "gzipEnabled": true,
3906 "sslEnabled": false,
3907 "certificateId": null,
3908 "accessListId": null,
3909 "customDirectives": null
3910 }
3911 },
3912 "startupFlags": [],
3913 "sites": sites.data.map(|s| s.sites).unwrap_or_default()
3914 }
3915 })),
3916 )
3917}
3918
3919async fn sensor_system_overview_handler(State(state): State<AdminState>) -> impl IntoResponse {
3922 let health = state.handler.handle_health();
3923 let uptime_secs = health.data.as_ref().map(|h| h.uptime_secs).unwrap_or(0);
3924
3925 let mut sys = System::new_all();
3927 sys.refresh_all();
3928
3929 let cpu_cores = sys.cpus().len();
3930 let global_cpu = sys.global_cpu_usage();
3931 let load_avg = System::load_average();
3932
3933 let per_core: Vec<_> = sys
3935 .cpus()
3936 .iter()
3937 .enumerate()
3938 .map(|(i, cpu)| serde_json::json!({ "id": i, "usage": cpu.cpu_usage() }))
3939 .collect();
3940
3941 let total_mem = sys.total_memory();
3943 let used_mem = sys.used_memory();
3944 let free_mem = sys.free_memory();
3945 let mem_percent = if total_mem > 0 {
3946 (used_mem as f64 / total_mem as f64) * 100.0
3947 } else {
3948 0.0
3949 };
3950
3951 let disks = Disks::new_with_refreshed_list();
3953 let disk_info = disks.list().first().map(|d| {
3954 let total = d.total_space();
3955 let free = d.available_space();
3956 let used = total.saturating_sub(free);
3957 let percent = if total > 0 {
3958 (used as f64 / total as f64) * 100.0
3959 } else {
3960 0.0
3961 };
3962 serde_json::json!({
3963 "total": total,
3964 "used": used,
3965 "free": free,
3966 "usagePercent": percent
3967 })
3968 });
3969
3970 let networks = Networks::new_with_refreshed_list();
3972 let interfaces: Vec<_> = networks
3973 .iter()
3974 .map(|(name, data)| {
3975 serde_json::json!({
3976 "name": name,
3977 "ip": "0.0.0.0",
3978 "mac": data.mac_address().to_string(),
3979 "family": "IPv4",
3980 "internal": name == "lo" || name == "lo0"
3981 })
3982 })
3983 .collect();
3984
3985 let pid = std::process::id();
3987 let process_mem = sys
3988 .process(sysinfo::Pid::from_u32(pid))
3989 .map(|p| p.memory())
3990 .unwrap_or(0);
3991
3992 (
3993 StatusCode::OK,
3994 Json(serde_json::json!({
3995 "success": true,
3996 "data": {
3997 "system": {
3998 "hostname": System::host_name().unwrap_or_else(|| "synapse-pingora".to_string()),
3999 "platform": std::env::consts::OS,
4000 "arch": std::env::consts::ARCH,
4001 "release": System::os_version().unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string()),
4002 "uptime": uptime_secs,
4003 "loadAvg": [load_avg.one, load_avg.five, load_avg.fifteen]
4004 },
4005 "resources": {
4006 "cpu": {
4007 "model": sys.cpus().first().map(|c| c.brand()).unwrap_or("Unknown"),
4008 "cores": cpu_cores,
4009 "usage": global_cpu,
4010 "perCore": per_core
4011 },
4012 "memory": {
4013 "total": total_mem,
4014 "used": used_mem,
4015 "free": free_mem,
4016 "usagePercent": mem_percent
4017 },
4018 "disk": disk_info
4019 },
4020 "network": {
4021 "interfaces": interfaces,
4022 "primaryIp": "127.0.0.1"
4023 },
4024 "process": {
4025 "pid": pid,
4026 "uptime": uptime_secs,
4027 "memoryUsage": {
4028 "rss": process_mem,
4029 "heapTotal": 0,
4030 "heapUsed": 0
4031 }
4032 }
4033 }
4034 })),
4035 )
4036}
4037
4038async fn sensor_system_performance_handler(State(_state): State<AdminState>) -> impl IntoResponse {
4041 let mut sys = System::new_all();
4042 sys.refresh_all();
4043
4044 let cpu_cores = sys.cpus().len();
4045 let global_cpu = sys.global_cpu_usage();
4046
4047 let per_core: Vec<_> = sys
4049 .cpus()
4050 .iter()
4051 .enumerate()
4052 .map(|(i, cpu)| serde_json::json!({ "id": i, "usage": cpu.cpu_usage() }))
4053 .collect();
4054
4055 let total_mem = sys.total_memory();
4057 let used_mem = sys.used_memory();
4058 let free_mem = sys.free_memory();
4059 let mem_percent = if total_mem > 0 {
4060 (used_mem as f64 / total_mem as f64) * 100.0
4061 } else {
4062 0.0
4063 };
4064
4065 let disks = Disks::new_with_refreshed_list();
4067 let disk_info = disks.list().first().map(|d| {
4068 let total = d.total_space();
4069 let free = d.available_space();
4070 let used = total.saturating_sub(free);
4071 let percent = if total > 0 {
4072 (used as f64 / total as f64) * 100.0
4073 } else {
4074 0.0
4075 };
4076 serde_json::json!({
4077 "total": total,
4078 "used": used,
4079 "free": free,
4080 "usagePercent": percent
4081 })
4082 });
4083
4084 record_metrics_sample();
4086
4087 let history: Vec<_> = METRICS_HISTORY.read().iter().cloned().collect();
4089
4090 (
4091 StatusCode::OK,
4092 Json(serde_json::json!({
4093 "success": true,
4094 "data": {
4095 "current": {
4096 "cpu": {
4097 "usage": global_cpu,
4098 "perCore": per_core,
4099 "model": sys.cpus().first().map(|c| c.brand()).unwrap_or("Unknown"),
4100 "cores": cpu_cores
4101 },
4102 "memory": {
4103 "total": total_mem,
4104 "used": used_mem,
4105 "free": free_mem,
4106 "usagePercent": mem_percent
4107 },
4108 "disk": disk_info
4109 },
4110 "history": history
4111 }
4112 })),
4113 )
4114}
4115
4116async fn sensor_system_network_handler() -> impl IntoResponse {
4119 let networks = Networks::new_with_refreshed_list();
4120
4121 let interfaces: Vec<_> = networks
4123 .iter()
4124 .map(|(name, data)| {
4125 serde_json::json!({
4126 "name": name,
4127 "ip": "0.0.0.0",
4128 "mac": data.mac_address().to_string(),
4129 "family": "IPv4",
4130 "internal": name == "lo" || name == "lo0",
4131 "rxBytes": data.total_received(),
4132 "txBytes": data.total_transmitted(),
4133 "rxPackets": data.total_packets_received(),
4134 "txPackets": data.total_packets_transmitted()
4135 })
4136 })
4137 .collect();
4138
4139 (
4140 StatusCode::OK,
4141 Json(serde_json::json!({
4142 "success": true,
4143 "data": {
4144 "interfaces": interfaces,
4145 "connections": [],
4146 "summary": {
4147 "total": 0,
4148 "established": 0,
4149 "listening": 1,
4150 "timeWait": 0,
4151 "closeWait": 0
4152 },
4153 "dns": {
4154 "servers": ["8.8.8.8", "8.8.4.4"],
4155 "search": []
4156 }
4157 }
4158 })),
4159 )
4160}
4161
4162async fn sensor_system_processes_handler() -> impl IntoResponse {
4165 let mut sys = System::new_all();
4166 sys.refresh_all();
4167
4168 let current_pid = std::process::id();
4169 let total_mem = sys.total_memory() as f64;
4170
4171 let mut processes: Vec<_> = sys.processes().iter()
4173 .map(|(pid, proc)| {
4174 let mem_percent = if total_mem > 0.0 { (proc.memory() as f64 / total_mem) * 100.0 } else { 0.0 };
4175 serde_json::json!({
4176 "pid": pid.as_u32(),
4177 "name": proc.name().to_string_lossy(),
4178 "user": proc.user_id().map(|u| u.to_string()).unwrap_or_default(),
4179 "cpu": proc.cpu_usage(),
4180 "memory": mem_percent,
4181 "status": format!("{:?}", proc.status()).to_lowercase(),
4182 "command": proc.cmd().iter().map(|s| s.to_string_lossy()).collect::<Vec<_>>().join(" ")
4183 })
4184 })
4185 .collect();
4186
4187 processes.sort_by(|a, b| {
4189 let cpu_a = a.get("cpu").and_then(|v| v.as_f64()).unwrap_or(0.0);
4190 let cpu_b = b.get("cpu").and_then(|v| v.as_f64()).unwrap_or(0.0);
4191 cpu_b
4192 .partial_cmp(&cpu_a)
4193 .unwrap_or(std::cmp::Ordering::Equal)
4194 });
4195 processes.truncate(20);
4196
4197 let synapse_proc = sys.process(sysinfo::Pid::from_u32(current_pid));
4199 let atlascrew_services: Vec<_> = synapse_proc
4200 .map(|p| {
4201 let mem_percent = if total_mem > 0.0 {
4202 (p.memory() as f64 / total_mem) * 100.0
4203 } else {
4204 0.0
4205 };
4206 vec![serde_json::json!({
4207 "name": "synapse-pingora",
4208 "status": "running",
4209 "pid": current_pid,
4210 "cpu": p.cpu_usage(),
4211 "memory": mem_percent
4212 })]
4213 })
4214 .unwrap_or_default();
4215
4216 let mut running = 0;
4218 let mut sleeping = 0;
4219 let mut stopped = 0;
4220 let mut zombie = 0;
4221 for proc in sys.processes().values() {
4222 match proc.status() {
4223 sysinfo::ProcessStatus::Run => running += 1,
4224 sysinfo::ProcessStatus::Sleep => sleeping += 1,
4225 sysinfo::ProcessStatus::Stop => stopped += 1,
4226 sysinfo::ProcessStatus::Zombie => zombie += 1,
4227 _ => sleeping += 1,
4228 }
4229 }
4230
4231 (
4232 StatusCode::OK,
4233 Json(serde_json::json!({
4234 "success": true,
4235 "data": {
4236 "processes": processes,
4237 "services": {
4238 "atlascrew": atlascrew_services,
4239 "system": []
4240 },
4241 "summary": {
4242 "total": sys.processes().len(),
4243 "running": running,
4244 "sleeping": sleeping,
4245 "stopped": stopped,
4246 "zombie": zombie
4247 }
4248 }
4249 })),
4250 )
4251}
4252
4253#[derive(Debug, serde::Deserialize)]
4255struct LogsQuery {
4256 #[serde(default = "default_log_limit")]
4257 limit: usize,
4258 #[serde(default)]
4259 level: Option<String>,
4260 #[serde(default)]
4261 source: Option<String>,
4262}
4263
4264fn default_log_limit() -> usize {
4265 100
4266}
4267
4268async fn sensor_system_logs_handler(Query(params): Query<LogsQuery>) -> impl IntoResponse {
4269 let logs = LOG_BUFFER.read();
4271 let limit = params.limit.min(200);
4272
4273 let filtered: Vec<_> = logs
4275 .iter()
4276 .filter(|log| {
4277 params
4278 .level
4279 .as_ref()
4280 .map(|l| log.level == *l)
4281 .unwrap_or(true)
4282 })
4283 .take(limit)
4284 .cloned()
4285 .collect();
4286
4287 (
4288 StatusCode::OK,
4289 Json(serde_json::json!({
4290 "success": true,
4291 "data": {
4292 "logs": filtered,
4293 "hasMore": logs.len() > limit
4294 }
4295 })),
4296 )
4297}
4298
4299async fn logs_handler(Query(params): Query<LogsQuery>) -> impl IntoResponse {
4305 let logs = LOG_BUFFER.read();
4306 let limit = params.limit.min(500);
4307
4308 let filtered: Vec<_> = logs
4309 .iter()
4310 .filter(|log| {
4311 let level_match = params
4312 .level
4313 .as_ref()
4314 .map(|l| log.level == *l)
4315 .unwrap_or(true);
4316 let source_match = params
4317 .source
4318 .as_ref()
4319 .map(|s| log.source == *s)
4320 .unwrap_or(true);
4321 level_match && source_match
4322 })
4323 .rev() .take(limit)
4325 .cloned()
4326 .collect();
4327
4328 (
4329 StatusCode::OK,
4330 Json(serde_json::json!({
4331 "success": true,
4332 "data": {
4333 "logs": filtered,
4334 "total": logs.len(),
4335 "sources": ["http", "waf", "system", "access"]
4336 }
4337 })),
4338 )
4339}
4340
4341async fn logs_by_source_handler(
4343 Path(source): Path<String>,
4344 Query(params): Query<LogsQuery>,
4345) -> impl IntoResponse {
4346 let logs = LOG_BUFFER.read();
4347 let limit = params.limit.min(500);
4348
4349 let filtered: Vec<_> = logs
4350 .iter()
4351 .filter(|log| log.source == source)
4352 .filter(|log| {
4353 params
4354 .level
4355 .as_ref()
4356 .map(|l| log.level == *l)
4357 .unwrap_or(true)
4358 })
4359 .rev()
4360 .take(limit)
4361 .cloned()
4362 .collect();
4363
4364 (
4365 StatusCode::OK,
4366 Json(serde_json::json!({
4367 "success": true,
4368 "data": {
4369 "logs": filtered,
4370 "source": source
4371 }
4372 })),
4373 )
4374}
4375
4376async fn diagnostic_bundle_handler(State(state): State<AdminState>) -> impl IntoResponse {
4382 let mut sys = System::new_all();
4383 sys.refresh_all();
4384
4385 let system_info = serde_json::json!({
4387 "hostname": System::host_name().unwrap_or_default(),
4388 "os": System::name().unwrap_or_default(),
4389 "os_version": System::os_version().unwrap_or_default(),
4390 "kernel_version": System::kernel_version().unwrap_or_default(),
4391 "cpu_count": sys.cpus().len(),
4392 "total_memory_mb": sys.total_memory() / 1024 / 1024,
4393 "used_memory_mb": sys.used_memory() / 1024 / 1024,
4394 "uptime_secs": System::uptime()
4395 });
4396
4397 let waf_stats = state.handler.handle_waf_stats();
4399
4400 let health = state.handler.handle_health();
4402
4403 let logs: Vec<_> = LOG_BUFFER.read().iter().cloned().collect();
4405
4406 let entities = state.handler.handle_list_entities(100);
4408
4409 let sites = state.handler.handle_list_sites();
4411
4412 let dlp_stats = state.handler.dlp_scanner().map(|scanner| {
4414 let stats = scanner.stats();
4415 serde_json::json!({
4416 "enabled": scanner.is_enabled(),
4417 "pattern_count": scanner.pattern_count(),
4418 "total_scans": stats.total_scans,
4419 "total_matches": stats.total_matches
4420 })
4421 });
4422
4423 let bundle = serde_json::json!({
4424 "generated_at": chrono::Utc::now().to_rfc3339(),
4425 "version": env!("CARGO_PKG_VERSION"),
4426 "system": system_info,
4427 "health": health,
4428 "waf_stats": waf_stats,
4429 "dlp_stats": dlp_stats,
4430 "sites": sites,
4431 "entities": entities,
4432 "recent_logs": logs.into_iter().rev().take(200).collect::<Vec<_>>()
4433 });
4434
4435 let body = serde_json::to_string_pretty(&bundle).unwrap_or_default();
4437 let filename = format!(
4438 "synapse-diagnostic-{}.json",
4439 chrono::Utc::now().format("%Y%m%d-%H%M%S")
4440 );
4441 let disposition = format!("attachment; filename=\"{}\"", filename);
4442 build_response(StatusCode::OK, body, "application/json", Some(disposition))
4443}
4444
4445async fn config_export_handler(State(state): State<AdminState>) -> axum::response::Response {
4451 let config_paths = [
4453 "config.yaml",
4454 "config.yml",
4455 "/etc/synapse-pingora/config.yaml",
4456 ];
4457
4458 for path in &config_paths {
4459 if let Ok(content) = std::fs::read_to_string(path) {
4460 let filename = format!(
4461 "synapse-config-{}.yaml",
4462 chrono::Utc::now().format("%Y%m%d-%H%M%S")
4463 );
4464 let disposition = format!("attachment; filename=\"{}\"", filename);
4465 return build_response(
4466 StatusCode::OK,
4467 content,
4468 "application/x-yaml",
4469 Some(disposition),
4470 );
4471 }
4472 }
4473
4474 let sites = state.handler.handle_list_sites();
4476 let config_json = serde_json::json!({
4477 "exported_at": chrono::Utc::now().to_rfc3339(),
4478 "sites": sites,
4479 "note": "Runtime configuration export - no config file found"
4480 });
4481
4482 let filename = format!(
4483 "synapse-config-{}.json",
4484 chrono::Utc::now().format("%Y%m%d-%H%M%S")
4485 );
4486 let disposition = format!("attachment; filename=\"{}\"", filename);
4487 let body = serde_json::to_string_pretty(&config_json).unwrap_or_default();
4488 build_response(StatusCode::OK, body, "application/json", Some(disposition))
4489}
4490
4491async fn config_import_handler(State(state): State<AdminState>, body: String) -> impl IntoResponse {
4493 let config_value: serde_json::Value = match serde_yaml::from_str(&body) {
4496 Ok(v) => v,
4497 Err(yaml_err) => match serde_json::from_str(&body) {
4498 Ok(v) => v,
4499 Err(json_err) => {
4500 tracing::warn!(
4502 "Config import parse failed - YAML: {}, JSON: {}",
4503 yaml_err,
4504 json_err
4505 );
4506 return validation_error(
4507 "Invalid configuration format. Expected valid YAML or JSON.",
4508 Some(&json_err),
4509 );
4510 }
4511 },
4512 };
4513
4514 if config_value.get("sites").is_none() && config_value.get("server").is_none() {
4516 return validation_error(
4517 "Configuration must contain 'sites' or 'server' section",
4518 None,
4519 );
4520 }
4521
4522 let config_file: crate::config::ConfigFile = match serde_json::from_value(config_value) {
4524 Ok(c) => c,
4525 Err(e) => {
4526 tracing::warn!("Config import deserialization failed: {}", e);
4527 return validation_error(
4528 "Configuration structure is invalid. Check field names and types.",
4529 Some(&e),
4530 );
4531 }
4532 };
4533
4534 match state.handler.config_manager() {
4536 Some(config_mgr) => match config_mgr.update_full_config(config_file) {
4537 Ok(result) => {
4538 info!(
4539 "Config imported successfully: applied={}, persisted={}, warnings={}",
4540 result.applied,
4541 result.persisted,
4542 result.warnings.len()
4543 );
4544 (
4545 StatusCode::OK,
4546 Json(serde_json::json!({
4547 "success": true,
4548 "message": "Configuration imported and applied successfully.",
4549 "applied": result.applied,
4550 "persisted": result.persisted,
4551 "rebuild_required": result.rebuild_required,
4552 "warnings": result.warnings
4553 })),
4554 )
4555 .into_response()
4556 }
4557 Err(e) => {
4558 tracing::warn!("Config import application failed: {}", e);
4559 internal_error(
4560 "Failed to apply configuration. See server logs for details.",
4561 Some(&e),
4562 )
4563 }
4564 },
4565 None => service_unavailable("ConfigManager"),
4566 }
4567}
4568
4569async fn sensor_dlp_stats_handler(State(state): State<AdminState>) -> impl IntoResponse {
4572 match state.handler.dlp_scanner() {
4573 Some(scanner) => {
4574 let stats = scanner.stats();
4575
4576 let match_bucket = match stats.total_matches {
4578 0 => "none",
4579 1..=10 => "low (1-10)",
4580 11..=100 => "moderate (11-100)",
4581 101..=1000 => "high (101-1000)",
4582 _ => "critical (1000+)",
4583 };
4584
4585 (
4586 StatusCode::OK,
4587 Json(serde_json::json!({
4588 "totalScans": stats.total_scans,
4589 "totalMatches": stats.total_matches,
4590 "matchBucket": match_bucket,
4591 "patternCount": scanner.pattern_count(),
4592 })),
4593 )
4594 .into_response()
4595 }
4596 None => {
4597 let mut problem = ProblemDetails::new(
4598 StatusCode::NOT_FOUND,
4599 "DLP scanning feature is not enabled on this sensor",
4600 );
4601 problem.code = Some(error_codes::NOT_FOUND.to_string());
4602 problem.instance = Some("/_sensor/dlp/stats".to_string());
4603 (StatusCode::NOT_FOUND, Json(problem)).into_response()
4604 }
4605 }
4606}
4607
4608#[derive(Debug, Deserialize)]
4610struct ViolationsQuery {
4611 #[serde(default = "default_violations_limit")]
4613 limit: usize,
4614 #[serde(default)]
4616 cursor: Option<u64>,
4617}
4618
4619fn default_violations_limit() -> usize {
4620 50
4621}
4622
4623async fn sensor_dlp_violations_handler(
4626 State(state): State<AdminState>,
4627 Query(query): Query<ViolationsQuery>,
4628) -> impl IntoResponse {
4629 match state.handler.dlp_scanner() {
4630 Some(scanner) => {
4631 let all_violations = scanner.get_recent_violations().await;
4632
4633 let filtered: Vec<_> = if let Some(cursor) = query.cursor {
4635 all_violations
4636 .into_iter()
4637 .filter(|v| v.timestamp > cursor)
4638 .collect()
4639 } else {
4640 all_violations
4641 };
4642
4643 let total = filtered.len();
4645 let violations: Vec<_> = filtered.into_iter().take(query.limit).collect();
4646
4647 let next_cursor = violations.last().map(|v| v.timestamp);
4649
4650 (
4651 StatusCode::OK,
4652 Json(serde_json::json!({
4653 "violations": violations,
4654 "pagination": {
4655 "total": total,
4656 "limit": query.limit,
4657 "nextCursor": next_cursor,
4658 "hasMore": total > query.limit
4659 }
4660 })),
4661 )
4662 .into_response()
4663 }
4664 None => {
4665 let mut problem = ProblemDetails::new(
4666 StatusCode::NOT_FOUND,
4667 "DLP scanning feature is not enabled on this sensor",
4668 );
4669 problem.code = Some(error_codes::NOT_FOUND.to_string());
4670 problem.instance = Some("/_sensor/dlp/violations".to_string());
4671 (StatusCode::NOT_FOUND, Json(problem)).into_response()
4672 }
4673 }
4674}
4675
4676#[derive(Debug, Deserialize)]
4682struct DiscoveryQuery {
4683 #[serde(default = "default_discovery_limit")]
4684 limit: usize,
4685}
4686
4687fn default_discovery_limit() -> usize {
4688 20
4689}
4690
4691async fn profiling_templates_handler(State(state): State<AdminState>) -> impl IntoResponse {
4693 let endpoint_stats = state.handler.metrics().get_endpoint_stats();
4695
4696 let templates: Vec<serde_json::Value> = endpoint_stats
4698 .into_iter()
4699 .map(|(path, stats)| {
4700 let service_id = infer_service_id(&path);
4702 let tags = infer_endpoint_tags(&path);
4704
4705 serde_json::json!({
4706 "template": path,
4707 "matchCount": stats.hit_count,
4708 "examples": [path.clone()],
4709 "firstSeen": stats.first_seen,
4710 "lastSeen": stats.last_seen,
4711 "serviceId": service_id,
4712 "tags": tags,
4713 "methods": stats.methods
4714 })
4715 })
4716 .collect();
4717
4718 (
4719 StatusCode::OK,
4720 Json(serde_json::json!({
4721 "templates": templates,
4722 "count": templates.len()
4723 })),
4724 )
4725}
4726
4727fn infer_service_id(path: &str) -> &'static str {
4729 if path.contains("/auth") {
4730 "auth-service"
4731 } else if path.contains("/admin") {
4732 "admin-service"
4733 } else if path.contains("/users") || path.contains("/user") {
4734 "user-service"
4735 } else if path.contains("/product") {
4736 "product-service"
4737 } else if path.contains("/order") {
4738 "order-service"
4739 } else if path.contains("/payment") || path.contains("/checkout") {
4740 "payment-service"
4741 } else if path.contains("/search") {
4742 "search-service"
4743 } else if path.contains("/banking") {
4744 "banking-service"
4745 } else if path.contains("/healthcare") {
4746 "healthcare-service"
4747 } else if path.contains("/ecommerce") {
4748 "ecommerce-service"
4749 } else if path.contains("/genai") {
4750 "genai-service"
4751 } else {
4752 "api-gateway"
4753 }
4754}
4755
4756fn infer_endpoint_tags(path: &str) -> Vec<&'static str> {
4758 let mut tags = vec!["REST"];
4759
4760 if path.contains("/auth") || path.contains("/login") {
4761 tags.push("Auth");
4762 if path.contains("/login") {
4763 tags.push("Critical");
4764 }
4765 }
4766 if path.contains("/admin") {
4767 tags.push("Admin");
4768 tags.push("Internal");
4769 }
4770 if path.contains("/user") || path.contains("/account") {
4771 tags.push("PII");
4772 }
4773 if path.contains("/payment") || path.contains("/checkout") || path.contains("/banking") {
4774 tags.push("PCI");
4775 tags.push("Critical");
4776 }
4777 if path.contains("/healthcare") || path.contains("/records") {
4778 tags.push("PHI");
4779 tags.push("Critical");
4780 }
4781 if !path.contains("/admin") && !path.contains("/internal") {
4782 tags.push("Public");
4783 }
4784
4785 tags
4786}
4787
4788async fn profiling_baselines_handler(State(_state): State<AdminState>) -> impl IntoResponse {
4790 let now = chrono::Utc::now().timestamp_millis() as u64;
4791
4792 let profiles = get_profiles();
4794
4795 if !profiles.is_empty() {
4796 let baselines: Vec<serde_json::Value> = profiles
4798 .iter()
4799 .map(|p| {
4800 let (p50, p95, p99) = p.payload_size.percentiles();
4802
4803 let status_codes: Vec<[u32; 2]> = p
4805 .status_codes
4806 .iter()
4807 .map(|(&code, &count)| [code as u32, count])
4808 .collect();
4809
4810 let time_window_mins =
4812 ((p.last_updated_ms.saturating_sub(p.first_seen_ms)) as f64 / 60000.0).max(1.0);
4813 let avg_rpm = p.sample_count as f64 / time_window_mins;
4814
4815 serde_json::json!({
4816 "template": p.template,
4817 "totalRequests": p.sample_count,
4818 "avgRequestsPerMinute": (avg_rpm * 100.0).round() / 100.0,
4819 "p50ResponseTime": p50 as u64,
4820 "p95ResponseTime": p95 as u64,
4821 "p99ResponseTime": p99 as u64,
4822 "statusCodes": status_codes,
4823 "firstSeen": p.first_seen_ms,
4824 "lastSeen": p.last_updated_ms
4825 })
4826 })
4827 .collect();
4828
4829 return (
4830 StatusCode::OK,
4831 Json(serde_json::json!({
4832 "baselines": baselines,
4833 "count": baselines.len()
4834 })),
4835 );
4836 }
4837
4838 let baselines = vec![
4840 serde_json::json!({
4841 "template": "/api/users",
4842 "totalRequests": 90,
4843 "avgRequestsPerMinute": 1.5,
4844 "p50ResponseTime": 45,
4845 "p95ResponseTime": 120,
4846 "p99ResponseTime": 250,
4847 "statusCodes": [[200, 85], [404, 4], [500, 1]],
4848 "firstSeen": now - 3600000,
4849 "lastSeen": now
4850 }),
4851 serde_json::json!({
4852 "template": "/api/users/{id}",
4853 "totalRequests": 156,
4854 "avgRequestsPerMinute": 2.6,
4855 "p50ResponseTime": 32,
4856 "p95ResponseTime": 95,
4857 "p99ResponseTime": 180,
4858 "statusCodes": [[200, 140], [404, 12], [500, 4]],
4859 "firstSeen": now - 7200000,
4860 "lastSeen": now - 60000
4861 }),
4862 serde_json::json!({
4863 "template": "/api/products/{id}",
4864 "totalRequests": 45,
4865 "avgRequestsPerMinute": 0.75,
4866 "p50ResponseTime": 28,
4867 "p95ResponseTime": 85,
4868 "p99ResponseTime": 150,
4869 "statusCodes": [[200, 40], [404, 5]],
4870 "firstSeen": now - 7200000,
4871 "lastSeen": now - 300000
4872 }),
4873 serde_json::json!({
4874 "template": "/api/auth/login",
4875 "totalRequests": 120,
4876 "avgRequestsPerMinute": 2.0,
4877 "p50ResponseTime": 180,
4878 "p95ResponseTime": 450,
4879 "p99ResponseTime": 800,
4880 "statusCodes": [[200, 95], [401, 20], [429, 5]],
4881 "firstSeen": now - 86400000,
4882 "lastSeen": now - 60000
4883 }),
4884 serde_json::json!({
4885 "template": "/api/auth/refresh",
4886 "totalRequests": 85,
4887 "avgRequestsPerMinute": 1.4,
4888 "p50ResponseTime": 65,
4889 "p95ResponseTime": 150,
4890 "p99ResponseTime": 280,
4891 "statusCodes": [[200, 80], [401, 5]],
4892 "firstSeen": now - 86400000,
4893 "lastSeen": now - 120000
4894 }),
4895 serde_json::json!({
4896 "template": "/api/admin/users",
4897 "totalRequests": 15,
4898 "avgRequestsPerMinute": 0.25,
4899 "p50ResponseTime": 120,
4900 "p95ResponseTime": 350,
4901 "p99ResponseTime": 500,
4902 "statusCodes": [[200, 10], [403, 5]],
4903 "firstSeen": now - 1800000,
4904 "lastSeen": now - 120000
4905 }),
4906 serde_json::json!({
4907 "template": "/api/search",
4908 "totalRequests": 200,
4909 "avgRequestsPerMinute": 3.3,
4910 "p50ResponseTime": 85,
4911 "p95ResponseTime": 220,
4912 "p99ResponseTime": 400,
4913 "statusCodes": [[200, 195], [400, 5]],
4914 "firstSeen": now - 172800000,
4915 "lastSeen": now
4916 }),
4917 serde_json::json!({
4918 "template": "/api/orders",
4919 "totalRequests": 67,
4920 "avgRequestsPerMinute": 1.1,
4921 "p50ResponseTime": 95,
4922 "p95ResponseTime": 280,
4923 "p99ResponseTime": 450,
4924 "statusCodes": [[200, 60], [401, 5], [500, 2]],
4925 "firstSeen": now - 43200000,
4926 "lastSeen": now - 180000
4927 }),
4928 serde_json::json!({
4929 "template": "/api/orders/{id}",
4930 "totalRequests": 134,
4931 "avgRequestsPerMinute": 2.2,
4932 "p50ResponseTime": 55,
4933 "p95ResponseTime": 140,
4934 "p99ResponseTime": 280,
4935 "statusCodes": [[200, 120], [404, 10], [401, 4]],
4936 "firstSeen": now - 43200000,
4937 "lastSeen": now - 60000
4938 }),
4939 serde_json::json!({
4940 "template": "/api/checkout",
4941 "totalRequests": 42,
4942 "avgRequestsPerMinute": 0.7,
4943 "p50ResponseTime": 320,
4944 "p95ResponseTime": 800,
4945 "p99ResponseTime": 1200,
4946 "statusCodes": [[200, 35], [400, 4], [500, 3]],
4947 "firstSeen": now - 21600000,
4948 "lastSeen": now - 300000
4949 }),
4950 ];
4951
4952 (
4953 StatusCode::OK,
4954 Json(serde_json::json!({
4955 "baselines": baselines,
4956 "count": baselines.len()
4957 })),
4958 )
4959}
4960
4961async fn profiling_schemas_handler(State(_state): State<AdminState>) -> impl IntoResponse {
4963 let real_schemas = get_schemas();
4965
4966 let schemas: Vec<serde_json::Value> = real_schemas
4968 .iter()
4969 .map(|s| {
4970 serde_json::json!({
4971 "template": s.template,
4972 "sampleCount": s.sample_count,
4973 "requestFieldCount": s.request_schema.len(),
4974 "responseFieldCount": s.response_schema.len(),
4975 "lastUpdated": s.last_updated_ms,
4976 "version": s.version
4977 })
4978 })
4979 .collect();
4980
4981 (
4982 StatusCode::OK,
4983 Json(serde_json::json!({
4984 "schemas": schemas,
4985 "count": schemas.len()
4986 })),
4987 )
4988}
4989
4990async fn profiling_discovery_handler(Query(params): Query<DiscoveryQuery>) -> impl IntoResponse {
4992 let now = chrono::Utc::now().timestamp_millis() as u64;
4993 let limit = params.limit.min(100);
4994
4995 let all_events = vec![
4997 serde_json::json!({
4998 "type": "endpoint_discovered",
4999 "template": "/api/checkout",
5000 "timestamp": now - 300000,
5001 "details": "New endpoint discovered from traffic analysis"
5002 }),
5003 serde_json::json!({
5004 "type": "schema_changed",
5005 "template": "/api/users/{id}",
5006 "timestamp": now - 600000,
5007 "details": "Response schema gained 2 new fields",
5008 "version": 3
5009 }),
5010 serde_json::json!({
5011 "type": "endpoint_discovered",
5012 "template": "/api/orders/{id}/items",
5013 "timestamp": now - 900000,
5014 "details": "New nested endpoint discovered"
5015 }),
5016 serde_json::json!({
5017 "type": "schema_version",
5018 "template": "/api/auth/login",
5019 "timestamp": now - 1200000,
5020 "details": "Response now includes refresh_token field",
5021 "version": 2
5022 }),
5023 serde_json::json!({
5024 "type": "endpoint_discovered",
5025 "template": "/api/webhooks/stripe",
5026 "timestamp": now - 1800000,
5027 "details": "Webhook endpoint discovered"
5028 }),
5029 serde_json::json!({
5030 "type": "schema_changed",
5031 "template": "/api/products/{id}",
5032 "timestamp": now - 2400000,
5033 "details": "Added inventory_count to response",
5034 "version": 4
5035 }),
5036 serde_json::json!({
5037 "type": "endpoint_discovered",
5038 "template": "/api/admin/settings",
5039 "timestamp": now - 3600000,
5040 "details": "Admin settings endpoint found"
5041 }),
5042 serde_json::json!({
5043 "type": "schema_changed",
5044 "template": "/api/orders",
5045 "timestamp": now - 7200000,
5046 "details": "Request now accepts filter parameter",
5047 "version": 2
5048 }),
5049 ];
5050
5051 let events: Vec<_> = all_events.into_iter().take(limit).collect();
5052
5053 (
5054 StatusCode::OK,
5055 Json(serde_json::json!({
5056 "events": events,
5057 "count": events.len()
5058 })),
5059 )
5060}
5061
5062async fn profiling_anomalies_handler(State(_state): State<AdminState>) -> impl IntoResponse {
5064 let now = chrono::Utc::now().timestamp_millis() as u64;
5065
5066 let endpoints = vec![
5068 serde_json::json!({
5069 "template": "/api/auth/login",
5070 "anomalyScore": 45.5,
5071 "anomalyCount": 8,
5072 "totalRequests": 120,
5073 "recentAnomalies": [
5074 {
5075 "timestamp": now - 120000,
5076 "type": "high_failure_rate",
5077 "severity": "high",
5078 "score": 72.0,
5079 "detail": "401 responses at 16.7% (threshold: 10%)"
5080 },
5081 {
5082 "timestamp": now - 600000,
5083 "type": "rate_spike",
5084 "severity": "medium",
5085 "score": 45.0,
5086 "detail": "Request rate 3.2x above baseline"
5087 }
5088 ]
5089 }),
5090 serde_json::json!({
5091 "template": "/api/admin/users",
5092 "anomalyScore": 65.2,
5093 "anomalyCount": 5,
5094 "totalRequests": 15,
5095 "recentAnomalies": [
5096 {
5097 "timestamp": now - 180000,
5098 "type": "unauthorized_access",
5099 "severity": "high",
5100 "score": 85.0,
5101 "detail": "403 responses from new IP range"
5102 },
5103 {
5104 "timestamp": now - 300000,
5105 "type": "unusual_timing",
5106 "severity": "medium",
5107 "score": 52.0,
5108 "detail": "Access outside normal business hours"
5109 }
5110 ]
5111 }),
5112 serde_json::json!({
5113 "template": "/api/checkout",
5114 "anomalyScore": 38.7,
5115 "anomalyCount": 3,
5116 "totalRequests": 42,
5117 "recentAnomalies": [
5118 {
5119 "timestamp": now - 450000,
5120 "type": "response_time_spike",
5121 "severity": "medium",
5122 "score": 55.0,
5123 "detail": "P99 latency 2.5x above baseline"
5124 }
5125 ]
5126 }),
5127 serde_json::json!({
5128 "template": "/api/search",
5129 "anomalyScore": 22.1,
5130 "anomalyCount": 2,
5131 "totalRequests": 200,
5132 "recentAnomalies": [
5133 {
5134 "timestamp": now - 900000,
5135 "type": "payload_size",
5136 "severity": "low",
5137 "score": 28.0,
5138 "detail": "Unusually large query parameters"
5139 }
5140 ]
5141 }),
5142 serde_json::json!({
5143 "template": "/api/users/{id}",
5144 "anomalyScore": 12.5,
5145 "anomalyCount": 1,
5146 "totalRequests": 156,
5147 "recentAnomalies": [
5148 {
5149 "timestamp": now - 1800000,
5150 "type": "enumeration_pattern",
5151 "severity": "low",
5152 "score": 32.0,
5153 "detail": "Sequential ID access pattern detected"
5154 }
5155 ]
5156 }),
5157 ];
5158
5159 (
5160 StatusCode::OK,
5161 Json(serde_json::json!({
5162 "endpoints": endpoints,
5163 "count": endpoints.len()
5164 })),
5165 )
5166}
5167
5168async fn profiles_handler(State(_state): State<AdminState>) -> impl IntoResponse {
5171 let now = chrono::Utc::now().timestamp_millis();
5176 let seed_profiles = vec![
5177 serde_json::json!({
5178 "template": "/api/users",
5179 "payload_size": { "mean": 256.0, "std_dev": 64.0, "min": 128, "max": 512 },
5180 "expected_params": { "page": 45, "limit": 45, "sort": 23 },
5181 "content_types": { "application/json": 89, "text/html": 1 },
5182 "status_codes": { "200": 85, "404": 4, "500": 1 },
5183 "endpoint_risk": 12.5,
5184 "sample_count": 90,
5185 "first_seen_ms": now - 3600000,
5186 "last_updated_ms": now
5187 }),
5188 serde_json::json!({
5189 "template": "/api/products/{id}",
5190 "payload_size": { "mean": 1024.0, "std_dev": 256.0, "min": 512, "max": 2048 },
5191 "expected_params": { "fields": 30, "include": 15 },
5192 "content_types": { "application/json": 45 },
5193 "status_codes": { "200": 40, "404": 5 },
5194 "endpoint_risk": 8.2,
5195 "sample_count": 45,
5196 "first_seen_ms": now - 7200000,
5197 "last_updated_ms": now - 300000
5198 }),
5199 serde_json::json!({
5200 "template": "/api/auth/login",
5201 "payload_size": { "mean": 128.0, "std_dev": 32.0, "min": 64, "max": 256 },
5202 "expected_params": {},
5203 "content_types": { "application/json": 120 },
5204 "status_codes": { "200": 95, "401": 20, "429": 5 },
5205 "endpoint_risk": 65.8,
5206 "sample_count": 120,
5207 "first_seen_ms": now - 86400000,
5208 "last_updated_ms": now - 60000
5209 }),
5210 serde_json::json!({
5211 "template": "/api/admin/users",
5212 "payload_size": { "mean": 512.0, "std_dev": 128.0, "min": 256, "max": 1024 },
5213 "expected_params": { "role": 8, "status": 5 },
5214 "content_types": { "application/json": 15 },
5215 "status_codes": { "200": 10, "403": 5 },
5216 "endpoint_risk": 78.5,
5217 "sample_count": 15,
5218 "first_seen_ms": now - 1800000,
5219 "last_updated_ms": now - 120000
5220 }),
5221 serde_json::json!({
5222 "template": "/api/search",
5223 "payload_size": { "mean": 64.0, "std_dev": 16.0, "min": 32, "max": 128 },
5224 "expected_params": { "q": 200, "page": 150, "limit": 150, "category": 80 },
5225 "content_types": { "application/json": 200 },
5226 "status_codes": { "200": 195, "400": 5 },
5227 "endpoint_risk": 25.3,
5228 "sample_count": 200,
5229 "first_seen_ms": now - 172800000,
5230 "last_updated_ms": now
5231 }),
5232 ];
5233 Json(serde_json::json!({
5234 "success": true,
5235 "data": seed_profiles
5236 }))
5237}
5238
5239async fn save_profiles_handler(State(_state): State<AdminState>) -> impl IntoResponse {
5243 wrap_response(crate::api::ApiResponse::<String>::err(
5246 "Profile persistence endpoint not yet integrated with new profiler module. \
5247 Use GET /api/profiler/profiles to view current profiles.",
5248 ))
5249}
5250
5251async fn api_profiles_reset_handler(State(state): State<AdminState>) -> impl IntoResponse {
5253 state.handler.metrics().reset_profiles();
5256 info!("Endpoint profiles reset");
5257 wrap_response(crate::api::ApiResponse::ok(
5258 "Endpoint profiles reset successfully".to_string(),
5259 ))
5260}
5261
5262async fn api_schemas_reset_handler(State(state): State<AdminState>) -> impl IntoResponse {
5264 state.handler.metrics().reset_schemas();
5267 info!("Schema learner reset");
5268 wrap_response(crate::api::ApiResponse::ok(
5269 "Schema learner reset successfully".to_string(),
5270 ))
5271}
5272
5273fn wrap_response<T: serde::Serialize>(response: ApiResponse<T>) -> Response {
5275 if response.success {
5276 return (StatusCode::OK, Json(response)).into_response();
5277 }
5278
5279 let detail = response
5280 .error
5281 .unwrap_or_else(|| "Request failed".to_string());
5282 let mut problem = ProblemDetails::new(StatusCode::INTERNAL_SERVER_ERROR, detail);
5283 problem.code = Some(error_codes::INTERNAL_ERROR.to_string());
5284 (StatusCode::INTERNAL_SERVER_ERROR, Json(problem)).into_response()
5285}
5286
5287#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
5289pub struct TestResult {
5290 pub success: bool,
5291 pub message: String,
5292}
5293
5294#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
5296pub struct RestartResult {
5297 pub success: bool,
5298 pub message: String,
5299}
5300
5301#[derive(Debug, Deserialize)]
5303pub struct EvaluateRequest {
5304 pub method: String,
5306 pub uri: String,
5308 #[serde(default)]
5310 pub headers: Vec<(String, String)>,
5311 #[serde(default)]
5313 pub body: Option<String>,
5314 #[serde(default = "default_client_ip")]
5316 pub client_ip: String,
5317}
5318
5319fn default_client_ip() -> String {
5320 "127.0.0.1".to_string()
5321}
5322
5323const DEFAULT_TRACE_MAX_EVENTS: usize = 2000;
5324
5325fn default_trace_max_events() -> usize {
5326 DEFAULT_TRACE_MAX_EVENTS
5327}
5328
5329#[derive(Debug, Deserialize)]
5330#[serde(rename_all = "snake_case")]
5331struct DebuggerOptions {
5332 #[serde(default = "default_trace_max_events")]
5333 max_events: usize,
5334}
5335
5336#[derive(Debug, Deserialize)]
5337#[serde(tag = "type", rename_all = "snake_case")]
5338enum DebuggerCommand {
5339 Evaluate {
5340 id: Option<String>,
5341 request: EvaluateRequest,
5342 options: Option<DebuggerOptions>,
5343 },
5344}
5345
5346#[derive(Debug, Serialize)]
5347#[serde(tag = "type", rename_all = "snake_case")]
5348enum DebuggerMessage {
5349 Event {
5350 id: String,
5351 event: TraceEvent,
5352 },
5353 Done {
5354 id: String,
5355 result: DebuggerDonePayload,
5356 },
5357 Error {
5358 id: Option<String>,
5359 message: String,
5360 },
5361}
5362
5363#[derive(Debug, Serialize)]
5364struct DebuggerDonePayload {
5365 blocked: bool,
5366 risk_score: u16,
5367 matched_rules: Vec<u32>,
5368 block_reason: Option<String>,
5369 detection_time_us: u64,
5370 action: String,
5371 verdict: String,
5372}
5373
5374struct ChannelTraceSink {
5375 sender: mpsc::UnboundedSender<TraceEvent>,
5376 max_events: usize,
5377 emitted: usize,
5378 truncated: bool,
5379}
5380
5381impl ChannelTraceSink {
5382 fn new(sender: mpsc::UnboundedSender<TraceEvent>, max_events: usize) -> Self {
5383 Self {
5384 sender,
5385 max_events: max_events.max(1),
5386 emitted: 0,
5387 truncated: false,
5388 }
5389 }
5390}
5391
5392impl TraceSink for ChannelTraceSink {
5393 fn record(&mut self, event: TraceEvent) {
5394 let is_terminal = matches!(event, TraceEvent::EvaluationFinished { .. });
5395
5396 if self.emitted >= self.max_events && !is_terminal {
5397 if !self.truncated {
5398 self.truncated = true;
5399 let _ = self.sender.send(TraceEvent::Truncated {
5400 limit: self.max_events,
5401 });
5402 }
5403 return;
5404 }
5405
5406 self.emitted += 1;
5407 let _ = self.sender.send(event);
5408 }
5409}
5410
5411async fn waf_debugger_ws_handler(
5412 State(state): State<AdminState>,
5413 ws: WebSocketUpgrade,
5414) -> impl IntoResponse {
5415 ws.on_upgrade(|socket| handle_waf_debugger_socket(state, socket))
5416}
5417
5418async fn handle_waf_debugger_socket(state: AdminState, socket: WebSocket) {
5419 let (mut sender, mut receiver) = socket.split();
5420
5421 while let Some(message) = receiver.next().await {
5422 let message = match message {
5423 Ok(msg) => msg,
5424 Err(_) => break,
5425 };
5426
5427 match message {
5428 Message::Text(text) => {
5429 let command: DebuggerCommand = match serde_json::from_str(&text) {
5430 Ok(cmd) => cmd,
5431 Err(err) => {
5432 let _ = send_debugger_message(
5433 &mut sender,
5434 DebuggerMessage::Error {
5435 id: None,
5436 message: format!("Invalid command: {err}"),
5437 },
5438 )
5439 .await;
5440 continue;
5441 }
5442 };
5443
5444 match command {
5445 DebuggerCommand::Evaluate {
5446 id,
5447 request,
5448 options,
5449 } => {
5450 let request_id =
5451 id.unwrap_or_else(|| format!("trace_{}", fastrand::u64(..)));
5452 let max_events = options
5453 .map(|opts| opts.max_events)
5454 .unwrap_or(DEFAULT_TRACE_MAX_EVENTS);
5455
5456 if run_trace_session(&state, &mut sender, request_id, request, max_events)
5457 .await
5458 .is_err()
5459 {
5460 break;
5461 }
5462 }
5463 }
5464 }
5465 Message::Ping(payload) => {
5466 let _ = sender.send(Message::Pong(payload)).await;
5467 }
5468 Message::Close(_) => break,
5469 _ => {}
5470 }
5471 }
5472}
5473
5474async fn run_trace_session(
5475 state: &AdminState,
5476 sender: &mut futures_util::stream::SplitSink<WebSocket, Message>,
5477 request_id: String,
5478 request: EvaluateRequest,
5479 max_events: usize,
5480) -> Result<(), ()> {
5481 let body_bytes = decode_evaluate_body(request.body.as_deref());
5482 let (event_tx, mut event_rx) = mpsc::unbounded_channel();
5483 let handler = Arc::clone(&state.handler);
5484 let method = request.method.clone();
5485 let uri = request.uri.clone();
5486 let headers = request.headers.clone();
5487 let client_ip = request.client_ip.clone();
5488
5489 let mut handle = tokio::task::spawn_blocking(move || {
5490 let mut sink = ChannelTraceSink::new(event_tx, max_events);
5491 handler.evaluate_request_trace(
5492 &method,
5493 &uri,
5494 &headers,
5495 body_bytes.as_deref(),
5496 &client_ip,
5497 &mut sink,
5498 )
5499 });
5500
5501 loop {
5502 tokio::select! {
5503 event = event_rx.recv() => {
5504 if let Some(event) = event {
5505 send_debugger_message(sender, DebuggerMessage::Event {
5506 id: request_id.clone(),
5507 event,
5508 }).await.map_err(|_| ())?;
5509 }
5510 }
5511 result = &mut handle => {
5512 let result = match result {
5513 Ok(result) => result,
5514 Err(err) => {
5515 send_debugger_message(sender, DebuggerMessage::Error {
5516 id: Some(request_id.clone()),
5517 message: format!("Evaluation failed: {err}"),
5518 }).await.map_err(|_| ())?;
5519 return Ok(());
5520 }
5521 };
5522
5523 while let Ok(event) = event_rx.try_recv() {
5524 send_debugger_message(sender, DebuggerMessage::Event {
5525 id: request_id.clone(),
5526 event,
5527 }).await.map_err(|_| ())?;
5528 }
5529
5530 let Some(result) = result else {
5531 send_debugger_message(sender, DebuggerMessage::Error {
5532 id: Some(request_id.clone()),
5533 message: "WAF evaluation unavailable".to_string(),
5534 }).await.map_err(|_| ())?;
5535 return Ok(());
5536 };
5537
5538 let verdict = if result.blocked {
5539 "block"
5540 } else if result.risk_score > 50 {
5541 "warn"
5542 } else {
5543 "pass"
5544 };
5545
5546 let done = DebuggerDonePayload {
5547 blocked: result.blocked,
5548 risk_score: result.risk_score,
5549 matched_rules: result.matched_rules,
5550 block_reason: result.block_reason,
5551 detection_time_us: result.detection_time_us,
5552 action: verdict.to_string(),
5553 verdict: verdict.to_string(),
5554 };
5555
5556 send_debugger_message(sender, DebuggerMessage::Done {
5557 id: request_id,
5558 result: done,
5559 }).await.map_err(|_| ())?;
5560
5561 return Ok(());
5562 }
5563 }
5564 }
5565}
5566
5567async fn send_debugger_message(
5568 sender: &mut futures_util::stream::SplitSink<WebSocket, Message>,
5569 message: DebuggerMessage,
5570) -> Result<(), ()> {
5571 let payload = serde_json::to_string(&message).map_err(|_| ())?;
5572 sender.send(Message::Text(payload)).await.map_err(|_| ())
5573}
5574
5575async fn sensor_evaluate_handler(
5603 State(state): State<AdminState>,
5604 Json(request): Json<EvaluateRequest>,
5605) -> impl IntoResponse {
5606 let body_bytes = decode_evaluate_body(request.body.as_deref());
5607
5608 match state.handler.evaluate_request(
5610 &request.method,
5611 &request.uri,
5612 &request.headers,
5613 body_bytes.as_deref(),
5614 &request.client_ip,
5615 ) {
5616 Some(result) => {
5617 let verdict = if result.blocked {
5619 "block"
5620 } else if result.risk_score > 50 {
5621 "warn"
5622 } else {
5623 "pass"
5624 };
5625
5626 (
5627 StatusCode::OK,
5628 Json(serde_json::json!({
5629 "blocked": result.blocked,
5630 "riskScore": result.risk_score,
5631 "matchedRules": result.matched_rules,
5632 "blockReason": result.block_reason,
5633 "detectionTimeUs": result.detection_time_us,
5634 "verdict": verdict,
5635 "input": {
5636 "method": request.method,
5637 "uri": request.uri,
5638 "headerCount": request.headers.len(),
5639 "bodyLength": body_bytes.as_ref().map(|b| b.len()).unwrap_or(0),
5640 "clientIp": request.client_ip
5641 }
5642 })),
5643 )
5644 .into_response()
5645 }
5646 None => {
5647 service_unavailable("WAF evaluation")
5649 }
5650 }
5651}
5652
5653fn decode_evaluate_body(body: Option<&str>) -> Option<Vec<u8>> {
5654 body.map(|value| match base64_decode(value) {
5655 Ok(decoded) => decoded,
5656 Err(_) => value.as_bytes().to_vec(),
5657 })
5658}
5659
5660fn base64_decode(input: &str) -> Result<Vec<u8>, ()> {
5662 const BASE64_CHARS: &[u8] =
5665 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
5666
5667 let trimmed = input.trim();
5669 if trimmed.is_empty() {
5670 return Ok(Vec::new());
5671 }
5672
5673 if !trimmed.bytes().all(|b| BASE64_CHARS.contains(&b)) {
5675 return Err(());
5676 }
5677
5678 let mut output = Vec::with_capacity(trimmed.len() * 3 / 4);
5680 let mut buffer: u32 = 0;
5681 let mut bits_collected: u32 = 0;
5682
5683 for byte in trimmed.bytes() {
5684 if byte == b'=' {
5685 break;
5686 }
5687
5688 let value = match byte {
5689 b'A'..=b'Z' => byte - b'A',
5690 b'a'..=b'z' => byte - b'a' + 26,
5691 b'0'..=b'9' => byte - b'0' + 52,
5692 b'+' => 62,
5693 b'/' => 63,
5694 _ => return Err(()),
5695 };
5696
5697 buffer = (buffer << 6) | (value as u32);
5698 bits_collected += 6;
5699
5700 if bits_collected >= 8 {
5701 bits_collected -= 8;
5702 output.push((buffer >> bits_collected) as u8);
5703 buffer &= (1 << bits_collected) - 1;
5704 }
5705 }
5706
5707 Ok(output)
5708}
5709
5710async fn api_profiles_list_handler(State(_state): State<AdminState>) -> impl IntoResponse {
5716 let profiles = get_profiles();
5718
5719 let now_ms = std::time::SystemTime::now()
5720 .duration_since(std::time::UNIX_EPOCH)
5721 .map(|d| d.as_millis() as u64)
5722 .unwrap_or(0);
5723
5724 let profiles_json: Vec<serde_json::Value> = profiles
5725 .iter()
5726 .map(|p| {
5727 serde_json::json!({
5728 "template": p.template,
5729 "sampleCount": p.sample_count,
5730 "firstSeenMs": p.first_seen_ms,
5731 "lastUpdatedMs": p.last_updated_ms,
5732 "payloadSize": {
5733 "mean": p.payload_size.mean(),
5734 "variance": p.payload_size.variance(),
5735 "stdDev": p.payload_size.stddev(),
5736 "count": p.payload_size.count()
5737 },
5738 "expectedParams": p.expected_params,
5739 "contentTypes": p.content_types,
5740 "statusCodes": p.status_codes,
5741 "endpointRisk": p.endpoint_risk,
5742 "currentRps": p.request_rate.current_rate(now_ms)
5743 })
5744 })
5745 .collect();
5746
5747 (
5748 StatusCode::OK,
5749 Json(serde_json::json!({
5750 "success": true,
5751 "data": {
5752 "profiles": profiles_json,
5753 "count": profiles_json.len()
5754 }
5755 })),
5756 )
5757}
5758
5759async fn api_profiles_detail_handler(
5761 State(_state): State<AdminState>,
5762 Path(template): Path<String>,
5763) -> impl IntoResponse {
5764 let decoded_template = urlencoding::decode(&template)
5766 .map(|s| s.into_owned())
5767 .unwrap_or(template);
5768
5769 let profiles = get_profiles();
5770
5771 let now_ms = std::time::SystemTime::now()
5772 .duration_since(std::time::UNIX_EPOCH)
5773 .map(|d| d.as_millis() as u64)
5774 .unwrap_or(0);
5775
5776 match profiles.iter().find(|p| p.template == decoded_template) {
5777 Some(p) => (
5778 StatusCode::OK,
5779 Json(serde_json::json!({
5780 "success": true,
5781 "data": {
5782 "template": p.template,
5783 "sampleCount": p.sample_count,
5784 "firstSeenMs": p.first_seen_ms,
5785 "lastUpdatedMs": p.last_updated_ms,
5786 "payloadSize": {
5787 "mean": p.payload_size.mean(),
5788 "variance": p.payload_size.variance(),
5789 "stdDev": p.payload_size.stddev(),
5790 "count": p.payload_size.count()
5791 },
5792 "expectedParams": p.expected_params,
5793 "contentTypes": p.content_types,
5794 "statusCodes": p.status_codes,
5795 "endpointRisk": p.endpoint_risk,
5796 "requestRate": {
5797 "currentRps": p.request_rate.current_rate(now_ms),
5798 "windowMs": 60000
5799 }
5800 }
5801 })),
5802 )
5803 .into_response(),
5804 None => not_found_error("Profile", &decoded_template),
5805 }
5806}
5807
5808async fn api_schemas_list_handler(State(_state): State<AdminState>) -> impl IntoResponse {
5810 let schemas = get_schemas();
5811
5812 let schemas_json: Vec<serde_json::Value> = schemas
5813 .iter()
5814 .map(|s| {
5815 serde_json::json!({
5816 "template": s.template,
5817 "sampleCount": s.sample_count,
5818 "lastUpdatedMs": s.last_updated_ms,
5819 "version": s.version,
5820 "requestFieldCount": s.request_schema.len(),
5821 "responseFieldCount": s.response_schema.len()
5822 })
5823 })
5824 .collect();
5825
5826 (
5827 StatusCode::OK,
5828 Json(serde_json::json!({
5829 "success": true,
5830 "data": {
5831 "schemas": schemas_json,
5832 "count": schemas_json.len()
5833 }
5834 })),
5835 )
5836}
5837
5838async fn api_schemas_detail_handler(
5840 State(_state): State<AdminState>,
5841 Path(template): Path<String>,
5842) -> impl IntoResponse {
5843 let decoded_template = urlencoding::decode(&template)
5845 .map(|s| s.into_owned())
5846 .unwrap_or(template);
5847
5848 let schemas = get_schemas();
5849
5850 match schemas.iter().find(|s| s.template == decoded_template) {
5851 Some(s) => (
5852 StatusCode::OK,
5853 Json(serde_json::json!({
5854 "success": true,
5855 "data": {
5856 "template": s.template,
5857 "sampleCount": s.sample_count,
5858 "lastUpdatedMs": s.last_updated_ms,
5859 "version": s.version,
5860 "requestSchema": s.request_schema.iter().map(|(k, v)| {
5861 (k.clone(), serde_json::json!({
5862 "dominantType": format!("{:?}", v.dominant_type()),
5863 "seenCount": v.seen_count
5864 }))
5865 }).collect::<serde_json::Map<String, serde_json::Value>>(),
5866 "responseSchema": s.response_schema.iter().map(|(k, v)| {
5867 (k.clone(), serde_json::json!({
5868 "dominantType": format!("{:?}", v.dominant_type()),
5869 "seenCount": v.seen_count
5870 }))
5871 }).collect::<serde_json::Map<String, serde_json::Value>>()
5872 }
5873 })),
5874 )
5875 .into_response(),
5876 None => not_found_error("Schema", &decoded_template),
5877 }
5878}
5879
5880#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
5886struct DlpConfigRequest {
5887 enabled: Option<bool>,
5888 fast_mode: Option<bool>,
5889 scan_text_only: Option<bool>,
5890 max_scan_size: Option<usize>,
5891 max_body_inspection_bytes: Option<usize>,
5892 max_matches: Option<usize>,
5893 custom_keywords: Option<Vec<String>>,
5894}
5895
5896async fn config_dlp_get_handler(State(_state): State<AdminState>) -> impl IntoResponse {
5898 (
5899 StatusCode::OK,
5900 Json(serde_json::json!({
5901 "success": true,
5902 "data": {
5903 "enabled": true,
5904 "fast_mode": false,
5905 "scan_text_only": true,
5906 "max_scan_size": 5242880,
5907 "max_body_inspection_bytes": 8192,
5908 "max_matches": 100,
5909 "custom_keywords": []
5910 }
5911 })),
5912 )
5913}
5914
5915async fn config_dlp_put_handler(
5917 State(_state): State<AdminState>,
5918 Json(config): Json<DlpConfigRequest>,
5919) -> impl IntoResponse {
5920 info!(
5922 "DLP config update: enabled={:?}, fast_mode={:?}",
5923 config.enabled, config.fast_mode
5924 );
5925 (
5926 StatusCode::OK,
5927 Json(serde_json::json!({
5928 "success": true,
5929 "message": "DLP configuration updated"
5930 })),
5931 )
5932}
5933
5934#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
5936struct BlockPageConfigRequest {
5937 company_name: Option<String>,
5938 support_email: Option<String>,
5939 logo_url: Option<String>,
5940 show_request_id: Option<bool>,
5941 show_timestamp: Option<bool>,
5942 show_client_ip: Option<bool>,
5943 show_rule_id: Option<bool>,
5944 custom_css: Option<String>,
5945}
5946
5947async fn config_block_page_get_handler(State(_state): State<AdminState>) -> impl IntoResponse {
5949 (
5950 StatusCode::OK,
5951 Json(serde_json::json!({
5952 "success": true,
5953 "data": {
5954 "company_name": null,
5955 "support_email": null,
5956 "logo_url": null,
5957 "show_request_id": true,
5958 "show_timestamp": true,
5959 "show_client_ip": false,
5960 "show_rule_id": false,
5961 "custom_css": null
5962 }
5963 })),
5964 )
5965}
5966
5967async fn config_block_page_put_handler(
5969 State(_state): State<AdminState>,
5970 Json(config): Json<BlockPageConfigRequest>,
5971) -> impl IntoResponse {
5972 info!(
5973 "Block page config update: company={:?}",
5974 config.company_name
5975 );
5976 (
5977 StatusCode::OK,
5978 Json(serde_json::json!({
5979 "success": true,
5980 "message": "Block page configuration updated"
5981 })),
5982 )
5983}
5984
5985#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
5987struct CrawlerConfigRequest {
5988 enabled: Option<bool>,
5989 verify_legitimate_crawlers: Option<bool>,
5990 block_bad_bots: Option<bool>,
5991 dns_failure_policy: Option<String>,
5992 dns_cache_ttl_secs: Option<u64>,
5993 dns_timeout_ms: Option<u64>,
5994 max_concurrent_dns_lookups: Option<usize>,
5995 dns_failure_risk_penalty: Option<u32>,
5996}
5997
5998async fn config_crawler_get_handler(State(_state): State<AdminState>) -> impl IntoResponse {
6000 (
6001 StatusCode::OK,
6002 Json(serde_json::json!({
6003 "success": true,
6004 "data": {
6005 "enabled": true,
6006 "verify_legitimate_crawlers": true,
6007 "block_bad_bots": true,
6008 "dns_failure_policy": "apply_risk_penalty",
6009 "dns_cache_ttl_secs": 300,
6010 "dns_timeout_ms": 2000,
6011 "max_concurrent_dns_lookups": 100,
6012 "dns_failure_risk_penalty": 20
6013 }
6014 })),
6015 )
6016}
6017
6018async fn config_crawler_put_handler(
6020 State(_state): State<AdminState>,
6021 Json(config): Json<CrawlerConfigRequest>,
6022) -> impl IntoResponse {
6023 info!(
6024 "Crawler config update: enabled={:?}, block_bad_bots={:?}",
6025 config.enabled, config.block_bad_bots
6026 );
6027 (
6028 StatusCode::OK,
6029 Json(serde_json::json!({
6030 "success": true,
6031 "message": "Crawler configuration updated"
6032 })),
6033 )
6034}
6035
6036#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
6038struct TarpitConfigRequest {
6039 enabled: Option<bool>,
6040 base_delay_ms: Option<u64>,
6041 max_delay_ms: Option<u64>,
6042 progressive_multiplier: Option<f64>,
6043 max_concurrent_tarpits: Option<usize>,
6044 decay_threshold_ms: Option<u64>,
6045}
6046
6047async fn config_tarpit_get_handler(State(_state): State<AdminState>) -> impl IntoResponse {
6049 (
6050 StatusCode::OK,
6051 Json(serde_json::json!({
6052 "success": true,
6053 "data": {
6054 "enabled": true,
6055 "base_delay_ms": 1000,
6056 "max_delay_ms": 30000,
6057 "progressive_multiplier": 1.5,
6058 "max_concurrent_tarpits": 1000,
6059 "decay_threshold_ms": 300000
6060 }
6061 })),
6062 )
6063}
6064
6065async fn config_tarpit_put_handler(
6067 State(_state): State<AdminState>,
6068 Json(config): Json<TarpitConfigRequest>,
6069) -> impl IntoResponse {
6070 info!(
6071 "Tarpit config update: enabled={:?}, base_delay={:?}ms",
6072 config.enabled, config.base_delay_ms
6073 );
6074 (
6075 StatusCode::OK,
6076 Json(serde_json::json!({
6077 "success": true,
6078 "message": "Tarpit configuration updated"
6079 })),
6080 )
6081}
6082
6083#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
6085struct TravelConfigRequest {
6086 max_speed_kmh: Option<f64>,
6087 min_distance_km: Option<f64>,
6088 history_window_ms: Option<u64>,
6089 max_history_per_user: Option<usize>,
6090}
6091
6092async fn config_travel_get_handler(State(_state): State<AdminState>) -> impl IntoResponse {
6094 (
6095 StatusCode::OK,
6096 Json(serde_json::json!({
6097 "success": true,
6098 "data": {
6099 "max_speed_kmh": 800.0,
6100 "min_distance_km": 100.0,
6101 "history_window_ms": 86400000,
6102 "max_history_per_user": 100
6103 }
6104 })),
6105 )
6106}
6107
6108async fn config_travel_put_handler(
6110 State(_state): State<AdminState>,
6111 Json(config): Json<TravelConfigRequest>,
6112) -> impl IntoResponse {
6113 info!(
6114 "Travel config update: max_speed={:?}km/h",
6115 config.max_speed_kmh
6116 );
6117 (
6118 StatusCode::OK,
6119 Json(serde_json::json!({
6120 "success": true,
6121 "message": "Impossible travel configuration updated"
6122 })),
6123 )
6124}
6125
6126#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
6128struct EntityConfigRequest {
6129 enabled: Option<bool>,
6130 max_entities: Option<usize>,
6131 risk_decay_per_minute: Option<f64>,
6132 block_threshold: Option<f64>,
6133 max_risk: Option<f64>,
6134 max_rules_per_entity: Option<usize>,
6135}
6136
6137async fn config_entity_get_handler(State(_state): State<AdminState>) -> impl IntoResponse {
6139 (
6140 StatusCode::OK,
6141 Json(serde_json::json!({
6142 "success": true,
6143 "data": {
6144 "enabled": true,
6145 "max_entities": 100000,
6146 "risk_decay_per_minute": 10.0,
6147 "block_threshold": 70.0,
6148 "max_risk": 100.0,
6149 "max_rules_per_entity": 50
6150 }
6151 })),
6152 )
6153}
6154
6155async fn config_entity_put_handler(
6157 State(_state): State<AdminState>,
6158 Json(config): Json<EntityConfigRequest>,
6159) -> impl IntoResponse {
6160 info!(
6161 "Entity config update: max_entities={:?}, block_threshold={:?}",
6162 config.max_entities, config.block_threshold
6163 );
6164 (
6165 StatusCode::OK,
6166 Json(serde_json::json!({
6167 "success": true,
6168 "message": "Entity store configuration updated"
6169 })),
6170 )
6171}
6172
6173async fn config_integrations_get_handler(State(_state): State<AdminState>) -> impl IntoResponse {
6175 let config = INTEGRATIONS_GETTER
6176 .read()
6177 .as_ref()
6178 .map(|getter| getter())
6179 .unwrap_or_else(|| IntegrationsConfig {
6180 horizon_hub_url: String::new(),
6181 horizon_api_key: String::new(),
6182 tunnel_url: String::new(),
6183 tunnel_api_key: String::new(),
6184 apparatus_url: String::new(),
6185 });
6186
6187 (
6188 StatusCode::OK,
6189 Json(serde_json::json!({
6190 "success": true,
6191 "data": config
6192 })),
6193 )
6194}
6195
6196async fn config_integrations_put_handler(
6198 State(_state): State<AdminState>,
6199 Json(config): Json<IntegrationsConfig>,
6200) -> impl IntoResponse {
6201 if let Some(setter) = INTEGRATIONS_SETTER.read().as_ref() {
6202 setter(config);
6203 (
6204 StatusCode::OK,
6205 Json(serde_json::json!({
6206 "success": true,
6207 "message": "Integrations configuration updated"
6208 })),
6209 )
6210 } else {
6211 (
6212 StatusCode::SERVICE_UNAVAILABLE,
6213 Json(serde_json::json!({
6214 "success": false,
6215 "message": "Configuration updates not supported by this sensor instance"
6216 })),
6217 )
6218 }
6219}
6220
6221const DEFAULT_KERNEL_KEYS: &[&str] = &[
6226 "net.core.somaxconn",
6227 "net.ipv4.tcp_max_syn_backlog",
6228 "net.ipv4.tcp_fin_timeout",
6229 "net.ipv4.tcp_keepalive_time",
6230 "vm.swappiness",
6231];
6232
6233#[derive(Debug, Deserialize)]
6234struct KernelConfigQuery {
6235 keys: Option<String>,
6236}
6237
6238#[derive(Debug)]
6239struct KernelConfigPayload {
6240 params: HashMap<String, String>,
6241 persist: bool,
6242 warnings: Vec<String>,
6243}
6244
6245fn is_valid_sysctl_key(key: &str) -> bool {
6246 !key.is_empty()
6247 && key
6248 .chars()
6249 .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_')
6250}
6251
6252fn is_valid_sysctl_value(value: &str) -> bool {
6253 !value.trim().is_empty() && !value.contains('\n') && !value.contains('\r')
6254}
6255
6256fn sysctl_path_for_key(key: &str) -> PathBuf {
6257 PathBuf::from("/proc/sys").join(key.replace('.', "/"))
6258}
6259
6260fn read_sysctl_value(key: &str) -> Result<String, String> {
6261 if !is_valid_sysctl_key(key) {
6262 return Err("invalid sysctl key".to_string());
6263 }
6264
6265 if cfg!(target_os = "linux") {
6266 let path = sysctl_path_for_key(key);
6267 if path.exists() {
6268 return std::fs::read_to_string(&path)
6269 .map(|value| value.trim().to_string())
6270 .map_err(|err| err.to_string());
6271 }
6272 }
6273
6274 let output = Command::new("sysctl")
6275 .arg("-n")
6276 .arg(key)
6277 .output()
6278 .map_err(|err| err.to_string())?;
6279
6280 if output.status.success() {
6281 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
6282 } else {
6283 Err(String::from_utf8_lossy(&output.stderr).trim().to_string())
6284 }
6285}
6286
6287fn write_sysctl_value(key: &str, value: &str) -> Result<(), String> {
6288 if !is_valid_sysctl_key(key) {
6289 return Err("invalid sysctl key".to_string());
6290 }
6291 if !is_valid_sysctl_value(value) {
6292 return Err("invalid sysctl value".to_string());
6293 }
6294
6295 if cfg!(target_os = "linux") {
6296 let path = sysctl_path_for_key(key);
6297 if path.exists() {
6298 return std::fs::write(&path, value).map_err(|err| err.to_string());
6299 }
6300 }
6301
6302 let assignment = format!("{}={}", key, value);
6303 let output = Command::new("sysctl")
6304 .arg("-w")
6305 .arg(&assignment)
6306 .output()
6307 .map_err(|err| err.to_string())?;
6308
6309 if output.status.success() {
6310 Ok(())
6311 } else {
6312 Err(String::from_utf8_lossy(&output.stderr).trim().to_string())
6313 }
6314}
6315
6316fn parse_kernel_payload(payload: serde_json::Value) -> KernelConfigPayload {
6317 let mut params = HashMap::new();
6318 let mut warnings = Vec::new();
6319 let mut persist = false;
6320
6321 match payload {
6322 serde_json::Value::Object(mut obj) => {
6323 if let Some(flag) = obj.remove("persist") {
6324 persist = flag.as_bool().unwrap_or(false);
6325 }
6326
6327 let params_value = obj
6328 .remove("params")
6329 .or_else(|| obj.remove("settings"))
6330 .unwrap_or(serde_json::Value::Object(obj));
6331
6332 if let serde_json::Value::Object(map) = params_value {
6333 for (key, value) in map {
6334 let string_value = match value {
6335 serde_json::Value::String(s) => s,
6336 serde_json::Value::Number(n) => n.to_string(),
6337 serde_json::Value::Bool(b) => b.to_string(),
6338 _other => {
6339 warnings.push(format!("Skipping non-scalar value for {}", key));
6340 continue;
6341 }
6342 };
6343 params.insert(key, string_value);
6344 }
6345 } else {
6346 warnings.push("Kernel config payload missing params object".to_string());
6347 }
6348 }
6349 _ => {
6350 warnings.push("Kernel config payload must be an object".to_string());
6351 }
6352 }
6353
6354 KernelConfigPayload {
6355 params,
6356 persist,
6357 warnings,
6358 }
6359}
6360
6361fn parse_kernel_keys(query: Option<String>) -> Vec<String> {
6362 if let Some(keys) = query {
6363 let parsed: Vec<String> = keys
6364 .split(',')
6365 .map(|k| k.trim().to_string())
6366 .filter(|k| !k.is_empty())
6367 .collect();
6368 if !parsed.is_empty() {
6369 return parsed;
6370 }
6371 }
6372 DEFAULT_KERNEL_KEYS
6373 .iter()
6374 .map(|k| (*k).to_string())
6375 .collect()
6376}
6377
6378fn load_sysctl_entries(path: &std::path::Path) -> HashMap<String, String> {
6379 let mut entries = HashMap::new();
6380 if let Ok(contents) = std::fs::read_to_string(path) {
6381 for line in contents.lines() {
6382 let line = line.trim();
6383 if line.is_empty() || line.starts_with('#') {
6384 continue;
6385 }
6386 if let Some((key, value)) = line.split_once('=') {
6387 entries.insert(key.trim().to_string(), value.trim().to_string());
6388 }
6389 }
6390 }
6391 entries
6392}
6393
6394fn build_sysctl_config_lines(entries: &HashMap<String, String>) -> String {
6395 let mut lines = Vec::new();
6396 lines.push("# Synapse Pingora managed sysctl settings".to_string());
6397 let mut keys: Vec<_> = entries.keys().cloned().collect();
6398 keys.sort();
6399 for key in keys {
6400 if let Some(value) = entries.get(&key) {
6401 lines.push(format!("{} = {}", key, value));
6402 }
6403 }
6404 lines.join("\n")
6405}
6406
6407fn persist_kernel_params(params: &HashMap<String, String>) -> Result<(), String> {
6408 if params.is_empty() {
6409 return Ok(());
6410 }
6411
6412 let path = std::env::var("SYNAPSE_SYSCTL_CONFIG_PATH")
6413 .unwrap_or_else(|_| "/etc/sysctl.d/synapse-pingora.conf".to_string());
6414 let path = std::path::Path::new(&path);
6415 let wal_path = path.with_extension("wal");
6416
6417 let mut entries = load_sysctl_entries(path);
6418 for (key, value) in params {
6419 entries.insert(key.clone(), value.clone());
6420 }
6421
6422 append_sysctl_wal(&wal_path, params)?;
6423 let contents = build_sysctl_config_lines(&entries);
6424 write_file_with_fsync(path, &contents)
6425 .map_err(|err| format!("Failed to persist sysctl settings: {}", err))?;
6426 clear_wal(&wal_path)?;
6427 Ok(())
6428}
6429
6430fn current_timestamp_ms() -> u64 {
6431 use std::time::{SystemTime, UNIX_EPOCH};
6432 SystemTime::now()
6433 .duration_since(UNIX_EPOCH)
6434 .map(|duration| duration.as_millis() as u64)
6435 .unwrap_or(0)
6436}
6437
6438fn append_sysctl_wal(
6439 path: &std::path::Path,
6440 params: &HashMap<String, String>,
6441) -> Result<(), String> {
6442 use std::io::Write;
6443
6444 if let Some(parent) = path.parent() {
6445 std::fs::create_dir_all(parent)
6446 .map_err(|err| format!("Failed to create WAL directory: {}", err))?;
6447 }
6448
6449 let mut file = std::fs::OpenOptions::new()
6450 .create(true)
6451 .append(true)
6452 .open(path)
6453 .map_err(|err| format!("Failed to open WAL file: {}", err))?;
6454
6455 let payload = serde_json::json!({
6456 "timestamp_ms": current_timestamp_ms(),
6457 "type": "sysctl_update",
6458 "params": params,
6459 });
6460 let serialized = serde_json::to_vec(&payload)
6461 .map_err(|err| format!("Failed to serialize WAL entry: {}", err))?;
6462 file.write_all(&serialized)
6463 .and_then(|_| file.write_all(b"\n"))
6464 .and_then(|_| file.sync_all())
6465 .map_err(|err| format!("Failed to persist WAL entry: {}", err))?;
6466
6467 Ok(())
6468}
6469
6470fn parse_sysctl_wal_params(value: &serde_json::Value) -> Option<HashMap<String, String>> {
6471 if value.get("type").and_then(|t| t.as_str()) != Some("sysctl_update") {
6472 return None;
6473 }
6474 let params = value.get("params")?.as_object()?;
6475 let mut parsed = HashMap::new();
6476 for (key, value) in params {
6477 let string_value = match value {
6478 serde_json::Value::String(s) => s.clone(),
6479 serde_json::Value::Number(n) => n.to_string(),
6480 serde_json::Value::Bool(b) => b.to_string(),
6481 _ => continue,
6482 };
6483 parsed.insert(key.clone(), string_value);
6484 }
6485 if parsed.is_empty() {
6486 None
6487 } else {
6488 Some(parsed)
6489 }
6490}
6491
6492fn recover_sysctl_wal() -> Result<(), String> {
6493 let path = std::env::var("SYNAPSE_SYSCTL_CONFIG_PATH")
6494 .unwrap_or_else(|_| "/etc/sysctl.d/synapse-pingora.conf".to_string());
6495 let path = std::path::Path::new(&path);
6496 let wal_path = path.with_extension("wal");
6497
6498 if !wal_path.exists() {
6499 return Ok(());
6500 }
6501
6502 let contents = std::fs::read_to_string(&wal_path)
6503 .map_err(|err| format!("Failed to read WAL file: {}", err))?;
6504 if contents.trim().is_empty() {
6505 return Ok(());
6506 }
6507
6508 let mut last_params: Option<HashMap<String, String>> = None;
6509 for line in contents.lines() {
6510 let value: serde_json::Value = match serde_json::from_str(line) {
6511 Ok(value) => value,
6512 Err(err) => {
6513 warn!("Skipping invalid sysctl WAL entry: {}", err);
6514 continue;
6515 }
6516 };
6517 if let Some(params) = parse_sysctl_wal_params(&value) {
6518 last_params = Some(params);
6519 }
6520 }
6521
6522 let Some(params) = last_params else {
6523 return Ok(());
6524 };
6525
6526 let mut entries = load_sysctl_entries(path);
6527 for (key, value) in params {
6528 entries.insert(key, value);
6529 }
6530
6531 let contents = build_sysctl_config_lines(&entries);
6532 write_file_with_fsync(path, &contents)
6533 .map_err(|err| format!("Failed to recover sysctl settings: {}", err))?;
6534 clear_wal(&wal_path)?;
6535 Ok(())
6536}
6537
6538fn write_file_with_fsync(path: &std::path::Path, contents: &str) -> Result<(), std::io::Error> {
6539 use std::io::Write;
6540
6541 let mut file = std::fs::OpenOptions::new()
6542 .create(true)
6543 .truncate(true)
6544 .write(true)
6545 .open(path)?;
6546 file.write_all(contents.as_bytes())?;
6547 file.sync_all()?;
6548 Ok(())
6549}
6550
6551fn clear_wal(path: &std::path::Path) -> Result<(), String> {
6552 use std::io::Write;
6553
6554 let mut file = std::fs::OpenOptions::new()
6555 .create(true)
6556 .truncate(true)
6557 .write(true)
6558 .open(path)
6559 .map_err(|err| format!("Failed to open WAL file: {}", err))?;
6560 file.write_all(b"")
6561 .and_then(|_| file.sync_all())
6562 .map_err(|err| format!("Failed to clear WAL file: {}", err))?;
6563 Ok(())
6564}
6565
6566async fn config_kernel_get_handler(
6568 Query(query): Query<KernelConfigQuery>,
6569 State(_state): State<AdminState>,
6570) -> impl IntoResponse {
6571 let keys = parse_kernel_keys(query.keys);
6572 let mut values = HashMap::new();
6573 let mut errors = HashMap::new();
6574
6575 for key in keys {
6576 match read_sysctl_value(&key) {
6577 Ok(value) => {
6578 values.insert(key, value);
6579 }
6580 Err(err) => {
6581 errors.insert(key, err);
6582 }
6583 }
6584 }
6585
6586 (
6587 StatusCode::OK,
6588 Json(serde_json::json!({
6589 "success": true,
6590 "data": {
6591 "parameters": values,
6592 "errors": errors
6593 }
6594 })),
6595 )
6596}
6597
6598async fn config_kernel_put_handler(
6600 State(_state): State<AdminState>,
6601 Json(payload): Json<serde_json::Value>,
6602) -> impl IntoResponse {
6603 let parsed = parse_kernel_payload(payload);
6604 if parsed.params.is_empty() {
6605 return (
6606 StatusCode::BAD_REQUEST,
6607 Json(serde_json::json!({
6608 "success": false,
6609 "error": "No kernel parameters provided",
6610 "warnings": parsed.warnings
6611 })),
6612 );
6613 }
6614
6615 let mut applied = HashMap::new();
6616 let mut failed = HashMap::new();
6617
6618 for (key, value) in parsed.params {
6619 if !is_valid_sysctl_key(&key) {
6620 failed.insert(key, "invalid sysctl key".to_string());
6621 continue;
6622 }
6623 if !is_valid_sysctl_value(&value) {
6624 failed.insert(key, "invalid sysctl value".to_string());
6625 continue;
6626 }
6627
6628 match write_sysctl_value(&key, &value) {
6629 Ok(()) => {
6630 record_log_with_source(
6631 "info",
6632 LogSource::System,
6633 format!("Kernel parameter {} set to {}", key, value),
6634 );
6635 applied.insert(key, value);
6636 }
6637 Err(err) => {
6638 record_log_with_source(
6639 "warn",
6640 LogSource::System,
6641 format!("Kernel parameter {} update failed: {}", key, err),
6642 );
6643 failed.insert(key, err);
6644 }
6645 }
6646 }
6647
6648 let mut persist_error = None;
6649 if parsed.persist && !applied.is_empty() {
6650 if let Err(err) = persist_kernel_params(&applied) {
6651 persist_error = Some(err);
6652 }
6653 }
6654
6655 (
6656 StatusCode::OK,
6657 Json(serde_json::json!({
6658 "success": failed.is_empty() && persist_error.is_none(),
6659 "data": {
6660 "applied": applied,
6661 "failed": failed,
6662 "persisted": parsed.persist && persist_error.is_none(),
6663 "persistError": persist_error
6664 },
6665 "warnings": parsed.warnings
6666 })),
6667 )
6668}
6669
6670#[cfg(test)]
6671mod tests {
6672 use super::*;
6673 use crate::horizon::{Severity, SignalType};
6674 use crate::intelligence::signal_manager::{SignalManager, SignalManagerConfig};
6675 use crate::intelligence::SignalQueryOptions;
6676 use axum::body::Body;
6677 use http::Request;
6678 use http_body_util::BodyExt;
6679 use std::num::NonZeroU32;
6680 use tempfile::tempdir;
6681 use tower::util::ServiceExt;
6682
6683 fn create_test_app() -> Router {
6684 use governor::{Quota, RateLimiter};
6685 use std::num::NonZeroU32;
6686
6687 let handler = Arc::new(ApiHandler::builder().build());
6688 let quota = Quota::per_minute(NonZeroU32::new(1000).unwrap());
6690 let admin_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
6691 let public_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
6692 let report_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
6693 let auth_failure_limiter = Arc::new(RateLimiter::keyed(quota));
6694 let state = AdminState {
6695 handler,
6696 admin_api_key: "test-key".to_string(),
6697 admin_auth_disabled: false,
6698 admin_scopes: scopes::ALL.iter().map(|s| (*s).to_string()).collect(),
6699 signal_permissions: Arc::new(SignalPermissions::default()),
6700 admin_rate_limiter,
6701 public_rate_limiter,
6702 report_rate_limiter,
6703 auth_failure_limiter,
6704 };
6705
6706 Router::new()
6707 .route("/health", get(health_handler))
6708 .route("/metrics", get(metrics_handler))
6709 .route("/sites", get(sites_handler))
6710 .route("/stats", get(stats_handler))
6711 .route("/waf/stats", get(waf_stats_handler))
6712 .route("/reload", post(reload_handler))
6713 .route("/test", post(test_handler))
6714 .route("/restart", post(restart_handler))
6715 .route("/", get(root_handler))
6716 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
6717 .layer(middleware::from_fn(security_headers))
6718 .with_state(state)
6719 }
6720
6721 fn create_test_app_with_sensor_report_with_scopes(
6722 scopes: Vec<String>,
6723 report_quota: NonZeroU32,
6724 ) -> (Router, Arc<SignalManager>) {
6725 use governor::{Quota, RateLimiter};
6726
6727 let signal_manager = Arc::new(SignalManager::new(SignalManagerConfig::default()));
6728 let signal_manager_ref = Arc::clone(&signal_manager);
6729 let handler = Arc::new(ApiHandler::builder().signal_manager(signal_manager).build());
6730 let quota = Quota::per_minute(NonZeroU32::new(1000).unwrap());
6731 let admin_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
6732 let public_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
6733 let report_rate_limiter = Arc::new(RateLimiter::keyed(Quota::per_minute(report_quota)));
6734 let auth_failure_limiter = Arc::new(RateLimiter::keyed(quota));
6735 let state = AdminState {
6736 handler,
6737 admin_api_key: "test-key".to_string(),
6738 admin_auth_disabled: false,
6739 admin_scopes: scopes,
6740 signal_permissions: Arc::new(SignalPermissions::default()),
6741 admin_rate_limiter,
6742 public_rate_limiter,
6743 report_rate_limiter,
6744 auth_failure_limiter,
6745 };
6746
6747 let router = Router::new()
6748 .route("/_sensor/report", post(sensor_report_handler))
6749 .route_layer(middleware::from_fn_with_state(
6750 state.clone(),
6751 require_sensor_write,
6752 ))
6753 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
6754 .route_layer(middleware::from_fn_with_state(
6755 state.clone(),
6756 rate_limit_admin,
6757 ))
6758 .with_state(state);
6759
6760 (router, signal_manager_ref)
6761 }
6762
6763 fn create_test_app_with_sensor_report() -> (Router, Arc<SignalManager>) {
6764 create_test_app_with_sensor_report_with_scopes(
6765 scopes::ALL.iter().map(|s| (*s).to_string()).collect(),
6766 NonZeroU32::new(1000).unwrap(),
6767 )
6768 }
6769
6770 #[tokio::test]
6771 async fn test_health_endpoint() {
6772 let app = create_test_app();
6773
6774 let response = app
6775 .oneshot(
6776 Request::builder()
6777 .uri("/health")
6778 .header("X-Admin-Key", "test-key")
6779 .body(Body::empty())
6780 .unwrap(),
6781 )
6782 .await
6783 .unwrap();
6784
6785 assert_eq!(response.status(), StatusCode::OK);
6786 }
6787
6788 #[test]
6789 fn test_recover_sysctl_wal_merges_entries() {
6790 let dir = tempdir().unwrap();
6791 let config_path = dir.path().join("sysctl.conf");
6792 let wal_path = config_path.with_extension("wal");
6793
6794 std::fs::write(&config_path, "net.ipv4.tcp_syncookies = 1\n").unwrap();
6795 let wal_entry = serde_json::json!({
6796 "timestamp_ms": 1,
6797 "type": "sysctl_update",
6798 "params": {
6799 "net.ipv4.tcp_syncookies": "0",
6800 "net.ipv4.ip_forward": "1"
6801 }
6802 });
6803 std::fs::write(&wal_path, format!("{}\n", wal_entry)).unwrap();
6804
6805 let previous = std::env::var("SYNAPSE_SYSCTL_CONFIG_PATH").ok();
6806 std::env::set_var(
6807 "SYNAPSE_SYSCTL_CONFIG_PATH",
6808 config_path.to_string_lossy().to_string(),
6809 );
6810
6811 let result = recover_sysctl_wal();
6812
6813 if let Some(value) = previous {
6814 std::env::set_var("SYNAPSE_SYSCTL_CONFIG_PATH", value);
6815 } else {
6816 std::env::remove_var("SYNAPSE_SYSCTL_CONFIG_PATH");
6817 }
6818
6819 result.unwrap();
6820
6821 let contents = std::fs::read_to_string(&config_path).unwrap();
6822 assert!(contents.contains("net.ipv4.tcp_syncookies = 0"));
6823 assert!(contents.contains("net.ipv4.ip_forward = 1"));
6824
6825 let wal_contents = std::fs::read_to_string(&wal_path).unwrap();
6826 assert!(wal_contents.trim().is_empty());
6827 }
6828
6829 #[tokio::test]
6830 async fn test_metrics_endpoint() {
6831 let app = create_test_app();
6832
6833 let response = app
6834 .oneshot(
6835 Request::builder()
6836 .uri("/metrics")
6837 .header("X-Admin-Key", "test-key")
6838 .body(Body::empty())
6839 .unwrap(),
6840 )
6841 .await
6842 .unwrap();
6843
6844 assert_eq!(response.status(), StatusCode::OK);
6845 }
6846
6847 #[tokio::test]
6848 async fn test_stats_endpoint() {
6849 let app = create_test_app();
6850
6851 let response = app
6852 .oneshot(
6853 Request::builder()
6854 .uri("/stats")
6855 .header("X-Admin-Key", "test-key")
6856 .body(Body::empty())
6857 .unwrap(),
6858 )
6859 .await
6860 .unwrap();
6861
6862 assert_eq!(response.status(), StatusCode::OK);
6863 }
6864
6865 #[tokio::test]
6866 async fn test_waf_stats_endpoint() {
6867 let app = create_test_app();
6868
6869 let response = app
6870 .oneshot(
6871 Request::builder()
6872 .uri("/waf/stats")
6873 .header("X-Admin-Key", "test-key")
6874 .body(Body::empty())
6875 .unwrap(),
6876 )
6877 .await
6878 .unwrap();
6879
6880 assert_eq!(response.status(), StatusCode::OK);
6881 }
6882
6883 #[tokio::test]
6884 async fn test_reload_endpoint() {
6885 let app = create_test_app();
6886
6887 let response = app
6888 .oneshot(
6889 Request::builder()
6890 .method("POST")
6891 .uri("/reload")
6892 .header("X-Admin-Key", "test-key")
6893 .body(Body::empty())
6894 .unwrap(),
6895 )
6896 .await
6897 .unwrap();
6898
6899 assert!(
6901 response.status() == StatusCode::OK
6902 || response.status() == StatusCode::INTERNAL_SERVER_ERROR
6903 );
6904 }
6905
6906 #[tokio::test]
6907 async fn test_test_endpoint() {
6908 let app = create_test_app();
6909
6910 let response = app
6911 .oneshot(
6912 Request::builder()
6913 .method("POST")
6914 .uri("/test")
6915 .header("X-Admin-Key", "test-key")
6916 .body(Body::empty())
6917 .unwrap(),
6918 )
6919 .await
6920 .unwrap();
6921
6922 assert_eq!(response.status(), StatusCode::OK);
6923 }
6924
6925 #[tokio::test]
6926 async fn test_restart_endpoint() {
6927 let app = create_test_app();
6928
6929 let response = app
6930 .oneshot(
6931 Request::builder()
6932 .method("POST")
6933 .uri("/restart")
6934 .header("X-Admin-Key", "test-key")
6935 .body(Body::empty())
6936 .unwrap(),
6937 )
6938 .await
6939 .unwrap();
6940
6941 assert_eq!(response.status(), StatusCode::OK);
6942 }
6943
6944 #[tokio::test]
6945 async fn test_root_endpoint() {
6946 let app = create_test_app();
6947
6948 let response = app
6949 .oneshot(
6950 Request::builder()
6951 .uri("/")
6952 .header("X-Admin-Key", "test-key")
6953 .body(Body::empty())
6954 .unwrap(),
6955 )
6956 .await
6957 .unwrap();
6958
6959 assert_eq!(response.status(), StatusCode::OK);
6960 }
6961
6962 #[tokio::test]
6963 async fn test_sensor_report_rejects_disallowed_signal_type() {
6964 let (app, _manager) = create_test_app_with_sensor_report();
6965
6966 let payload = serde_json::json!({
6967 "sensorId": "sensor-1",
6968 "timestamp": "2026-02-03T00:00:00Z",
6969 "actor": {
6970 "ip": "10.0.0.1",
6971 "fingerprint": null,
6972 "sessionId": null
6973 },
6974 "signal": {
6975 "type": "internal_only",
6976 "severity": "high",
6977 "details": {}
6978 },
6979 "request": null
6980 });
6981
6982 let response = app
6983 .oneshot(
6984 Request::builder()
6985 .method("POST")
6986 .uri("/_sensor/report")
6987 .header("Content-Type", "application/json")
6988 .header("X-Admin-Key", "test-key")
6989 .body(Body::from(payload.to_string()))
6990 .unwrap(),
6991 )
6992 .await
6993 .unwrap();
6994
6995 assert_eq!(response.status(), StatusCode::FORBIDDEN);
6996 }
6997
6998 #[tokio::test]
6999 async fn test_sensor_report_allows_known_signal_type() {
7000 let (app, _manager) = create_test_app_with_sensor_report();
7001
7002 let payload = serde_json::json!({
7003 "sensorId": "sensor-1",
7004 "timestamp": "2026-02-03T00:00:00Z",
7005 "actor": {
7006 "ip": "10.0.0.1",
7007 "fingerprint": null,
7008 "sessionId": null
7009 },
7010 "signal": {
7011 "type": "honeypot_hit",
7012 "severity": "high",
7013 "details": {}
7014 },
7015 "request": null
7016 });
7017
7018 let response = app
7019 .oneshot(
7020 Request::builder()
7021 .method("POST")
7022 .uri("/_sensor/report")
7023 .header("Content-Type", "application/json")
7024 .header("X-Admin-Key", "test-key")
7025 .body(Body::from(payload.to_string()))
7026 .unwrap(),
7027 )
7028 .await
7029 .unwrap();
7030
7031 assert_eq!(response.status(), StatusCode::OK);
7032 }
7033
7034 #[tokio::test]
7035 async fn test_sensor_report_requires_auth() {
7036 let (app, _manager) = create_test_app_with_sensor_report();
7037
7038 let payload = serde_json::json!({
7039 "sensorId": "sensor-1",
7040 "timestamp": "2026-02-03T00:00:00Z",
7041 "actor": { "ip": "10.0.0.1" },
7042 "signal": { "type": "honeypot_hit", "severity": "high", "details": {} }
7043 });
7044
7045 let response = app
7046 .oneshot(
7047 Request::builder()
7048 .method("POST")
7049 .uri("/_sensor/report")
7050 .header("Content-Type", "application/json")
7051 .body(Body::from(payload.to_string()))
7052 .unwrap(),
7053 )
7054 .await
7055 .unwrap();
7056
7057 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
7058 }
7059
7060 #[tokio::test]
7061 async fn test_sensor_report_requires_sensor_write_scope() {
7062 let (app, _manager) = create_test_app_with_sensor_report_with_scopes(
7063 vec![scopes::ADMIN_READ.to_string()],
7064 NonZeroU32::new(1000).unwrap(),
7065 );
7066
7067 let payload = serde_json::json!({
7068 "sensorId": "sensor-1",
7069 "timestamp": "2026-02-03T00:00:00Z",
7070 "actor": { "ip": "10.0.0.1" },
7071 "signal": { "type": "honeypot_hit", "severity": "high", "details": {} }
7072 });
7073
7074 let response = app
7075 .oneshot(
7076 Request::builder()
7077 .method("POST")
7078 .uri("/_sensor/report")
7079 .header("Content-Type", "application/json")
7080 .header("X-Admin-Key", "test-key")
7081 .body(Body::from(payload.to_string()))
7082 .unwrap(),
7083 )
7084 .await
7085 .unwrap();
7086
7087 assert_eq!(response.status(), StatusCode::FORBIDDEN);
7088 }
7089
7090 #[tokio::test]
7091 async fn test_sensor_report_rate_limits_per_sensor() {
7092 let (app, _manager) = create_test_app_with_sensor_report_with_scopes(
7093 scopes::ALL.iter().map(|s| (*s).to_string()).collect(),
7094 NonZeroU32::new(1).unwrap(),
7095 );
7096
7097 let payload = serde_json::json!({
7098 "sensorId": "sensor-1",
7099 "timestamp": "2026-02-03T00:00:00Z",
7100 "actor": { "ip": "10.0.0.1" },
7101 "signal": { "type": "honeypot_hit", "severity": "high", "details": {} }
7102 });
7103
7104 let first = app
7105 .clone()
7106 .oneshot(
7107 Request::builder()
7108 .method("POST")
7109 .uri("/_sensor/report")
7110 .header("Content-Type", "application/json")
7111 .header("X-Admin-Key", "test-key")
7112 .body(Body::from(payload.to_string()))
7113 .unwrap(),
7114 )
7115 .await
7116 .unwrap();
7117
7118 assert_eq!(first.status(), StatusCode::OK);
7119
7120 let second = app
7121 .oneshot(
7122 Request::builder()
7123 .method("POST")
7124 .uri("/_sensor/report")
7125 .header("Content-Type", "application/json")
7126 .header("X-Admin-Key", "test-key")
7127 .body(Body::from(payload.to_string()))
7128 .unwrap(),
7129 )
7130 .await
7131 .unwrap();
7132
7133 assert_eq!(second.status(), StatusCode::TOO_MANY_REQUESTS);
7134 }
7135
7136 #[tokio::test]
7137 async fn test_sensor_report_validation_error() {
7138 let (app, _manager) = create_test_app_with_sensor_report();
7139
7140 let payload = serde_json::json!({
7141 "sensorId": "sensor-1",
7142 "timestamp": "2026-02-03T00:00:00Z",
7143 "actor": { "ip": "not-an-ip" },
7144 "signal": { "type": "honeypot_hit", "severity": "high", "details": {} }
7145 });
7146
7147 let response = app
7148 .oneshot(
7149 Request::builder()
7150 .method("POST")
7151 .uri("/_sensor/report")
7152 .header("Content-Type", "application/json")
7153 .header("X-Admin-Key", "test-key")
7154 .body(Body::from(payload.to_string()))
7155 .unwrap(),
7156 )
7157 .await
7158 .unwrap();
7159
7160 let status = response.status();
7161 let body = response.into_body().collect().await.unwrap().to_bytes();
7162 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
7163
7164 assert_eq!(status, StatusCode::BAD_REQUEST);
7165 assert_eq!(json["code"], error_codes::VALIDATION_ERROR);
7166 }
7167
7168 #[test]
7169 fn test_signal_type_mapping() {
7170 use crate::signals::adapter::SignalAdapter;
7171
7172 let mut report = ApparatusReport {
7173 sensor_id: "sensor-1".to_string(),
7174 timestamp: "2026-02-03T00:00:00Z".to_string(),
7175 version: Some("1.0.0".to_string()),
7176 actor: ApparatusActor {
7177 ip: "1.2.3.4".to_string(),
7178 fingerprint: None,
7179 session_id: None,
7180 },
7181 signal: ApparatusSignal {
7182 signal_type: "honeypot_hit".to_string(),
7183 severity: "high".to_string(),
7184 details: serde_json::json!({}),
7185 },
7186 request: None,
7187 };
7188
7189 assert!(matches!(
7190 SignalAdapter::map_type(&report),
7191 SignalType::IpThreat
7192 ));
7193
7194 report.signal.signal_type = "trap_trigger".to_string();
7195 assert!(matches!(
7196 SignalAdapter::map_type(&report),
7197 SignalType::BotSignature
7198 ));
7199
7200 report.signal.signal_type = "protocol_probe".to_string();
7201 assert!(matches!(
7202 SignalAdapter::map_type(&report),
7203 SignalType::TemplateDiscovery
7204 ));
7205
7206 report.signal.signal_type = "dlp_match".to_string();
7207 assert!(matches!(
7208 SignalAdapter::map_type(&report),
7209 SignalType::SchemaViolation
7210 ));
7211 }
7212
7213 #[test]
7214 fn test_severity_mapping() {
7215 use crate::signals::adapter::SignalAdapter;
7216 assert!(matches!(SignalAdapter::map_severity("low"), Severity::Low));
7217 assert!(matches!(
7218 SignalAdapter::map_severity("medium"),
7219 Severity::Medium
7220 ));
7221 assert!(matches!(
7222 SignalAdapter::map_severity("high"),
7223 Severity::High
7224 ));
7225 assert!(matches!(
7226 SignalAdapter::map_severity("critical"),
7227 Severity::Critical
7228 ));
7229 assert!(matches!(
7230 SignalAdapter::map_severity("unknown"),
7231 Severity::Medium
7232 ));
7233 }
7234
7235 #[tokio::test]
7236 async fn test_sensor_report_records_metadata() {
7237 let (app, manager) = create_test_app_with_sensor_report();
7238
7239 let payload = serde_json::json!({
7240 "sensorId": "sensor-1",
7241 "timestamp": "2026-02-03T00:00:00Z",
7242 "actor": {
7243 "ip": "10.0.0.1",
7244 "fingerprint": "fp-123",
7245 "sessionId": "session-1"
7246 },
7247 "signal": {
7248 "type": "honeypot_hit",
7249 "severity": "high",
7250 "details": { "key": "value" }
7251 }
7252 });
7253
7254 let response = app
7255 .oneshot(
7256 Request::builder()
7257 .method("POST")
7258 .uri("/_sensor/report")
7259 .header("Content-Type", "application/json")
7260 .header("X-Admin-Key", "test-key")
7261 .body(Body::from(payload.to_string()))
7262 .unwrap(),
7263 )
7264 .await
7265 .unwrap();
7266
7267 assert_eq!(response.status(), StatusCode::OK);
7268
7269 let signals = manager.list_signals(SignalQueryOptions::default());
7270 assert!(!signals.is_empty());
7271 let metadata = &signals[0].metadata;
7272 assert_eq!(metadata["actor"]["fingerprint"], "fp-123");
7273 assert_eq!(metadata["actor"]["sessionId"], "session-1");
7274 assert_eq!(metadata["signal"]["severity"], "high");
7275 }
7276
7277 #[tokio::test]
7278 async fn test_sensor_report_ignores_horizon_client_error() {
7279 let (app, manager) = create_test_app_with_sensor_report();
7280
7281 let payload = serde_json::json!({
7282 "sensorId": "sensor-1",
7283 "timestamp": "2026-02-03T00:00:00Z",
7284 "actor": { "ip": "10.0.0.1" },
7285 "signal": { "type": "honeypot_hit", "severity": "high", "details": {} }
7286 });
7287
7288 let response = app
7289 .oneshot(
7290 Request::builder()
7291 .method("POST")
7292 .uri("/_sensor/report")
7293 .header("Content-Type", "application/json")
7294 .header("X-Admin-Key", "test-key")
7295 .body(Body::from(payload.to_string()))
7296 .unwrap(),
7297 )
7298 .await
7299 .unwrap();
7300
7301 assert_eq!(response.status(), StatusCode::OK);
7302 let signals = manager.list_signals(SignalQueryOptions::default());
7303 assert!(!signals.is_empty());
7304 }
7305
7306 #[tokio::test]
7307 async fn test_sensor_report_metadata_serialization_error() {
7308 let (app, manager) = create_test_app_with_sensor_report();
7309
7310 FORCE_METADATA_SERIALIZE_ERROR.with(|flag| flag.set(true));
7311
7312 let payload = serde_json::json!({
7313 "sensorId": "sensor-1",
7314 "timestamp": "2026-02-03T00:00:00Z",
7315 "actor": { "ip": "10.0.0.1" },
7316 "signal": { "type": "honeypot_hit", "severity": "high", "details": {} }
7317 });
7318
7319 let response = app
7320 .oneshot(
7321 Request::builder()
7322 .method("POST")
7323 .uri("/_sensor/report")
7324 .header("Content-Type", "application/json")
7325 .header("X-Admin-Key", "test-key")
7326 .body(Body::from(payload.to_string()))
7327 .unwrap(),
7328 )
7329 .await
7330 .unwrap();
7331
7332 FORCE_METADATA_SERIALIZE_ERROR.with(|flag| flag.set(false));
7333
7334 assert_eq!(response.status(), StatusCode::OK);
7335 let signals = manager.list_signals(SignalQueryOptions::default());
7336 assert!(!signals.is_empty());
7337 assert!(signals[0]
7338 .metadata
7339 .as_object()
7340 .map(|m| m.is_empty())
7341 .unwrap_or(false));
7342 }
7343
7344 #[test]
7345 fn test_apparatus_report_validation() {
7346 let valid_report = ApparatusReport {
7347 sensor_id: "valid-sensor".to_string(),
7348 timestamp: "2026-02-03T00:00:00Z".to_string(),
7349 version: Some("1.0.0".to_string()),
7350 actor: ApparatusActor {
7351 ip: "1.2.3.4".to_string(),
7352 fingerprint: Some("valid-fp".to_string()),
7353 session_id: Some("valid-session".to_string()),
7354 },
7355 signal: ApparatusSignal {
7356 signal_type: "honeypot_hit".to_string(),
7357 severity: "high".to_string(),
7358 details: serde_json::json!({"key": "value"}),
7359 },
7360 request: Some(ApparatusRequest {
7361 method: "GET".to_string(),
7362 path: "/api/test".to_string(),
7363 headers: Some(HashMap::new()),
7364 }),
7365 };
7366
7367 assert!(valid_report.validate().is_ok());
7368
7369 let mut report = valid_report.clone();
7371 report.sensor_id = "invalid sensor!".to_string();
7372 assert!(report.validate().is_err());
7373
7374 let mut report = valid_report.clone();
7376 report.actor.ip = "not-an-ip".to_string();
7377 assert!(report.validate().is_err());
7378
7379 let mut report = valid_report.clone();
7381 report.actor.fingerprint = Some("a".repeat(300));
7382 assert!(report.validate().is_err());
7383
7384 let mut report = valid_report.clone();
7386 report.signal.signal_type = "a".repeat(100);
7387 assert!(report.validate().is_err());
7388
7389 let mut report = valid_report.clone();
7391 report.signal.severity = "extreme".to_string();
7392 assert!(report.validate().is_err());
7393
7394 let mut report = valid_report.clone();
7396 report.signal.details = serde_json::json!({"data": "a".repeat(20000)});
7397 assert!(report.validate().is_err());
7398
7399 let mut report = valid_report.clone();
7401 report.request.as_mut().unwrap().path = "a".repeat(3000);
7402 assert!(report.validate().is_err());
7403 }
7404
7405 #[test]
7410 fn test_base64_decode_valid_input() {
7411 let result = base64_decode("SGVsbG8=");
7413 assert!(result.is_ok());
7414 assert_eq!(result.unwrap(), b"Hello");
7415 }
7416
7417 #[test]
7418 fn test_base64_decode_empty_input() {
7419 let result = base64_decode("");
7420 assert!(result.is_ok());
7421 assert_eq!(result.unwrap(), Vec::<u8>::new());
7422 }
7423
7424 #[test]
7425 fn test_base64_decode_whitespace_only() {
7426 let result = base64_decode(" ");
7427 assert!(result.is_ok());
7428 assert_eq!(result.unwrap(), Vec::<u8>::new());
7429 }
7430
7431 #[test]
7432 fn test_base64_decode_with_padding() {
7433 let result = base64_decode("SGk=");
7435 assert!(result.is_ok());
7436 assert_eq!(result.unwrap(), b"Hi");
7437 }
7438
7439 #[test]
7440 fn test_base64_decode_no_padding() {
7441 let result = base64_decode("TWFu");
7443 assert!(result.is_ok());
7444 assert_eq!(result.unwrap(), b"Man");
7445 }
7446
7447 #[test]
7448 fn test_base64_decode_double_padding() {
7449 let result = base64_decode("TQ==");
7451 assert!(result.is_ok());
7452 assert_eq!(result.unwrap(), b"M");
7453 }
7454
7455 #[test]
7456 fn test_base64_decode_with_plus_and_slash() {
7457 let result = base64_decode("LysvKw==");
7460 assert!(result.is_ok());
7461 assert_eq!(result.unwrap(), b"/+/+");
7462 }
7463
7464 #[test]
7465 fn test_base64_decode_invalid_characters() {
7466 let result = base64_decode("SGVsbG8h!");
7468 assert!(result.is_err());
7469 }
7470
7471 #[test]
7472 fn test_base64_decode_non_base64_string() {
7473 let result = base64_decode("Hello World");
7475 assert!(result.is_err());
7476 }
7477
7478 #[test]
7479 fn test_base64_decode_json_body() {
7480 let result = base64_decode("eyJrZXkiOiAidmFsdWUifQ==");
7483 assert!(result.is_ok());
7484 let decoded = String::from_utf8(result.unwrap()).unwrap();
7485 assert_eq!(decoded, "{\"key\": \"value\"}");
7486 }
7487
7488 #[test]
7489 fn test_base64_decode_binary_data() {
7490 let result = base64_decode("AAEC/w==");
7492 assert!(result.is_ok());
7493 assert_eq!(result.unwrap(), vec![0x00, 0x01, 0x02, 0xFF]);
7494 }
7495
7496 #[test]
7497 fn test_base64_decode_longer_string() {
7498 let encoded = "VGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIHRoZSBsYXp5IGRvZw==";
7500 let result = base64_decode(encoded);
7501 assert!(result.is_ok());
7502 let decoded = String::from_utf8(result.unwrap()).unwrap();
7503 assert_eq!(decoded, "The quick brown fox jumps over the lazy dog");
7504 }
7505
7506 #[test]
7511 fn test_evaluate_request_full_deserialization() {
7512 let json = r#"{
7513 "method": "POST",
7514 "uri": "/api/users?id=1",
7515 "headers": [["Content-Type", "application/json"], ["Authorization", "Bearer token"]],
7516 "body": "eyJ1c2VybmFtZSI6ICJ0ZXN0In0=",
7517 "client_ip": "192.168.1.100"
7518 }"#;
7519
7520 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7521
7522 assert_eq!(request.method, "POST");
7523 assert_eq!(request.uri, "/api/users?id=1");
7524 assert_eq!(request.headers.len(), 2);
7525 assert_eq!(request.headers[0].0, "Content-Type");
7526 assert_eq!(request.headers[0].1, "application/json");
7527 assert_eq!(request.headers[1].0, "Authorization");
7528 assert_eq!(request.headers[1].1, "Bearer token");
7529 assert_eq!(
7530 request.body,
7531 Some("eyJ1c2VybmFtZSI6ICJ0ZXN0In0=".to_string())
7532 );
7533 assert_eq!(request.client_ip, "192.168.1.100");
7534 }
7535
7536 #[test]
7537 fn test_evaluate_request_minimal() {
7538 let json = r#"{
7539 "method": "GET",
7540 "uri": "/api/health"
7541 }"#;
7542
7543 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7544
7545 assert_eq!(request.method, "GET");
7546 assert_eq!(request.uri, "/api/health");
7547 assert!(request.headers.is_empty()); assert!(request.body.is_none()); assert_eq!(request.client_ip, "127.0.0.1"); }
7551
7552 #[test]
7553 fn test_evaluate_request_with_empty_headers() {
7554 let json = r#"{
7555 "method": "DELETE",
7556 "uri": "/api/resource/123",
7557 "headers": []
7558 }"#;
7559
7560 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7561
7562 assert_eq!(request.method, "DELETE");
7563 assert_eq!(request.uri, "/api/resource/123");
7564 assert!(request.headers.is_empty());
7565 }
7566
7567 #[test]
7568 fn test_evaluate_request_with_null_body() {
7569 let json = r#"{
7570 "method": "PUT",
7571 "uri": "/api/update",
7572 "body": null
7573 }"#;
7574
7575 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7576
7577 assert_eq!(request.method, "PUT");
7578 assert!(request.body.is_none());
7579 }
7580
7581 #[test]
7582 fn test_evaluate_request_sql_injection_payload() {
7583 let json = r#"{
7584 "method": "GET",
7585 "uri": "/api/users?id=1' OR '1'='1",
7586 "client_ip": "10.0.0.42"
7587 }"#;
7588
7589 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7590
7591 assert_eq!(request.method, "GET");
7592 assert_eq!(request.uri, "/api/users?id=1' OR '1'='1");
7593 assert_eq!(request.client_ip, "10.0.0.42");
7594 }
7595
7596 #[test]
7597 fn test_evaluate_request_xss_payload() {
7598 let json = r#"{
7599 "method": "POST",
7600 "uri": "/api/comment",
7601 "body": "PHNjcmlwdD5hbGVydCgnWFNTJyk8L3NjcmlwdD4=",
7602 "headers": [["Content-Type", "text/html"]]
7603 }"#;
7604
7605 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7606
7607 assert_eq!(request.method, "POST");
7608 assert_eq!(request.uri, "/api/comment");
7609 let body = request.body.unwrap();
7611 let decoded = base64_decode(&body).unwrap();
7612 let decoded_str = String::from_utf8(decoded).unwrap();
7613 assert_eq!(decoded_str, "<script>alert('XSS')</script>");
7614 }
7615
7616 #[test]
7617 fn test_evaluate_request_path_traversal() {
7618 let json = r#"{
7619 "method": "GET",
7620 "uri": "/api/files/../../../etc/passwd"
7621 }"#;
7622
7623 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7624
7625 assert_eq!(request.uri, "/api/files/../../../etc/passwd");
7626 }
7627
7628 #[test]
7629 fn test_evaluate_request_many_headers() {
7630 let json = r#"{
7631 "method": "GET",
7632 "uri": "/api/test",
7633 "headers": [
7634 ["Accept", "application/json"],
7635 ["Accept-Encoding", "gzip, deflate"],
7636 ["Accept-Language", "en-US,en;q=0.9"],
7637 ["Cache-Control", "no-cache"],
7638 ["Connection", "keep-alive"],
7639 ["Host", "example.com"],
7640 ["User-Agent", "Mozilla/5.0"],
7641 ["X-Custom-Header", "custom-value"]
7642 ]
7643 }"#;
7644
7645 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7646
7647 assert_eq!(request.headers.len(), 8);
7648 assert_eq!(request.headers[0].0, "Accept");
7649 assert_eq!(request.headers[7].0, "X-Custom-Header");
7650 }
7651
7652 #[test]
7653 fn test_evaluate_request_ipv6_client() {
7654 let json = r#"{
7655 "method": "GET",
7656 "uri": "/api/test",
7657 "client_ip": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
7658 }"#;
7659
7660 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7661
7662 assert_eq!(request.client_ip, "2001:0db8:85a3:0000:0000:8a2e:0370:7334");
7663 }
7664
7665 #[test]
7666 fn test_evaluate_request_unicode_uri() {
7667 let json = r#"{
7668 "method": "GET",
7669 "uri": "/api/search?q=%E4%B8%AD%E6%96%87"
7670 }"#;
7671
7672 let request: EvaluateRequest = serde_json::from_str(json).unwrap();
7673
7674 assert_eq!(request.uri, "/api/search?q=%E4%B8%AD%E6%96%87");
7675 }
7676
7677 #[test]
7678 fn test_default_client_ip() {
7679 assert_eq!(default_client_ip(), "127.0.0.1");
7680 }
7681
7682 #[test]
7687 fn test_evaluation_result_serialization() {
7688 let result = EvaluationResult {
7689 blocked: true,
7690 risk_score: 85,
7691 matched_rules: vec![942100, 942190],
7692 block_reason: Some("SQL Injection detected".to_string()),
7693 detection_time_us: 1234,
7694 };
7695
7696 let json = serde_json::to_string(&result).unwrap();
7697
7698 assert!(json.contains("\"blocked\":true"));
7699 assert!(json.contains("\"risk_score\":85"));
7700 assert!(json.contains("942100"));
7701 assert!(json.contains("942190"));
7702 assert!(json.contains("SQL Injection detected"));
7703 assert!(json.contains("\"detection_time_us\":1234"));
7704 }
7705
7706 #[test]
7707 fn test_evaluation_result_no_block_reason() {
7708 let result = EvaluationResult {
7709 blocked: false,
7710 risk_score: 20,
7711 matched_rules: vec![],
7712 block_reason: None,
7713 detection_time_us: 500,
7714 };
7715
7716 let json = serde_json::to_string(&result).unwrap();
7717
7718 assert!(json.contains("\"blocked\":false"));
7719 assert!(json.contains("\"risk_score\":20"));
7720 assert!(json.contains("\"matched_rules\":[]"));
7721 assert!(json.contains("\"block_reason\":null"));
7722 }
7723
7724 #[tokio::test]
7725 async fn test_security_headers_on_api_response() {
7726 let app = create_test_app();
7727
7728 let response = app
7729 .oneshot(
7730 Request::builder()
7731 .uri("/health")
7732 .header("X-Admin-Key", "test-key")
7733 .body(Body::empty())
7734 .unwrap(),
7735 )
7736 .await
7737 .unwrap();
7738
7739 assert_eq!(response.status(), StatusCode::OK);
7740
7741 let headers = response.headers();
7743
7744 assert_eq!(
7746 headers
7747 .get(header::X_CONTENT_TYPE_OPTIONS)
7748 .map(|v| v.to_str().unwrap()),
7749 Some("nosniff"),
7750 "X-Content-Type-Options header missing or incorrect"
7751 );
7752
7753 assert_eq!(
7755 headers
7756 .get(header::X_FRAME_OPTIONS)
7757 .map(|v| v.to_str().unwrap()),
7758 Some("DENY"),
7759 "X-Frame-Options header missing or incorrect"
7760 );
7761
7762 assert_eq!(
7764 headers
7765 .get(header::REFERRER_POLICY)
7766 .map(|v| v.to_str().unwrap()),
7767 Some("strict-origin-when-cross-origin"),
7768 "Referrer-Policy header missing or incorrect"
7769 );
7770
7771 assert!(
7773 headers
7774 .get(header::CACHE_CONTROL)
7775 .map(|v| v.to_str().unwrap())
7776 .unwrap_or("")
7777 .contains("no-store"),
7778 "Cache-Control header should contain no-store"
7779 );
7780
7781 assert!(
7783 headers.get(header::CONTENT_SECURITY_POLICY).is_some(),
7784 "Content-Security-Policy header missing"
7785 );
7786
7787 assert!(
7789 headers.get("permissions-policy").is_some(),
7790 "Permissions-Policy header missing"
7791 );
7792 }
7793
7794 #[test]
7795 fn test_sanitized_error_does_not_leak_internal_details() {
7796 let internal_err = std::io::Error::new(
7798 std::io::ErrorKind::NotFound,
7799 "file '/etc/secret/key.pem' not found at line 42",
7800 );
7801
7802 let response = sanitized_error(
7803 StatusCode::INTERNAL_SERVER_ERROR,
7804 error_codes::INTERNAL_ERROR,
7805 "Configuration could not be loaded",
7806 Some(&internal_err),
7807 );
7808
7809 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
7810
7811 }
7813
7814 #[test]
7815 fn test_validation_error_format() {
7816 let response = validation_error(
7817 "Invalid input provided",
7818 Some(&"detailed parse error: unexpected token at position 42"),
7819 );
7820
7821 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
7822 }
7823
7824 fn create_test_app_with_console(scopes: Vec<String>, admin_auth_disabled: bool) -> Router {
7830 use governor::{Quota, RateLimiter};
7831 use std::num::NonZeroU32;
7832
7833 let handler = Arc::new(ApiHandler::builder().build());
7834 let quota = Quota::per_minute(NonZeroU32::new(1000).unwrap());
7835 let admin_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
7836 let public_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
7837 let report_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
7838 let auth_failure_limiter = Arc::new(RateLimiter::keyed(quota));
7839 let state = AdminState {
7840 handler,
7841 admin_api_key: "test-key".to_string(),
7842 admin_auth_disabled,
7843 admin_scopes: scopes,
7844 signal_permissions: Arc::new(SignalPermissions::default()),
7845 admin_rate_limiter,
7846 public_rate_limiter,
7847 report_rate_limiter,
7848 auth_failure_limiter,
7849 };
7850
7851 let admin_read_routes = Router::new()
7853 .route("/console", get(admin_console_handler))
7854 .route_layer(middleware::from_fn_with_state(
7855 state.clone(),
7856 require_admin_read,
7857 ))
7858 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
7859 .route_layer(middleware::from_fn_with_state(
7860 state.clone(),
7861 rate_limit_public,
7862 ));
7863
7864 let public_routes = Router::new().route("/health", get(health_handler));
7866
7867 Router::new()
7868 .merge(admin_read_routes)
7869 .merge(public_routes)
7870 .layer(middleware::from_fn(security_headers))
7871 .with_state(state)
7872 }
7873
7874 #[tokio::test]
7875 async fn test_console_requires_auth() {
7876 let app = create_test_app_with_console(vec![scopes::ADMIN_READ.to_string()], false);
7878
7879 let response = app
7881 .oneshot(
7882 Request::builder()
7883 .uri("/console")
7884 .body(Body::empty())
7885 .unwrap(),
7886 )
7887 .await
7888 .unwrap();
7889
7890 assert_eq!(
7891 response.status(),
7892 StatusCode::UNAUTHORIZED,
7893 "/console should return 401 without authentication"
7894 );
7895 }
7896
7897 #[tokio::test]
7898 async fn test_console_with_admin_read_scope() {
7899 let app = create_test_app_with_console(vec![scopes::ADMIN_READ.to_string()], false);
7901
7902 let response = app
7904 .oneshot(
7905 Request::builder()
7906 .uri("/console")
7907 .header("X-Admin-Key", "test-key")
7908 .body(Body::empty())
7909 .unwrap(),
7910 )
7911 .await
7912 .unwrap();
7913
7914 assert_eq!(
7915 response.status(),
7916 StatusCode::OK,
7917 "/console should return 200 with valid key and admin:read scope"
7918 );
7919 }
7920
7921 #[tokio::test]
7922 async fn test_console_without_admin_read_scope() {
7923 let app = create_test_app_with_console(vec![scopes::SENSOR_READ.to_string()], false);
7925
7926 let response = app
7928 .oneshot(
7929 Request::builder()
7930 .uri("/console")
7931 .header("X-Admin-Key", "test-key")
7932 .body(Body::empty())
7933 .unwrap(),
7934 )
7935 .await
7936 .unwrap();
7937
7938 assert_eq!(
7939 response.status(),
7940 StatusCode::FORBIDDEN,
7941 "/console should return 403 when admin:read scope is missing"
7942 );
7943 }
7944
7945 #[tokio::test]
7946 async fn test_health_remains_public() {
7947 let app = create_test_app_with_console(vec![], false);
7949
7950 let response = app
7952 .oneshot(
7953 Request::builder()
7954 .uri("/health")
7955 .body(Body::empty())
7956 .unwrap(),
7957 )
7958 .await
7959 .unwrap();
7960
7961 assert_eq!(
7962 response.status(),
7963 StatusCode::OK,
7964 "/health should remain public and return 200 without authentication"
7965 );
7966 }
7967
7968 #[tokio::test]
7969 async fn test_console_auth_disabled_allows_access() {
7970 let app = create_test_app_with_console(vec![], true);
7971
7972 let response = app
7973 .oneshot(
7974 Request::builder()
7975 .uri("/console")
7976 .body(Body::empty())
7977 .unwrap(),
7978 )
7979 .await
7980 .unwrap();
7981
7982 assert_eq!(
7983 response.status(),
7984 StatusCode::OK,
7985 "/console should return 200 when admin auth is disabled"
7986 );
7987 }
7988
7989 fn create_test_app_with_rules(scopes: Vec<String>) -> Router {
7995 use governor::{Quota, RateLimiter};
7996 use std::num::NonZeroU32;
7997
7998 let handler = Arc::new(ApiHandler::builder().build());
7999 let quota = Quota::per_minute(NonZeroU32::new(1000).unwrap());
8000 let admin_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
8001 let public_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
8002 let report_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
8003 let auth_failure_limiter = Arc::new(RateLimiter::keyed(quota));
8004 let state = AdminState {
8005 handler,
8006 admin_api_key: "test-key".to_string(),
8007 admin_auth_disabled: false,
8008 admin_scopes: scopes,
8009 signal_permissions: Arc::new(SignalPermissions::default()),
8010 admin_rate_limiter,
8011 public_rate_limiter,
8012 report_rate_limiter,
8013 auth_failure_limiter,
8014 };
8015
8016 let rules_read_routes = Router::new()
8017 .route("/_sensor/rules", get(sensor_rules_handler))
8018 .route_layer(middleware::from_fn_with_state(
8019 state.clone(),
8020 require_admin_read,
8021 ))
8022 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
8023 .route_layer(middleware::from_fn_with_state(
8024 state.clone(),
8025 rate_limit_admin,
8026 ));
8027
8028 let rules_write_routes = Router::new()
8029 .route("/_sensor/rules", post(sensor_rules_create_handler))
8030 .route(
8031 "/_sensor/rules/:rule_id",
8032 put(sensor_rules_update_handler).delete(sensor_rules_delete_handler),
8033 )
8034 .route_layer(middleware::from_fn_with_state(
8035 state.clone(),
8036 require_admin_write,
8037 ))
8038 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
8039 .route_layer(middleware::from_fn_with_state(
8040 state.clone(),
8041 rate_limit_admin,
8042 ));
8043
8044 Router::new()
8045 .merge(rules_read_routes)
8046 .merge(rules_write_routes)
8047 .with_state(state)
8048 }
8049
8050 fn create_test_app_with_kernel_config(scopes: Vec<String>) -> Router {
8052 use governor::{Quota, RateLimiter};
8053 use std::num::NonZeroU32;
8054
8055 let handler = Arc::new(ApiHandler::builder().build());
8056 let quota = Quota::per_minute(NonZeroU32::new(1000).unwrap());
8057 let admin_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
8058 let public_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
8059 let report_rate_limiter = Arc::new(RateLimiter::keyed(quota.clone()));
8060 let auth_failure_limiter = Arc::new(RateLimiter::keyed(quota));
8061 let state = AdminState {
8062 handler,
8063 admin_api_key: "test-key".to_string(),
8064 admin_auth_disabled: false,
8065 admin_scopes: scopes,
8066 signal_permissions: Arc::new(SignalPermissions::default()),
8067 admin_rate_limiter,
8068 public_rate_limiter,
8069 report_rate_limiter,
8070 auth_failure_limiter,
8071 };
8072
8073 let kernel_read_routes = Router::new()
8074 .route("/_sensor/config/kernel", get(config_kernel_get_handler))
8075 .route_layer(middleware::from_fn_with_state(
8076 state.clone(),
8077 require_admin_read,
8078 ))
8079 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
8080 .route_layer(middleware::from_fn_with_state(
8081 state.clone(),
8082 rate_limit_admin,
8083 ));
8084
8085 let kernel_write_routes = Router::new()
8086 .route("/_sensor/config/kernel", put(config_kernel_put_handler))
8087 .route_layer(middleware::from_fn_with_state(
8088 state.clone(),
8089 require_admin_write,
8090 ))
8091 .route_layer(middleware::from_fn_with_state(state.clone(), require_auth))
8092 .route_layer(middleware::from_fn_with_state(
8093 state.clone(),
8094 rate_limit_admin,
8095 ));
8096
8097 Router::new()
8098 .merge(kernel_read_routes)
8099 .merge(kernel_write_routes)
8100 .with_state(state)
8101 }
8102
8103 fn build_rule_payload() -> serde_json::Value {
8104 serde_json::json!({
8105 "name": "Block Admin",
8106 "type": "BLOCK",
8107 "enabled": true,
8108 "priority": 100,
8109 "conditions": [
8110 { "field": "path", "operator": "eq", "value": "/admin" }
8111 ],
8112 "actions": [
8113 { "type": "block" }
8114 ]
8115 })
8116 }
8117
8118 #[tokio::test]
8119 async fn test_rules_requires_auth() {
8120 let app = create_test_app_with_rules(vec![scopes::ADMIN_READ.to_string()]);
8121
8122 let response = app
8123 .oneshot(
8124 Request::builder()
8125 .uri("/_sensor/rules")
8126 .body(Body::empty())
8127 .unwrap(),
8128 )
8129 .await
8130 .unwrap();
8131
8132 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
8133 }
8134
8135 #[tokio::test]
8136 async fn test_rules_requires_admin_read_scope() {
8137 let app = create_test_app_with_rules(vec![]);
8138
8139 let response = app
8140 .oneshot(
8141 Request::builder()
8142 .uri("/_sensor/rules")
8143 .header("X-Admin-Key", "test-key")
8144 .body(Body::empty())
8145 .unwrap(),
8146 )
8147 .await
8148 .unwrap();
8149
8150 assert_eq!(response.status(), StatusCode::FORBIDDEN);
8151 }
8152
8153 #[tokio::test]
8154 async fn test_rules_allows_admin_read_scope() {
8155 let app = create_test_app_with_rules(vec![scopes::ADMIN_READ.to_string()]);
8156
8157 let response = app
8158 .oneshot(
8159 Request::builder()
8160 .uri("/_sensor/rules")
8161 .header("X-Admin-Key", "test-key")
8162 .body(Body::empty())
8163 .unwrap(),
8164 )
8165 .await
8166 .unwrap();
8167
8168 assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
8170 }
8171
8172 #[tokio::test]
8173 async fn test_rules_create_requires_admin_write_scope() {
8174 let app = create_test_app_with_rules(vec![scopes::ADMIN_READ.to_string()]);
8175 let payload = build_rule_payload();
8176
8177 let response = app
8178 .oneshot(
8179 Request::builder()
8180 .method("POST")
8181 .uri("/_sensor/rules")
8182 .header("Content-Type", "application/json")
8183 .header("X-Admin-Key", "test-key")
8184 .body(Body::from(payload.to_string()))
8185 .unwrap(),
8186 )
8187 .await
8188 .unwrap();
8189
8190 assert_eq!(response.status(), StatusCode::FORBIDDEN);
8191 }
8192
8193 #[tokio::test]
8194 async fn test_rules_create_allows_admin_write_scope() {
8195 let app = create_test_app_with_rules(vec![scopes::ADMIN_WRITE.to_string()]);
8196 let payload = build_rule_payload();
8197
8198 let response = app
8199 .oneshot(
8200 Request::builder()
8201 .method("POST")
8202 .uri("/_sensor/rules")
8203 .header("Content-Type", "application/json")
8204 .header("X-Admin-Key", "test-key")
8205 .body(Body::from(payload.to_string()))
8206 .unwrap(),
8207 )
8208 .await
8209 .unwrap();
8210
8211 assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
8213 }
8214
8215 #[tokio::test]
8216 async fn test_rules_delete_requires_admin_write_scope() {
8217 let app = create_test_app_with_rules(vec![scopes::ADMIN_READ.to_string()]);
8218
8219 let response = app
8220 .oneshot(
8221 Request::builder()
8222 .method("DELETE")
8223 .uri("/_sensor/rules/rule-1")
8224 .header("X-Admin-Key", "test-key")
8225 .body(Body::empty())
8226 .unwrap(),
8227 )
8228 .await
8229 .unwrap();
8230
8231 assert_eq!(response.status(), StatusCode::FORBIDDEN);
8232 }
8233
8234 #[tokio::test]
8235 async fn test_rules_delete_allows_admin_write_scope() {
8236 let app = create_test_app_with_rules(vec![scopes::ADMIN_WRITE.to_string()]);
8237
8238 let response = app
8239 .oneshot(
8240 Request::builder()
8241 .method("DELETE")
8242 .uri("/_sensor/rules/rule-1")
8243 .header("X-Admin-Key", "test-key")
8244 .body(Body::empty())
8245 .unwrap(),
8246 )
8247 .await
8248 .unwrap();
8249
8250 assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
8252 }
8253
8254 #[tokio::test]
8259 async fn test_kernel_config_requires_auth() {
8260 let app = create_test_app_with_kernel_config(vec![scopes::ADMIN_READ.to_string()]);
8261
8262 let response = app
8263 .oneshot(
8264 Request::builder()
8265 .uri("/_sensor/config/kernel")
8266 .body(Body::empty())
8267 .unwrap(),
8268 )
8269 .await
8270 .unwrap();
8271
8272 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
8273 }
8274
8275 #[tokio::test]
8276 async fn test_kernel_config_requires_admin_read_scope() {
8277 let app = create_test_app_with_kernel_config(vec![]);
8278
8279 let response = app
8280 .oneshot(
8281 Request::builder()
8282 .uri("/_sensor/config/kernel")
8283 .header("X-Admin-Key", "test-key")
8284 .body(Body::empty())
8285 .unwrap(),
8286 )
8287 .await
8288 .unwrap();
8289
8290 assert_eq!(response.status(), StatusCode::FORBIDDEN);
8291 }
8292
8293 #[tokio::test]
8294 async fn test_kernel_config_put_requires_admin_write_scope() {
8295 let app = create_test_app_with_kernel_config(vec![scopes::ADMIN_READ.to_string()]);
8296
8297 let payload = serde_json::json!({
8298 "params": { "net.core.somaxconn": 1024 },
8299 });
8300
8301 let response = app
8302 .oneshot(
8303 Request::builder()
8304 .method("PUT")
8305 .uri("/_sensor/config/kernel")
8306 .header("Content-Type", "application/json")
8307 .header("X-Admin-Key", "test-key")
8308 .body(Body::from(payload.to_string()))
8309 .unwrap(),
8310 )
8311 .await
8312 .unwrap();
8313
8314 assert_eq!(response.status(), StatusCode::FORBIDDEN);
8315 }
8316
8317 #[tokio::test]
8318 async fn test_kernel_config_put_with_admin_write_scope() {
8319 let app = create_test_app_with_kernel_config(vec![scopes::ADMIN_WRITE.to_string()]);
8320
8321 let payload = serde_json::json!({
8322 "params": { "net.core.somaxconn": 1024 },
8323 });
8324
8325 let response = app
8326 .oneshot(
8327 Request::builder()
8328 .method("PUT")
8329 .uri("/_sensor/config/kernel")
8330 .header("Content-Type", "application/json")
8331 .header("X-Admin-Key", "test-key")
8332 .body(Body::from(payload.to_string()))
8333 .unwrap(),
8334 )
8335 .await
8336 .unwrap();
8337
8338 assert_ne!(response.status(), StatusCode::UNAUTHORIZED);
8339 assert_ne!(response.status(), StatusCode::FORBIDDEN);
8340 }
8341}