1use std::sync::Arc;
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::collections::{BTreeSet, HashSet, VecDeque};
26use std::sync::Mutex;
27use std::time::{SystemTime, UNIX_EPOCH};
28use tokio::sync::{broadcast, mpsc, oneshot, watch};
29use relay_core_api::flow::{Flow, WebSocketMessage, FlowUpdate, BodyData, Direction, Layer};
30#[cfg(feature = "script")]
31use relay_core_script::ScriptInterceptor;
32use relay_core_lib::interceptor::{Interceptor, CompositeInterceptor};
33use relay_core_lib::tls::CertificateAuthority;
34use relay_core_lib::capture::{TcpCaptureSource, TransparentTcpCaptureSource, OriginalDstProvider};
35use relay_core_lib::capture::udp::UdpProxy;
36#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
37use relay_core_lib::capture::LinuxOriginalDstProvider;
38#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
39use relay_core_lib::capture::MacOsOriginalDstProvider;
40#[cfg(target_os = "windows")]
41use relay_core_lib::capture::WindowsOriginalDstProvider;
42use tokio::net::TcpListener;
43use std::net::SocketAddr;
44use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
45
46use tracing::error;
47
48use relay_core_lib::rule::Rule;
49use relay_core_lib::rule::engine::RuleEngine;
50use crate::rule::{
51 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
52 build_mock_response_rule,
53};
54use relay_core_storage::store::{AuditEventRecord, Store};
55use relay_core_api::modification::{FlowQuery, FlowSummary};
56use serde_json::json;
57use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
58
59pub mod audit;
60pub mod rule;
61pub mod actors;
62pub mod interceptors;
63pub mod modification;
64pub mod services;
65
66pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
/// Re-exports the rule-engine types from `relay_core_lib` under a stable
/// path within this crate, so downstream code does not depend on the
/// library crate's module layout directly.
pub mod rule_engine {
    pub use relay_core_lib::rule_engine::*;
}
79
/// Point-in-time counters and gauges describing the core runtime.
///
/// Collected on demand by `CoreState::get_metrics` from the flow-store,
/// intercept-broker, and rule-store actors plus atomic counters.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreMetrics {
    // Total flows ever observed vs. currently retained in memory.
    pub flows_total: usize,
    pub flows_in_memory: usize,
    // Flow/WS/body updates rejected because the flow-store channel was full.
    pub flows_dropped: usize,
    // Backlog of paused intercepts and queued WebSocket messages.
    pub intercepts_pending: usize,
    pub ws_pending_messages: usize,
    // Age of the oldest waiting item, if any (None when the queue is empty).
    pub oldest_intercept_age_ms: Option<u64>,
    pub oldest_ws_message_age_ms: Option<u64>,
    pub rule_exec_errors: usize,
    // Audit pipeline counters: emitted, failed-outcome, and broadcast-lag totals.
    pub audit_events_total: usize,
    pub audit_events_failed: usize,
    pub flow_events_lagged_total: usize,
    pub audit_events_lagged_total: usize,
}
95
96impl CoreMetrics {
97 pub fn to_prometheus_text(&self) -> String {
98 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
99 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
100 format!(
101 "relay_core_flows_total {}\n\
102relay_core_flows_in_memory {}\n\
103relay_core_flows_dropped_total {}\n\
104relay_core_intercepts_pending {}\n\
105relay_core_ws_pending_messages {}\n\
106relay_core_oldest_intercept_age_ms {}\n\
107relay_core_oldest_ws_message_age_ms {}\n\
108relay_core_rule_exec_errors_total {}\n\
109relay_core_audit_events_total {}\n\
110relay_core_audit_events_failed_total {}\n\
111relay_core_flow_events_lagged_total {}\n\
112relay_core_audit_events_lagged_total {}\n",
113 self.flows_total,
114 self.flows_in_memory,
115 self.flows_dropped,
116 self.intercepts_pending,
117 self.ws_pending_messages,
118 oldest_intercept_age_ms,
119 oldest_ws_message_age_ms,
120 self.rule_exec_errors,
121 self.audit_events_total,
122 self.audit_events_failed,
123 self.flow_events_lagged_total,
124 self.audit_events_lagged_total,
125 )
126 }
127}
128
/// Externally-visible runtime status, derived from [`RuntimeLifecycle`]
/// via its `From` impl below.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusSnapshot {
    pub phase: RuntimeLifecyclePhase,
    // True while the phase is Starting/Running/Stopping.
    pub running: bool,
    pub port: Option<u16>,
    // Whole seconds since the proxy reached Running, if it did.
    pub uptime: Option<u64>,
    pub last_error: Option<String>,
}
137
/// Combined status + metrics payload returned by `CoreState::status_report`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusReport {
    pub status: CoreStatusSnapshot,
    pub metrics: CoreMetrics,
}
143
/// Sizes of the intercept backlogs (paused requests/responses and queued
/// WebSocket messages) at a point in time.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreInterceptSnapshot {
    pub pending_count: usize,
    pub ws_pending_count: usize,
}
149
/// A batch of audit events returned by the snapshot/query APIs.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CoreAuditSnapshot {
    pub events: Vec<AuditEvent>,
}
154
/// Filter parameters for querying audit events.
///
/// All filters are optional; `limit` is clamped to 1..=500 by
/// `CoreState::query_audit_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreAuditQuery {
    // Inclusive timestamp bounds in unix milliseconds.
    pub since_ms: Option<u64>,
    pub until_ms: Option<u64>,
    pub actor: Option<AuditActor>,
    pub kind: Option<AuditEventKind>,
    pub outcome: Option<AuditOutcome>,
    pub limit: usize,
}
164
165impl Default for CoreAuditQuery {
166 fn default() -> Self {
167 Self {
168 since_ms: None,
169 until_ms: None,
170 actor: None,
171 kind: None,
172 outcome: None,
173 limit: 50,
174 }
175 }
176}
177
/// Coarse state machine for the proxy runtime.
///
/// Created -> Starting -> Running -> Stopping -> Stopped, with Failed as a
/// terminal error state reachable from startup/run failures.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeLifecyclePhase {
    Created,
    Starting,
    Running,
    Stopping,
    Stopped,
    Failed,
}
188
189impl RuntimeLifecyclePhase {
190 pub fn as_str(&self) -> &'static str {
191 match self {
192 Self::Created => "created",
193 Self::Starting => "starting",
194 Self::Running => "running",
195 Self::Stopping => "stopping",
196 Self::Stopped => "stopped",
197 Self::Failed => "failed",
198 }
199 }
200
201 pub fn is_active(&self) -> bool {
202 matches!(self, Self::Starting | Self::Running | Self::Stopping)
203 }
204}
205
/// Full lifecycle record published on the lifecycle `watch` channel.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RuntimeLifecycle {
    pub phase: RuntimeLifecyclePhase,
    // Listening port once known (set at Starting/Running/Failed).
    pub port: Option<u16>,
    // Unix-ms timestamp recorded when the proxy transitioned to Running.
    pub started_at_ms: Option<u64>,
    pub last_error: Option<String>,
}
213
214impl RuntimeLifecycle {
215 pub fn created() -> Self {
216 Self {
217 phase: RuntimeLifecyclePhase::Created,
218 port: None,
219 started_at_ms: None,
220 last_error: None,
221 }
222 }
223
224 pub fn is_active(&self) -> bool {
225 self.phase.is_active()
226 }
227
228 pub fn uptime_seconds(&self) -> Option<u64> {
229 let started_at_ms = self.started_at_ms?;
230 let now_ms = now_unix_ms();
231 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
232 }
233}
234
/// Outcome of `CoreState::spawn_proxy`: either a handle to the newly
/// spawned proxy task, or a signal that a proxy was already active.
pub enum ProxySpawnResult {
    Started(tokio::task::JoinHandle<()>),
    AlreadyRunning,
}
239
/// Outcome of `CoreState::stop_proxy`: the shutdown signal was sent, or
/// there was no running proxy to stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    Stopping,
    NotRunning,
}
245
246impl From<RuntimeLifecycle> for CoreStatusSnapshot {
247 fn from(lifecycle: RuntimeLifecycle) -> Self {
248 Self {
249 running: lifecycle.is_active(),
250 uptime: lifecycle.uptime_seconds(),
251 port: lifecycle.port,
252 last_error: lifecycle.last_error,
253 phase: lifecycle.phase,
254 }
255 }
256}
257
/// Central shared state for the proxy runtime.
///
/// Owns the mailboxes of the three background actors (flow store,
/// intercept broker, rule store), the optional persistent `Store`, the
/// broadcast/watch channels observers subscribe to, and the atomic
/// counters backing [`CoreMetrics`]. Designed to be shared as `Arc<CoreState>`.
pub struct CoreState {
    // Actor mailboxes; all cross-task state lives behind these channels.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    rule_store: mpsc::Sender<RuleStoreMessage>,
    // Optional persistent backing store; None when no db_url was supplied
    // or the connection failed at startup.
    store: Option<Store>,
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    // Current proxy policy; subscribers see updates via the watch channel.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    // Count of updates dropped because the flow-store mailbox was full.
    pub flows_dropped: Arc<AtomicUsize>,
    audit_events_total: Arc<AtomicUsize>,
    audit_events_failed: Arc<AtomicUsize>,
    flow_events_lagged_total: Arc<AtomicUsize>,
    audit_events_lagged_total: Arc<AtomicUsize>,
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    // Bounded in-memory ring of recent audit events (see record_audit_event).
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    // Present iff a proxy is active; taking it signals shutdown.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
278
279impl CoreState {
    /// Build the core state and spawn the three background actors.
    ///
    /// When `db_url` is given, connects and initializes the persistent
    /// store; connection failure degrades to in-memory-only operation
    /// (logged, not fatal), while an init failure still keeps the store.
    ///
    /// # Panics
    /// With the `script` feature enabled, panics if the script
    /// interceptor fails to initialize.
    pub async fn new(db_url: Option<String>) -> Self {
        // Keep in sync with the same constant in record_audit_event.
        const AUDIT_HISTORY_LIMIT: usize = 200;

        let store = if let Some(url) = db_url {
            match Store::connect(&url).await {
                Ok(s) => {
                    // Init failure is logged but the store is still used;
                    // presumably the schema may already exist — TODO confirm.
                    if let Err(e) = s.init().await {
                        tracing::error!("Failed to init store: {}", e);
                    }
                    Some(s)
                },
                Err(e) => {
                    tracing::error!("Failed to connect to store: {}", e);
                    None
                }
            }
        } else {
            None
        };

        // Flow store gets the deepest mailbox since it sees every update.
        let (flow_tx, flow_rx) = mpsc::channel(10000);
        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
        tokio::spawn(flow_actor.run());

        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
        tokio::spawn(intercept_actor.run());

        let (rule_tx, rule_rx) = mpsc::channel(100);
        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
        tokio::spawn(rule_actor.run());

        // Initial receivers are dropped; subscribers attach later.
        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
        let (flow_broadcast_tx, _) = broadcast::channel(1000);
        let (audit_broadcast_tx, _) = broadcast::channel(256);
        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());

        #[cfg(feature = "script")]
        let script_interceptor = ScriptInterceptor::new().await.expect("Failed to initialize ScriptInterceptor");
        Self {
            flow_store: flow_tx,
            intercept_broker: intercept_tx,
            rule_store: rule_tx,
            store,
            #[cfg(feature = "script")]
            script_interceptor: Arc::new(script_interceptor),
            policy_tx,
            flows_dropped: Arc::new(AtomicUsize::new(0)),
            audit_events_total: Arc::new(AtomicUsize::new(0)),
            audit_events_failed: Arc::new(AtomicUsize::new(0)),
            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
            flow_broadcast_tx,
            audit_broadcast_tx,
            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
            lifecycle_tx,
            shutdown_tx: Mutex::new(None),
        }
    }
339
    /// Collect a [`CoreMetrics`] snapshot by querying each actor in turn.
    ///
    /// Every actor round-trip degrades to zero/empty defaults on channel
    /// failure, so this never errors — a dead actor just reports zeros.
    pub async fn get_metrics(&self) -> CoreMetrics {
        // Flow store: (total ever seen, currently in memory).
        let (flow_tx, flow_rx) = oneshot::channel();
        let _ = self.flow_store.send(FlowStoreMessage::GetMetrics(flow_tx)).await;
        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));

        // Intercept broker: backlog sizes plus oldest-item ages.
        let (int_tx, int_rx) = oneshot::channel();
        let _ = self.intercept_broker.send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx }).await;
        let (intercepts_pending, ws_pending_messages, oldest_intercept_age_ms, oldest_ws_message_age_ms) =
            int_rx.await.unwrap_or((0, 0, None, None));

        // Rule store: cumulative rule execution errors.
        let (rule_tx, rule_rx) = oneshot::channel();
        let _ = self.rule_store.send(RuleStoreMessage::GetMetrics(rule_tx)).await;
        let rule_exec_errors = rule_rx.await.unwrap_or(0);

        CoreMetrics {
            flows_total,
            flows_in_memory,
            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
            intercepts_pending,
            ws_pending_messages,
            oldest_intercept_age_ms,
            oldest_ws_message_age_ms,
            rule_exec_errors,
            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
        }
    }
369
    /// Metrics snapshot rendered in the Prometheus text exposition format.
    pub async fn get_metrics_prometheus_text(&self) -> String {
        self.get_metrics().await.to_prometheus_text()
    }
373
    /// Current externally-visible status, derived from the lifecycle watch value.
    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
        self.lifecycle().into()
    }
377
    /// Combined status + metrics report (one actor round-trip for metrics).
    pub async fn status_report(&self) -> CoreStatusReport {
        CoreStatusReport {
            status: self.status_snapshot(),
            metrics: self.get_metrics().await,
        }
    }
384
385 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
386 let metrics = self.get_metrics().await;
387 CoreInterceptSnapshot {
388 pending_count: metrics.intercepts_pending,
389 ws_pending_count: metrics.ws_pending_messages,
390 }
391 }
392
393 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
394 let events = self.recent_audit_events();
395 let start = events.len().saturating_sub(limit);
396 CoreAuditSnapshot {
397 events: events.into_iter().skip(start).collect(),
398 }
399 }
400
    /// Query audit events with optional filters, capped at 1..=500 results.
    ///
    /// Prefers the persistent store when available (filters pushed down to
    /// the query); otherwise filters the in-memory ring, newest first.
    /// NOTE(review): the in-memory path reverses to newest-first before
    /// taking `limit` — presumably matching the store's ordering; confirm
    /// against `Store::query_audit_events`.
    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
        let limit = query.limit.clamp(1, 500);
        if let Some(store) = &self.store {
            let rows = store
                .query_audit_events(
                    query.since_ms,
                    query.until_ms,
                    query.actor.as_ref().map(AuditActor::as_str),
                    query.kind.as_ref().map(AuditEventKind::as_str),
                    query.outcome.as_ref().map(AuditOutcome::as_str),
                    limit,
                )
                .await
                .unwrap_or_default();

            // Rows that fail to deserialize are silently skipped.
            let mut events = Vec::with_capacity(rows.len());
            for row in rows {
                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
                    events.push(event);
                }
            }
            return CoreAuditSnapshot { events };
        }

        // In-memory fallback: ring is oldest-first, so reverse to newest-first.
        let mut events = self.recent_audit_events();
        events.reverse();
        let filtered = events
            .into_iter()
            // Each filter is a no-op when the corresponding field is None.
            .filter(|event| {
                query
                    .since_ms
                    .map(|v| event.timestamp_ms >= v)
                    .unwrap_or(true)
            })
            .filter(|event| {
                query
                    .until_ms
                    .map(|v| event.timestamp_ms <= v)
                    .unwrap_or(true)
            })
            .filter(|event| {
                query
                    .actor
                    .as_ref()
                    .map(|v| &event.actor == v)
                    .unwrap_or(true)
            })
            .filter(|event| query.kind.as_ref().map(|v| &event.kind == v).unwrap_or(true))
            .filter(|event| {
                query
                    .outcome
                    .as_ref()
                    .map(|v| &event.outcome == v)
                    .unwrap_or(true)
            })
            .take(limit)
            .collect();
        CoreAuditSnapshot { events: filtered }
    }
460
    /// Look up a flow by id, applying the current redaction policy.
    ///
    /// Resolution order: flow-store actor first, then the persistent store
    /// as a fallback (both when the actor is unreachable and when it simply
    /// does not hold the flow anymore). Returns `None` if neither has it.
    /// NOTE(review): the unreachable-actor fallback returns the flow
    /// without redaction, unlike every other path — confirm intended.
    pub async fn get_flow(&self, id: String) -> Option<Flow> {
        let (tx, rx) = oneshot::channel();
        if let Err(e) = self.flow_store.send(FlowStoreMessage::GetFlow { id: id.clone(), respond_to: tx }).await {
            error!("Failed to send GetFlow request: {}", e);
            if let Some(store) = &self.store {
                return store
                    .load_flow(&id)
                    .await
                    .ok()
                    .flatten()
                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
            }
            return None;
        }
        let flow = rx.await.map_err(|e| {
            error!("Failed to receive Flow response: {}", e);
            e
        }).unwrap_or(None);
        if let Some(flow) = flow {
            return Some(redact_flow(flow, &self.current_redaction_policy()));
        }
        // Actor responded but had no flow in memory: try the durable store.
        if let Some(store) = &self.store {
            return store
                .load_flow(&id)
                .await
                .ok()
                .flatten()
                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
        }
        None
    }
493
494 pub async fn get_rules(&self) -> Vec<Rule> {
495 let (tx, rx) = oneshot::channel();
496 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
497 error!("Failed to send GetRules request: {}", e);
498 return Vec::new();
499 }
500 rx.await.map_err(|e| {
501 error!("Failed to receive Rules response: {}", e);
502 e
503 }).unwrap_or_default()
504 }
505
    /// Replace the entire rule set on behalf of the runtime itself,
    /// discarding the result. Audited as `rules.replace` on `rule_set`.
    pub async fn set_rules(&self, rules: Vec<Rule>) {
        let _ = self
            .set_rules_from(
                AuditActor::Runtime,
                "rules.replace",
                "rule_set".to_string(),
                json!({}),
                rules,
            )
            .await;
    }
517
518 pub async fn upsert_rule_from(
519 &self,
520 actor: AuditActor,
521 operation: &str,
522 target: String,
523 details: serde_json::Value,
524 rule: Rule,
525 ) -> Result<(), String> {
526 let rule_id = rule.id.clone();
527 let mut rules = self.get_rules().await;
528 rules.retain(|existing| existing.id != rule_id);
529 rules.push(rule);
530 self.set_rules_from(actor, operation, target, details, rules).await
531 }
532
533 pub async fn delete_rule_from(
534 &self,
535 actor: AuditActor,
536 operation: &str,
537 target: String,
538 details: serde_json::Value,
539 rule_id: &str,
540 ) -> Result<bool, String> {
541 let mut rules = self.get_rules().await;
542 let before = rules.len();
543 rules.retain(|rule| rule.id != rule_id);
544 if rules.len() == before {
545 return Ok(false);
546 }
547 self.set_rules_from(actor, operation, target, details, rules)
548 .await
549 .map(|_| true)
550 }
551
    /// Build a mock-response rule from `config` and upsert it, returning
    /// the rule id on success. Audited as `rule.mock_create`.
    pub async fn create_mock_response_rule_from(
        &self,
        actor: AuditActor,
        target: String,
        details: serde_json::Value,
        config: MockResponseRuleConfig,
    ) -> Result<String, String> {
        // Capture the id before `config` is consumed by the builder.
        let rule_id = config.rule_id.clone();
        let rule = build_mock_response_rule(config);
        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
            .await
            .map(|_| rule_id)
    }
565
    /// Expand an intercept-rule config into concrete rules, append them to
    /// the current set, and persist. Returns the configured rule id.
    /// Audited as `rule.intercept_create`.
    pub async fn create_intercept_rule_from(
        &self,
        actor: AuditActor,
        target: String,
        details: serde_json::Value,
        config: InterceptRuleConfig,
    ) -> Result<String, String> {
        let rule_id = config.rule_id.clone();
        let mut rules = self.get_rules().await;
        // Note: appends without removing prior rules for this id —
        // presumably new configs use fresh ids; confirm against callers.
        rules.extend(build_intercept_rules(config));
        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
            .await
            .map(|_| rule_id)
    }
580
581 pub async fn upsert_legacy_intercept_rule_from(
582 &self,
583 actor: AuditActor,
584 target: String,
585 details: serde_json::Value,
586 rule: InterceptRule,
587 ) -> Result<(), String> {
588 let rule_id = rule.id.clone();
589 let mut rules = self.get_rules().await;
590 rules.retain(|existing| existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id)));
591 rules.extend(rule.to_rules());
592 self.set_rules_from(actor, "rule.intercept_legacy_upsert", target, details, rules)
593 .await
594 }
595
    /// Replace the rule set in the rule-store actor, emitting a
    /// `RuleChanged` audit event for both success and failure.
    ///
    /// Errors only when the actor mailbox is closed; the error string is
    /// also recorded in the failure audit event's details.
    pub async fn set_rules_from(
        &self,
        actor: AuditActor,
        operation: &str,
        target: String,
        details: serde_json::Value,
        rules: Vec<Rule>,
    ) -> Result<(), String> {
        // Captured before `rules` is moved into the actor message.
        let rule_count = rules.len();
        if let Err(e) = self.rule_store.send(RuleStoreMessage::SetRules(rules)).await {
            error!("Failed to send SetRules request: {}", e);
            self.record_audit_event(AuditEvent::new(
                actor,
                AuditEventKind::RuleChanged,
                target,
                AuditOutcome::Failed,
                json!({
                    "operation": operation,
                    "rule_count": rule_count,
                    "details": details,
                    "error": e.to_string()
                }),
            ));
            return Err(e.to_string());
        }

        self.record_audit_event(AuditEvent::new(
            actor,
            AuditEventKind::RuleChanged,
            target,
            AuditOutcome::Success,
            json!({
                "operation": operation,
                "rule_count": rule_count,
                "details": details
            }),
        ));
        Ok(())
    }
635
636 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
637 let mut new_rules = Vec::new();
638 for rule in rules {
639 new_rules.extend(rule.to_rules());
640 }
641 self.set_rules(new_rules).await;
642 }
643
644 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
645 let (tx, rx) = oneshot::channel();
646 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRuleEngine(tx)).await {
647 error!("Failed to send GetRuleEngine request: {}", e);
648 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
649 }
650 rx.await.map_err(|e| {
651 error!("Failed to receive RuleEngine response: {}", e);
652 e
653 }).unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
654 }
655
    /// Replace the proxy policy on behalf of the runtime itself.
    pub fn update_policy(&self, policy: ProxyPolicy) {
        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
    }
659
    /// Apply a partial policy patch: read-modify-write on the current
    /// policy snapshot, audited under `actor`/`target`.
    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
        let mut policy = self.policy_snapshot();
        policy.apply_patch(patch);
        self.update_policy_from(actor, target, policy);
    }
665
    /// Clone of the currently active proxy policy.
    pub fn policy_snapshot(&self) -> ProxyPolicy {
        self.policy_tx.borrow().clone()
    }
669
    /// Publish a new proxy policy on the watch channel and record a
    /// `PolicyUpdated` audit event summarizing its key fields.
    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
        // Summary captured before `policy` is moved into send_replace.
        let details = json!({
            "strict_http_semantics": policy.strict_http_semantics,
            "request_timeout_ms": policy.request_timeout_ms,
            "max_body_size": policy.max_body_size,
            "transparent_enabled": policy.transparent_enabled,
            "redaction_enabled": policy.redaction.enabled,
            "redact_bodies": policy.redaction.redact_bodies
        });

        // send_replace publishes even when there are no subscribers.
        self.policy_tx.send_replace(policy);
        self.record_audit_event(AuditEvent::new(
            actor,
            AuditEventKind::PolicyUpdated,
            target,
            AuditOutcome::Success,
            details,
        ));
    }
689
    /// Register a paused interception under `key`; the broker completes
    /// `tx` when the intercept is later resolved. Failure is logged only.
    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::RegisterIntercept { key, tx }).await {
            error!("Failed to send RegisterIntercept request: {}", e);
        }
    }
695
    /// Resolve a pending intercept with `result`.
    ///
    /// Errors if the broker mailbox is closed, the broker dropped the
    /// reply channel, or the broker itself reports a failure (inner `?`).
    pub async fn resolve_intercept(&self, key: String, result: InterceptionResult) -> Result<(), String> {
        let (tx, rx) = oneshot::channel();
        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::ResolveIntercept { key, result, respond_to: tx }).await {
            error!("Failed to send ResolveIntercept request: {}", e);
            return Err(e.to_string());
        }
        // Outer Result: reply channel; inner Result: broker's own outcome.
        rx.await.map_err(|_| "Actor dropped".to_string())?
    }
704
705 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
706 let (tx, rx) = oneshot::channel();
707 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingWebSocketMessage { key, respond_to: tx }).await {
708 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
709 return None;
710 }
711 rx.await.map_err(|e| {
712 error!("Failed to receive WebSocketMessage response: {}", e);
713 e
714 }).unwrap_or(None)
715 }
716
    /// Store a WebSocket message awaiting resolution under `key`.
    /// Failure to reach the broker is logged only.
    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message }).await {
            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
        }
    }
722
723 pub async fn is_intercept_pending(&self, key: String) -> bool {
724 let (tx, rx) = oneshot::channel();
725 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingIntercept { key, respond_to: tx }).await {
726 error!("Failed to send GetPendingIntercept request: {}", e);
727 return false;
728 }
729 rx.await.map_err(|e| {
730 error!("Failed to receive PendingIntercept response: {}", e);
731 e
732 }).unwrap_or(false)
733 }
734
    /// Whether any intercept is pending for the given flow id (keyed by
    /// flow rather than by intercept key). Failures degrade to `false`.
    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
        let (tx, rx) = oneshot::channel();
        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingInterceptByFlowId { flow_id, respond_to: tx }).await {
            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
            return false;
        }
        rx.await.map_err(|e| {
            error!("Failed to receive PendingInterceptByFlowId response: {}", e);
            e
        }).unwrap_or(false)
    }
746
    /// Push a flow upsert to the flow store without blocking.
    /// If the mailbox is full the update is dropped and counted.
    pub fn upsert_flow(&self, flow: Box<Flow>) {
        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
            error!("FlowStore dropped flow: {}", e);
            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
        }
    }
754
    /// Append a WebSocket message to a flow's history without blocking;
    /// dropped (and counted) when the flow-store mailbox is full.
    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message }) {
            error!("FlowStore dropped WS message: {}", e);
            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
        }
    }
761
    /// Update the request or response body of a flow without blocking;
    /// dropped (and counted) when the flow-store mailbox is full.
    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody { flow_id, body, direction }) {
            error!("FlowStore dropped HTTP body: {}", e);
            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
        }
    }
768
    /// New broadcast receiver for flow updates; slow readers may lag
    /// (see `record_flow_events_lagged`).
    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
        self.flow_broadcast_tx.subscribe()
    }
773
    /// New broadcast receiver for audit events.
    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
        self.audit_broadcast_tx.subscribe()
    }
777
    /// Clone of the redaction section of the currently active policy.
    fn current_redaction_policy(&self) -> RedactionPolicy {
        self.policy_tx.borrow().redaction.clone()
    }
781
    /// Count flow-broadcast messages a subscriber missed due to lag.
    pub fn record_flow_events_lagged(&self, skipped: u64) {
        self.flow_events_lagged_total
            .fetch_add(skipped as usize, Ordering::Relaxed);
    }
786
    /// Count audit-broadcast messages a subscriber missed due to lag.
    pub fn record_audit_events_lagged(&self, skipped: u64) {
        self.audit_events_lagged_total
            .fetch_add(skipped as usize, Ordering::Relaxed);
    }
791
    /// Clone of the current lifecycle record.
    pub fn lifecycle(&self) -> RuntimeLifecycle {
        self.lifecycle_tx.borrow().clone()
    }
795
    /// Watch receiver for lifecycle transitions.
    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
        self.lifecycle_tx.subscribe()
    }
799
    /// Reserve the runtime for a new proxy: reject if one is already
    /// active, stash the shutdown sender, and enter the `Starting` phase.
    ///
    /// NOTE(review): `spawn_proxy` detects the already-active case by
    /// substring-matching "already" in this error string — keep the
    /// wording in sync if it ever changes.
    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
        let current = self.lifecycle();
        if current.is_active() {
            return Err(format!(
                "Proxy is already {} on port {}",
                current.phase.as_str(),
                current.port.unwrap_or(port)
            ));
        }

        // Store the shutdown handle; release the lock before publishing
        // the lifecycle change.
        let mut guard = self.shutdown_tx.lock().map_err(|_| "shutdown state poisoned".to_string())?;
        *guard = Some(shutdown_tx);
        drop(guard);

        self.update_lifecycle(RuntimeLifecycle {
            phase: RuntimeLifecyclePhase::Starting,
            port: Some(port),
            started_at_ms: None,
            last_error: None,
        });
        Ok(())
    }
822
    /// Signal the running proxy to shut down.
    ///
    /// Takes the stored shutdown sender (returning `NotRunning` when there
    /// is none), publishes the `Stopping` phase, and fires the signal.
    /// The proxy task itself performs the final transition to `Stopped`.
    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
        let mut guard = self.shutdown_tx.lock().map_err(|_| "shutdown state poisoned".to_string())?;
        let Some(tx) = guard.take() else {
            return Ok(ProxyStopResult::NotRunning);
        };
        // Release the lock before the lifecycle update and send.
        drop(guard);

        let current = self.lifecycle();
        self.update_lifecycle(RuntimeLifecycle {
            phase: RuntimeLifecyclePhase::Stopping,
            port: current.port,
            started_at_ms: current.started_at_ms,
            last_error: current.last_error,
        });
        // Receiver may already be gone if the proxy exited on its own.
        let _ = tx.send(());
        Ok(ProxyStopResult::Stopping)
    }
840
841 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
842 self.audit_history
843 .lock()
844 .map(|events| events.iter().cloned().collect())
845 .unwrap_or_default()
846 }
847
    /// Search flow summaries, with the current redaction policy applied
    /// to every result.
    ///
    /// Uses the persistent store when available; otherwise queries the
    /// flow-store actor. All failures degrade to an empty result set.
    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
        // Snapshot once so all results see the same policy.
        let redaction = self.current_redaction_policy();
        if let Some(store) = &self.store {
            return store
                .query_flow_summaries(&query)
                .await
                .unwrap_or_default()
                .into_iter()
                .map(|summary| redact_flow_summary(summary, &redaction))
                .collect();
        }
        let (tx, rx) = oneshot::channel();
        if let Err(e) = self.flow_store.send(FlowStoreMessage::SearchFlows { query, respond_to: tx }).await {
            error!("Failed to send SearchFlows request: {}", e);
            return Vec::new();
        }
        rx.await
            .unwrap_or_default()
            .into_iter()
            .map(|summary| redact_flow_summary(summary, &redaction))
            .collect()
    }
871
    /// Apply the current redaction policy to a flow update before it is
    /// handed to an external consumer.
    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
        let redaction = self.current_redaction_policy();
        redact_flow_update(update, &redaction)
    }
876
    /// Convenience wrapper: resolve an intercept (optionally with
    /// modifications) attributed to the runtime actor.
    pub async fn resolve_intercept_with_modifications(
        &self,
        key: String,
        action: &str,
        mods: Option<relay_core_api::modification::FlowModification>,
    ) -> Result<(), String> {
        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
            .await
    }
895
    /// Resolve a pending intercept, optionally applying flow or WebSocket
    /// modifications, and record an `InterceptResolved` audit event either way.
    ///
    /// `action == "drop"` resolves with `Drop`; any other action resolves
    /// with `Continue` unless modifications are supplied, in which case
    /// they are applied to the current flow or pending WS message.
    ///
    /// Key formats (colon-separated; assumed, confirm against producers):
    /// `"<flow_id>:<phase>"` for HTTP intercepts and
    /// `"<flow_id>:ws_msg:<n>"` for WebSocket messages. Keys with more
    /// than three segments fall through to plain `Continue`.
    pub async fn resolve_intercept_with_modifications_from(
        &self,
        actor: AuditActor,
        key: String,
        action: &str,
        mods: Option<relay_core_api::modification::FlowModification>,
    ) -> Result<(), String> {
        // Captured up front for the audit record, before `mods` is consumed.
        let modified_fields = modification_field_names(mods.as_ref());
        let result = match action {
            "drop" => InterceptionResult::Drop,
            _ => match mods {
                None => InterceptionResult::Continue,
                Some(m) => {
                    let parts: Vec<&str> = key.splitn(4, ':').collect();
                    match parts.as_slice() {
                        [flow_id, phase] => {
                            // Flow no longer available: fall back to Continue.
                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
                                modification::apply_flow_modification(&flow, phase, m)
                            } else {
                                InterceptionResult::Continue
                            }
                        }
                        [_, "ws_msg", _] => {
                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
                                modification::apply_ws_modification(&msg, m)
                            } else {
                                InterceptionResult::Continue
                            }
                        }
                        _ => InterceptionResult::Continue,
                    }
                }
            },
        };
        let outcome = self.resolve_intercept(key.clone(), result).await;
        let audit_outcome = if outcome.is_ok() {
            AuditOutcome::Success
        } else {
            AuditOutcome::Failed
        };
        let error_message = outcome.as_ref().err().cloned();

        self.record_audit_event(AuditEvent::new(
            actor,
            AuditEventKind::InterceptResolved,
            key,
            audit_outcome,
            json!({
                "action": action,
                "has_modifications": !modified_fields.is_empty(),
                "modified_fields": modified_fields,
                "error": error_message
            }),
        ));
        outcome
    }
954
955 #[cfg(feature = "script")]
956 pub async fn load_script_from(
957 &self,
958 actor: AuditActor,
959 target: String,
960 script: &str,
961 ) -> Result<(), String> {
962 let result = self
963 .script_interceptor
964 .load_script(script)
965 .await
966 .map_err(|e| e.to_string());
967
968 let outcome = if result.is_ok() {
969 AuditOutcome::Success
970 } else {
971 AuditOutcome::Failed
972 };
973 let error_message = result.as_ref().err().cloned();
974
975 self.record_audit_event(AuditEvent::new(
976 actor,
977 AuditEventKind::ScriptReloaded,
978 target,
979 outcome,
980 json!({
981 "script_bytes": script.len(),
982 "error": error_message
983 }),
984 ));
985
986 result
987 }
988
    /// Fan an audit event out to every sink: counters, structured log,
    /// the bounded in-memory ring, the persistent store (fire-and-forget,
    /// only when a Tokio runtime is present), and the broadcast channel.
    fn record_audit_event(&self, event: AuditEvent) {
        // Duplicated in `new` where the ring buffer is sized —
        // keep the two values in sync.
        const AUDIT_HISTORY_LIMIT: usize = 200;
        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
        if event.outcome == AuditOutcome::Failed {
            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
        }

        let details = event.details.to_string();
        tracing::info!(
            target: "relay_core_audit",
            event_id = %event.id,
            actor = %event.actor.as_str(),
            kind = %event.kind.as_str(),
            target = %event.target,
            outcome = %event.outcome.as_str(),
            details = %details
        );

        // Ring buffer: evict the oldest entry once the limit is reached.
        // A poisoned lock silently skips the history update.
        if let Ok(mut history) = self.audit_history.lock() {
            if history.len() >= AUDIT_HISTORY_LIMIT {
                history.pop_front();
            }
            history.push_back(event.clone());
        }

        if let Some(store) = self.store.clone() {
            let event_json = serde_json::to_value(&event).unwrap_or_default();
            // Owned copies so the spawned task is 'static.
            let event_id = event.id.clone();
            let timestamp_ms = event.timestamp_ms;
            let actor = event.actor.as_str().to_string();
            let kind = event.kind.as_str().to_string();
            let target = event.target.clone();
            let outcome = event.outcome.as_str().to_string();
            // Persist asynchronously; outside a runtime the event is only
            // kept in memory.
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
                handle.spawn(async move {
                    if let Err(e) = store
                        .save_audit_event(AuditEventRecord {
                            id: &event_id,
                            timestamp_ms,
                            actor: &actor,
                            kind: &kind,
                            target: &target,
                            outcome: &outcome,
                            content: &event_json,
                        })
                        .await
                    {
                        tracing::error!("Failed to persist audit event: {}", e);
                    }
                });
            }
        }

        // No subscribers is fine; the send result is intentionally ignored.
        let _ = self.audit_broadcast_tx.send(event);
    }
1044
    /// Enter the `Running` phase, stamping the start time for uptime.
    fn transition_to_running(&self, port: u16) {
        self.update_lifecycle(RuntimeLifecycle {
            phase: RuntimeLifecyclePhase::Running,
            port: Some(port),
            started_at_ms: Some(now_unix_ms()),
            last_error: None,
        });
    }
1053
    /// Enter the `Stopped` phase, clearing the shutdown handle and all
    /// runtime fields.
    fn transition_to_stopped(&self) {
        // A poisoned lock is ignored; the lifecycle update still proceeds.
        if let Ok(mut guard) = self.shutdown_tx.lock() {
            *guard = None;
        }

        self.update_lifecycle(RuntimeLifecycle {
            phase: RuntimeLifecyclePhase::Stopped,
            port: None,
            started_at_ms: None,
            last_error: None,
        });
    }
1066
    /// Enter the `Failed` phase, recording the port and error message and
    /// clearing the shutdown handle.
    fn transition_to_failed(&self, port: u16, error: String) {
        if let Ok(mut guard) = self.shutdown_tx.lock() {
            *guard = None;
        }

        self.update_lifecycle(RuntimeLifecycle {
            phase: RuntimeLifecyclePhase::Failed,
            port: Some(port),
            started_at_ms: None,
            last_error: Some(error),
        });
    }
1079
    /// Log the transition and publish it on the lifecycle watch channel
    /// (send_replace publishes even with no subscribers).
    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
        tracing::info!(
            target: "relay_core_lifecycle",
            phase = %lifecycle.phase.as_str(),
            port = ?lifecycle.port,
            started_at_ms = ?lifecycle.started_at_ms,
            last_error = ?lifecycle.last_error
        );
        self.lifecycle_tx.send_replace(lifecycle);
    }
1090
1091 pub fn spawn_proxy(
1092 self: &Arc<Self>,
1093 config: ProxyConfig,
1094 sink: mpsc::Sender<FlowUpdate>,
1095 extra_interceptor: Option<Arc<dyn Interceptor>>,
1096 ) -> Result<ProxySpawnResult, String> {
1097 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1098 match self.prepare_start(config.port, shutdown_tx) {
1099 Ok(()) => {}
1100 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1101 Err(error) => return Err(error),
1102 }
1103 let state = self.clone();
1104 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1105 if let Err(error) = state
1106 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1107 .await
1108 {
1109 error!("Proxy failed: {}", error);
1110 }
1111 })))
1112 }
1113
    /// Start the proxy and drive it to completion on the caller's task
    /// (unlike `spawn_proxy`, which detaches). Fails fast if another
    /// proxy run is already registered via `prepare_start`.
    pub async fn start_proxy(
        self: &Arc<Self>,
        config: ProxyConfig,
        sink: mpsc::Sender<FlowUpdate>,
        extra_interceptor: Option<Arc<dyn Interceptor>>,
    ) -> Result<(), String> {
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        self.prepare_start(config.port, shutdown_tx)?;
        self.run_proxy(config, sink, extra_interceptor, shutdown_rx).await
    }
1124
1125 async fn run_proxy(
1126 self: &Arc<Self>,
1127 config: ProxyConfig,
1128 sink: mpsc::Sender<FlowUpdate>,
1129 extra_interceptor: Option<Arc<dyn Interceptor>>,
1130 shutdown_rx: oneshot::Receiver<()>,
1131 ) -> Result<(), String> {
1132 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1133 let state = self.clone();
1134
1135 if let Some(parent) = config.ca_cert_path.parent()
1136 && !parent.exists() {
1137 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1138 }
1139
1140 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1141 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1142 let ca = Arc::new(ca);
1143
1144 #[cfg(feature = "script")]
1145 let script_interceptor = self.script_interceptor.clone();
1146
1147 #[cfg(feature = "script")]
1148 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1149
1150 #[cfg(not(feature = "script"))]
1151 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1152
1153 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(self.clone())));
1154
1155 if let Some(interceptor) = extra_interceptor {
1156 interceptors.push(interceptor);
1157 }
1158
1159 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1160 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1161
1162 tokio::spawn(async move {
1163 while let Some(update) = proxy_rx.recv().await {
1164 match update.clone() {
1165 FlowUpdate::Full(flow) => {
1166 state.upsert_flow(flow);
1167 }
1168 FlowUpdate::WebSocketMessage { flow_id, message } => {
1169 state.append_ws_message(flow_id, message);
1170 }
1171 FlowUpdate::HttpBody {
1172 flow_id,
1173 direction,
1174 body,
1175 } => {
1176 state.update_http_body(flow_id, body, direction);
1177 }
1178 }
1179
1180 let _ = state.flow_broadcast_tx.send(update.clone());
1181
1182 if sink.try_send(update).is_err() {
1183 relay_core_lib::metrics::inc_flows_dropped();
1184 }
1185 }
1186 });
1187
1188 if let Some(udp_port) = config.udp_tproxy_port {
1189 let udp_proxy_tx = proxy_tx.clone();
1190 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1191 tokio::spawn(async move {
1192 match tokio::net::UdpSocket::bind(udp_addr).await {
1193 Ok(socket) => {
1194 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1195 if let Err(e) = proxy.run(udp_proxy_tx).await {
1196 error!("UDP TPROXY failed: {}", e);
1197 }
1198 }
1199 Err(e) => {
1200 error!("Failed to bind UDP TPROXY socket: {}", e);
1201 }
1202 }
1203 });
1204 }
1205
1206 let listener = match TcpListener::bind(addr).await {
1207 Ok(listener) => listener,
1208 Err(e) => {
1209 if let Ok(mut guard) = self.shutdown_tx.lock() {
1210 *guard = None;
1211 }
1212 let message = format!("Failed to bind to address {}: {}", addr, e);
1213 self.transition_to_failed(config.port, message.clone());
1214 return Err(message);
1215 }
1216 };
1217
1218 self.transition_to_running(config.port);
1219 let shutdown_rx = Some(shutdown_rx);
1220
1221 if config.transparent {
1222 let policy = ProxyPolicy {
1223 transparent_enabled: true,
1224 ..Default::default()
1225 };
1226 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1227 let policy_rx = self.policy_tx.subscribe();
1228
1229 let provider: Arc<dyn OriginalDstProvider> = {
1230 let mut addrs = BTreeSet::new();
1231 if let Ok(local) = listener.local_addr() {
1232 addrs.insert(local);
1233 }
1234
1235 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1236 {
1237 Arc::new(LinuxOriginalDstProvider::new(addrs))
1238 }
1239 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1240 {
1241 match MacOsOriginalDstProvider::new(addrs.clone()) {
1242 Ok(provider) => Arc::new(provider),
1243 Err(e) => {
1244 error!("Failed to initialize macOS PF provider: {}", e);
1245 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1246 }
1247 }
1248 }
1249 #[cfg(target_os = "windows")]
1250 {
1251 let filter = "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)".to_string();
1252 let port = config.port;
1253 tokio::spawn(async move {
1254 relay_core_lib::capture::windows::start_windivert_capture(filter, port).await;
1255 });
1256
1257 Arc::new(WindowsOriginalDstProvider::new(addrs))
1258 }
1259 #[cfg(not(any(
1260 all(target_os = "linux", feature = "transparent-linux"),
1261 all(target_os = "macos", feature = "transparent-macos"),
1262 target_os = "windows"
1263 )))]
1264 {
1265 Arc::new(NoOpOriginalDstProvider::new(addrs))
1266 }
1267 };
1268
1269 let source = TransparentTcpCaptureSource::new(listener, provider);
1270 let result = relay_core_lib::start_proxy(
1271 source,
1272 proxy_tx,
1273 interceptor,
1274 ca,
1275 policy_rx,
1276 None,
1277 shutdown_rx,
1278 )
1279 .await
1280 .map_err(|e| e.to_string());
1281 if let Err(error) = &result {
1282 self.transition_to_failed(config.port, error.clone());
1283 } else {
1284 self.transition_to_stopped();
1285 }
1286 result
1287 } else {
1288 let source = TcpCaptureSource::new(listener);
1289 let policy = ProxyPolicy::default();
1290 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1291 let policy_rx = self.policy_tx.subscribe();
1292
1293 let result = relay_core_lib::start_proxy(
1294 source,
1295 proxy_tx,
1296 interceptor,
1297 ca,
1298 policy_rx,
1299 None,
1300 shutdown_rx,
1301 )
1302 .await
1303 .map_err(|e| e.to_string());
1304 if let Err(error) = &result {
1305 self.transition_to_failed(config.port, error.clone());
1306 } else {
1307 self.transition_to_stopped();
1308 }
1309 result
1310 }
1311 }
1312}
1313
1314fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1315 match update {
1316 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1317 FlowUpdate::WebSocketMessage { flow_id, mut message } => {
1318 message.content = redact_body(message.content, redaction);
1319 FlowUpdate::WebSocketMessage { flow_id, message }
1320 }
1321 FlowUpdate::HttpBody {
1322 flow_id,
1323 direction,
1324 body,
1325 } => FlowUpdate::HttpBody {
1326 flow_id,
1327 direction,
1328 body: redact_body(body, redaction),
1329 },
1330 }
1331}
1332
1333fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1334 if !redaction.enabled {
1335 return flow;
1336 }
1337 match &mut flow.layer {
1338 Layer::Http(http) => {
1339 redact_http_request(&mut http.request, redaction);
1340 if let Some(response) = &mut http.response {
1341 redact_headers(&mut response.headers, redaction);
1342 response.body = response.body.take().map(|body| redact_body(body, redaction));
1343 }
1344 }
1345 Layer::WebSocket(ws) => {
1346 redact_http_request(&mut ws.handshake_request, redaction);
1347 redact_headers(&mut ws.handshake_response.headers, redaction);
1348 ws.handshake_response.body = ws
1349 .handshake_response
1350 .body
1351 .take()
1352 .map(|body| redact_body(body, redaction));
1353 for message in &mut ws.messages {
1354 message.content = redact_body(message.content.clone(), redaction);
1355 }
1356 }
1357 _ => {}
1358 }
1359 flow
1360}
1361
1362fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1363 if !redaction.enabled {
1364 return summary;
1365 }
1366 summary.url = redact_url_string(&summary.url, redaction);
1367 summary
1368}
1369
1370fn redact_http_request(request: &mut relay_core_api::flow::HttpRequest, redaction: &RedactionPolicy) {
1371 redact_headers(&mut request.headers, redaction);
1372 redact_query_pairs(&mut request.query, redaction);
1373 request.url = redact_url(&request.url, redaction);
1374 request.body = request.body.take().map(|body| redact_body(body, redaction));
1375}
1376
1377fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1378 if !redaction.enabled {
1379 return;
1380 }
1381 let sensitive = redaction_set(&redaction.sensitive_header_names);
1382 for (name, value) in headers.iter_mut() {
1383 if sensitive.contains(&name.to_ascii_lowercase()) {
1384 *value = "[REDACTED]".to_string();
1385 }
1386 }
1387}
1388
1389fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1390 if !redaction.enabled {
1391 return;
1392 }
1393 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1394 for (name, value) in query.iter_mut() {
1395 if sensitive.contains(&name.to_ascii_lowercase()) {
1396 *value = "[REDACTED]".to_string();
1397 }
1398 }
1399}
1400
1401fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1402 if !redaction.enabled {
1403 return url.clone();
1404 }
1405 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1406 let pairs: Vec<(String, String)> = url
1407 .query_pairs()
1408 .map(|(k, v)| {
1409 let key = k.to_string();
1410 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1411 "[REDACTED]".to_string()
1412 } else {
1413 v.to_string()
1414 };
1415 (key, value)
1416 })
1417 .collect();
1418 let mut next = url.clone();
1419 if pairs.is_empty() {
1420 return next;
1421 }
1422 next.query_pairs_mut().clear();
1423 for (k, v) in pairs {
1424 next.query_pairs_mut().append_pair(&k, &v);
1425 }
1426 next
1427}
1428
1429fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1430 match url::Url::parse(input) {
1431 Ok(url) => redact_url(&url, redaction).to_string(),
1432 Err(_) => input.to_string(),
1433 }
1434}
1435
1436fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1437 if redaction.enabled && redaction.redact_bodies {
1438 body.content = "[REDACTED]".to_string();
1439 }
1440 body
1441}
1442
/// Build a lowercase lookup set for case-insensitive name matching.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut set = HashSet::with_capacity(values.len());
    for value in values {
        set.insert(value.to_ascii_lowercase());
    }
    set
}
1449
/// Configuration for a single proxy run.
#[derive(Clone)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on (bound to 127.0.0.1 in `run_proxy`).
    pub port: u16,
    /// Path to the CA certificate PEM (loaded or created at startup).
    pub ca_cert_path: std::path::PathBuf,
    /// Path to the CA private key PEM.
    pub ca_key_path: std::path::PathBuf,
    /// When true, run with the transparent (original-destination) capture source.
    pub transparent: bool,
    /// Optional UDP TPROXY listener port (bound to 0.0.0.0 when set).
    pub udp_tproxy_port: Option<u16>,
}
1458
1459impl ProxyConfig {
1460 pub fn new(
1461 port: u16,
1462 ca_cert_path: std::path::PathBuf,
1463 ca_key_path: std::path::PathBuf,
1464 ) -> Self {
1465 Self {
1466 port,
1467 ca_cert_path,
1468 ca_key_path,
1469 transparent: false,
1470 udp_tproxy_port: None,
1471 }
1472 }
1473
1474 pub fn from_app_data_dir(
1475 app_data_dir: impl Into<std::path::PathBuf>,
1476 port: u16,
1477 ) -> Result<Self, String> {
1478 let app_data_dir = app_data_dir.into();
1479 if !app_data_dir.exists() {
1480 std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1481 }
1482
1483 Ok(Self::new(
1484 port,
1485 app_data_dir.join("ca_cert.pem"),
1486 app_data_dir.join("ca_key.pem"),
1487 ))
1488 }
1489
1490 pub fn with_transparent(mut self, transparent: bool) -> Self {
1491 self.transparent = transparent;
1492 self
1493 }
1494
1495 pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1496 self.udp_tproxy_port = udp_tproxy_port;
1497 self
1498 }
1499}
1500
/// Current wall-clock time as whole milliseconds since the Unix epoch;
/// yields 0 if the system clock reads before the epoch.
fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1507
1508fn modification_field_names(
1509 mods: Option<&relay_core_api::modification::FlowModification>,
1510) -> Vec<&'static str> {
1511 let Some(mods) = mods else {
1512 return Vec::new();
1513 };
1514
1515 let mut fields = Vec::new();
1516 if mods.method.is_some() {
1517 fields.push("method");
1518 }
1519 if mods.url.is_some() {
1520 fields.push("url");
1521 }
1522 if mods.request_headers.is_some() {
1523 fields.push("request_headers");
1524 }
1525 if mods.request_body.is_some() {
1526 fields.push("request_body");
1527 }
1528 if mods.status_code.is_some() {
1529 fields.push("status_code");
1530 }
1531 if mods.response_headers.is_some() {
1532 fields.push("response_headers");
1533 }
1534 if mods.response_body.is_some() {
1535 fields.push("response_body");
1536 }
1537 if mods.message_content.is_some() {
1538 fields.push("message_content");
1539 }
1540 fields
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545 use super::*;
1546 use chrono::Utc;
1547 use relay_core_api::flow::{
1548 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1549 ResponseTiming, TransportProtocol,
1550 };
1551 use std::sync::atomic::{AtomicU64, Ordering};
1552 use tokio::time::{Duration, sleep};
1553 use url::Url;
1554 use uuid::Uuid;
1555
1556 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1557
1558 fn sqlite_url() -> String {
1559 let nanos = SystemTime::now()
1560 .duration_since(UNIX_EPOCH)
1561 .expect("clock drift")
1562 .as_nanos();
1563 let pid = std::process::id();
1564 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1565 let db_dir = std::env::current_dir()
1566 .expect("cwd")
1567 .join("target")
1568 .join("test-dbs");
1569 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1570 let db_path = db_dir.join(format!("relay-core-runtime-test-{}-{}-{}.db", pid, nanos, seq));
1571 format!("sqlite://{}?mode=rwc", db_path.display())
1572 }
1573
    /// Build a minimal completed HTTP flow fixture: a request to
    /// `http://{host}{path}` with the given method, and a bodyless response
    /// with the given status, both stamped at `ts` (epoch millis).
    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
        let start_time = chrono::DateTime::<Utc>::from_timestamp_millis(ts)
            .expect("timestamp should be valid");
        let request_url =
            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
        Flow {
            id: Uuid::new_v4(),
            start_time,
            // end_time == start_time marks the flow as already finished.
            end_time: Some(start_time),
            network: NetworkInfo {
                client_ip: "127.0.0.1".to_string(),
                client_port: 12000,
                server_ip: "127.0.0.1".to_string(),
                server_port: 8080,
                protocol: TransportProtocol::TCP,
                tls: false,
                tls_version: None,
                sni: None,
            },
            layer: Layer::Http(HttpLayer {
                request: HttpRequest {
                    method: method.to_string(),
                    url: request_url,
                    version: "HTTP/1.1".to_string(),
                    headers: vec![],
                    cookies: vec![],
                    query: vec![],
                    body: None,
                },
                response: Some(HttpResponse {
                    status,
                    status_text: "OK".to_string(),
                    version: "HTTP/1.1".to_string(),
                    headers: vec![],
                    cookies: vec![],
                    body: None,
                    timing: ResponseTiming {
                        time_to_first_byte: None,
                        time_to_last_byte: None,
                        connect_time_ms: None,
                        ssl_time_ms: None,
                    },
                }),
                error: None,
            }),
            tags: vec![],
            meta: std::collections::HashMap::new(),
        }
    }
1623
    /// Build an HTTP flow fixture that deliberately carries sensitive data
    /// everywhere redaction can apply: an `Authorization` header, a `token`
    /// query parameter (also embedded in the URL), request/response bodies,
    /// and a `Set-Cookie` response header — each paired with a non-sensitive
    /// sibling that should survive redaction.
    fn sample_sensitive_http_flow(ts: i64) -> Flow {
        let start_time = chrono::DateTime::<Utc>::from_timestamp_millis(ts)
            .expect("timestamp should be valid");
        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
            .expect("url should parse");
        Flow {
            id: Uuid::new_v4(),
            start_time,
            end_time: Some(start_time),
            network: NetworkInfo {
                client_ip: "127.0.0.1".to_string(),
                client_port: 12000,
                server_ip: "127.0.0.1".to_string(),
                server_port: 8080,
                protocol: TransportProtocol::TCP,
                tls: false,
                tls_version: None,
                sni: None,
            },
            layer: Layer::Http(HttpLayer {
                request: HttpRequest {
                    method: "GET".to_string(),
                    url: request_url,
                    version: "HTTP/1.1".to_string(),
                    headers: vec![
                        ("Authorization".to_string(), "Bearer secret-token".to_string()),
                        ("X-Normal".to_string(), "visible".to_string()),
                    ],
                    cookies: vec![],
                    query: vec![
                        ("token".to_string(), "abc123".to_string()),
                        ("ok".to_string(), "1".to_string()),
                    ],
                    body: Some(BodyData {
                        encoding: "utf-8".to_string(),
                        content: "secret request body".to_string(),
                        size: 19,
                    }),
                },
                response: Some(HttpResponse {
                    status: 200,
                    status_text: "OK".to_string(),
                    version: "HTTP/1.1".to_string(),
                    headers: vec![
                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
                        ("X-Response".to_string(), "visible".to_string()),
                    ],
                    cookies: vec![],
                    body: Some(BodyData {
                        encoding: "utf-8".to_string(),
                        content: "secret response body".to_string(),
                        size: 20,
                    }),
                    timing: ResponseTiming {
                        time_to_first_byte: None,
                        time_to_last_byte: None,
                        connect_time_ms: None,
                        ssl_time_ms: None,
                    },
                }),
                error: None,
            }),
            tags: vec![],
            meta: std::collections::HashMap::new(),
        }
    }
1690
    /// `set_rules_from` succeeds and emits a success audit event carrying
    /// the operation name and caller-supplied detail payload.
    #[tokio::test]
    async fn set_rules_from_records_audit_event() {
        let state = CoreState::new(None).await;

        state
            .set_rules_from(
                AuditActor::Http,
                "rule.upsert",
                "rule-1".to_string(),
                json!({ "route": "/api/v1/rules" }),
                Vec::new(),
            )
            .await
            .expect("set rules should succeed");

        let events = state.recent_audit_events();
        let event = events.last().expect("audit event should exist");
        assert_eq!(event.actor, AuditActor::Http);
        assert_eq!(event.kind, AuditEventKind::RuleChanged);
        assert_eq!(event.outcome, AuditOutcome::Success);
        assert_eq!(event.target, "rule-1");
        assert_eq!(event.details["operation"], "rule.upsert");
        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
    }

    /// Upserting the same rule id twice keeps exactly one rule — the
    /// replacement — and records an audit event for the second operation.
    #[tokio::test]
    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
        let state = CoreState::new(None).await;

        state
            .upsert_rule_from(
                AuditActor::Probe,
                "rule.upsert",
                "rule-1".to_string(),
                json!({ "tool": "set_rule" }),
                Rule {
                    id: "rule-1".to_string(),
                    name: "first".to_string(),
                    active: true,
                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
                    priority: 1,
                    termination: relay_core_lib::rule::RuleTermination::Continue,
                    filter: relay_core_lib::rule::Filter::Url(
                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
                    ),
                    actions: vec![],
                    constraints: None,
                },
            )
            .await
            .expect("initial upsert should succeed");

        state
            .upsert_rule_from(
                AuditActor::Probe,
                "rule.upsert",
                "rule-1".to_string(),
                json!({ "tool": "set_rule" }),
                Rule {
                    id: "rule-1".to_string(),
                    name: "second".to_string(),
                    active: true,
                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
                    priority: 2,
                    termination: relay_core_lib::rule::RuleTermination::Continue,
                    filter: relay_core_lib::rule::Filter::Url(
                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
                    ),
                    actions: vec![],
                    constraints: None,
                },
            )
            .await
            .expect("replacement upsert should succeed");

        let rules = state.get_rules().await;
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].name, "second");
        let event = state.recent_audit_events().last().cloned().expect("audit event");
        assert_eq!(event.details["operation"], "rule.upsert");
        assert_eq!(event.details["details"]["tool"], "set_rule");
    }

    /// Deleting an unknown rule id reports `false` and — because nothing
    /// changed — emits no audit event at all.
    #[tokio::test]
    async fn delete_rule_from_returns_false_when_rule_missing() {
        let state = CoreState::new(None).await;

        let deleted = state
            .delete_rule_from(
                AuditActor::Http,
                "rule.delete",
                "missing".to_string(),
                json!({ "route": "/api/v1/rules/{id}" }),
                "missing",
            )
            .await
            .expect("delete should not fail");

        assert!(!deleted);
        assert!(state.recent_audit_events().is_empty());
    }
1792
    /// Creating a mock-response rule returns the caller's id, installs the
    /// rule, and records a `rule.mock_create` audit event.
    #[tokio::test]
    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
        let state = CoreState::new(None).await;

        let rule_id = state
            .create_mock_response_rule_from(
                AuditActor::Http,
                "api-mock-1".to_string(),
                json!({ "route": "/api/v1/mock", "status": 201 }),
                MockResponseRuleConfig {
                    rule_id: "api-mock-1".to_string(),
                    url_pattern: "example.com".to_string(),
                    name: "api-mock:example.com".to_string(),
                    status: 201,
                    content_type: "application/json".to_string(),
                    body: "{\"ok\":true}".to_string(),
                },
            )
            .await
            .expect("mock rule should be created");

        assert_eq!(rule_id, "api-mock-1");
        let rules = state.get_rules().await;
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].id, "api-mock-1");
        let event = state.recent_audit_events().last().cloned().expect("audit event");
        assert_eq!(event.details["operation"], "rule.mock_create");
        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
    }

    /// Creating an intercept rule with `Stop` termination installs a single
    /// rule and records a `rule.intercept_create` audit event.
    #[tokio::test]
    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
        let state = CoreState::new(None).await;

        let rule_id = state
            .create_intercept_rule_from(
                AuditActor::Http,
                "intercept-1".to_string(),
                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
                InterceptRuleConfig {
                    rule_id: "intercept-1".to_string(),
                    active: true,
                    url_pattern: "example.com".to_string(),
                    method: None,
                    phase: "request".to_string(),
                    name: "api-intercept:example.com".to_string(),
                    priority: 100,
                    termination: relay_core_lib::rule::RuleTermination::Stop,
                },
            )
            .await
            .expect("intercept rule should be created");

        assert_eq!(rule_id, "intercept-1");
        let rules = state.get_rules().await;
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].id, "intercept-1");
        assert_eq!(rules[0].name, "api-intercept:example.com");
        let event = state.recent_audit_events().last().cloned().expect("audit event");
        assert_eq!(event.details["operation"], "rule.intercept_create");
        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
    }

    /// A legacy intercept rule with phase "both" expands to two rules; a
    /// later upsert for the same id with phase "request" replaces the whole
    /// family with a single rule.
    #[tokio::test]
    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
        let state = CoreState::new(None).await;

        state
            .upsert_legacy_intercept_rule_from(
                AuditActor::Tauri,
                "legacy-1".to_string(),
                json!({ "command": "set_intercept_rule" }),
                InterceptRule {
                    id: "legacy-1".to_string(),
                    active: true,
                    url_pattern: "example.com".to_string(),
                    method: None,
                    phase: "both".to_string(),
                },
            )
            .await
            .expect("initial family upsert should succeed");
        assert_eq!(state.get_rules().await.len(), 2);

        state
            .upsert_legacy_intercept_rule_from(
                AuditActor::Tauri,
                "legacy-1".to_string(),
                json!({ "command": "set_intercept_rule" }),
                InterceptRule {
                    id: "legacy-1".to_string(),
                    active: true,
                    url_pattern: "example.org".to_string(),
                    method: Some("POST".to_string()),
                    phase: "request".to_string(),
                },
            )
            .await
            .expect("replacement family upsert should succeed");

        let rules = state.get_rules().await;
        assert_eq!(rules.len(), 1);
        assert_eq!(rules[0].id, "legacy-1");
        let event = state.recent_audit_events().last().cloned().expect("audit event");
        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
    }
1900
    /// Resolving an intercept for an unknown flow fails and still records a
    /// `Failed` audit event carrying the attempted action and the error.
    #[tokio::test]
    async fn resolve_intercept_failure_records_failed_audit_event() {
        let state = CoreState::new(None).await;

        let result = state
            .resolve_intercept_with_modifications_from(
                AuditActor::Probe,
                "missing-flow:request".to_string(),
                "drop",
                None,
            )
            .await;

        assert!(result.is_err());
        let events = state.recent_audit_events();
        let event = events.last().expect("audit event should exist");
        assert_eq!(event.actor, AuditActor::Probe);
        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
        assert_eq!(event.outcome, AuditOutcome::Failed);
        assert_eq!(event.details["action"], "drop");
        assert!(event.details["error"].as_str().unwrap_or_default().contains("Interception not found"));
    }

    /// `prepare_start` moves the lifecycle to Starting (port set, no start
    /// timestamp yet); `stop_proxy` then moves it to Stopping and fires the
    /// shutdown sender handed to `prepare_start`.
    #[tokio::test]
    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
        let state = CoreState::new(None).await;
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        state
            .prepare_start(8080, shutdown_tx)
            .expect("prepare start should succeed");
        let lifecycle = state.lifecycle();
        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
        assert_eq!(lifecycle.port, Some(8080));
        assert!(lifecycle.started_at_ms.is_none());
        assert!(lifecycle.last_error.is_none());

        assert_eq!(
            state.stop_proxy().expect("stop should succeed"),
            ProxyStopResult::Stopping
        );
        let lifecycle = state.lifecycle();
        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
        assert_eq!(lifecycle.port, Some(8080));
        assert!(shutdown_rx.await.is_ok());
    }

    /// The status snapshot derives `running` from the Starting phase while
    /// uptime stays unset until the proxy is actually running.
    #[tokio::test]
    async fn status_snapshot_derives_runtime_facing_fields() {
        let state = CoreState::new(None).await;
        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
        state
            .prepare_start(8080, shutdown_tx)
            .expect("prepare start should succeed");

        let status = state.status_snapshot();
        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
        assert!(status.running);
        assert_eq!(status.port, Some(8080));
        assert!(status.uptime.is_none());
        assert!(status.last_error.is_none());
    }

    /// A fresh state's combined status report shows the Created phase and
    /// all counters/metrics at their zero values.
    #[tokio::test]
    async fn status_report_combines_status_and_metrics() {
        let state = CoreState::new(None).await;
        let report = state.status_report().await;

        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
        assert!(!report.status.running);
        assert_eq!(report.metrics.intercepts_pending, 0);
        assert_eq!(report.metrics.ws_pending_messages, 0);
        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
        assert_eq!(report.metrics.audit_events_total, 0);
        assert_eq!(report.metrics.audit_events_failed, 0);
        assert_eq!(report.metrics.flow_events_lagged_total, 0);
        assert_eq!(report.metrics.audit_events_lagged_total, 0);
    }
1980
    /// The builder-style setters keep all previously set fields intact.
    #[test]
    fn proxy_config_new_and_transport_setters_preserve_values() {
        let config = ProxyConfig::new(
            8080,
            std::path::PathBuf::from("/tmp/ca_cert.pem"),
            std::path::PathBuf::from("/tmp/ca_key.pem"),
        )
        .with_transparent(true)
        .with_udp_tproxy_port(Some(15000));

        assert_eq!(config.port, 8080);
        assert_eq!(config.ca_cert_path, std::path::PathBuf::from("/tmp/ca_cert.pem"));
        assert_eq!(config.ca_key_path, std::path::PathBuf::from("/tmp/ca_key.pem"));
        assert!(config.transparent);
        assert_eq!(config.udp_tproxy_port, Some(15000));
    }

    /// `from_app_data_dir` creates the directory and derives the standard
    /// CA file names inside it, with default transport options.
    #[test]
    fn proxy_config_from_app_data_dir_creates_default_paths() {
        let unique = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock drift")
            .as_nanos();
        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));

        let config = ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");

        assert!(dir.exists());
        assert_eq!(config.port, 8899);
        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
        assert!(!config.transparent);
        assert!(config.udp_tproxy_port.is_none());
    }

    /// A fresh state has no pending intercepts or WebSocket messages.
    #[tokio::test]
    async fn intercept_snapshot_maps_pending_counts() {
        let state = CoreState::new(None).await;
        let snapshot = state.intercept_snapshot().await;

        assert_eq!(snapshot.pending_count, 0);
        assert_eq!(snapshot.ws_pending_count, 0);
    }
2024
    /// With `limit = 1`, the audit snapshot returns only the most recent
    /// event (newest-first truncation).
    #[tokio::test]
    async fn audit_snapshot_returns_latest_events_in_order() {
        let state = CoreState::new(None).await;
        state.record_audit_event(AuditEvent::new(
            AuditActor::Runtime,
            AuditEventKind::RuleChanged,
            "first",
            AuditOutcome::Success,
            json!({ "index": 1 }),
        ));
        state.record_audit_event(AuditEvent::new(
            AuditActor::Http,
            AuditEventKind::PolicyUpdated,
            "second",
            AuditOutcome::Success,
            json!({ "index": 2 }),
        ));

        let snapshot = state.audit_snapshot(1);

        assert_eq!(snapshot.events.len(), 1);
        assert_eq!(snapshot.events[0].target, "second");
        assert_eq!(snapshot.events[0].details["index"], 2);
    }

    /// Querying with actor/kind/outcome filters selects only the matching
    /// in-memory event.
    #[tokio::test]
    async fn query_audit_snapshot_filters_in_memory_events() {
        let state = CoreState::new(None).await;
        state.record_audit_event(AuditEvent::new(
            AuditActor::Http,
            AuditEventKind::RuleChanged,
            "rule-1",
            AuditOutcome::Success,
            json!({ "idx": 1 }),
        ));
        state.record_audit_event(AuditEvent::new(
            AuditActor::Probe,
            AuditEventKind::PolicyUpdated,
            "policy",
            AuditOutcome::Failed,
            json!({ "idx": 2 }),
        ));

        let snapshot = state
            .query_audit_snapshot(CoreAuditQuery {
                actor: Some(AuditActor::Probe),
                kind: Some(AuditEventKind::PolicyUpdated),
                outcome: Some(AuditOutcome::Failed),
                limit: 10,
                ..Default::default()
            })
            .await;

        assert_eq!(snapshot.events.len(), 1);
        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
    }
2083
    /// With sqlite storage enabled, a policy update is persisted
    /// asynchronously; poll (bounded retries) until the persisted event
    /// becomes visible through the audit query path.
    #[tokio::test]
    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
        let state = CoreState::new(Some(sqlite_url())).await;
        state.update_policy_from(
            AuditActor::Http,
            "policy".to_string(),
            ProxyPolicy {
                transparent_enabled: true,
                ..Default::default()
            },
        );

        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
        // Persistence happens on a spawned task — retry up to 10 times.
        for _ in 0..10 {
            snapshot = state
                .query_audit_snapshot(CoreAuditQuery {
                    actor: Some(AuditActor::Http),
                    kind: Some(AuditEventKind::PolicyUpdated),
                    limit: 10,
                    ..Default::default()
                })
                .await;
            if !snapshot.events.is_empty() {
                break;
            }
            sleep(Duration::from_millis(20)).await;
        }

        assert!(!snapshot.events.is_empty());
        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
    }

    /// A second `prepare_start` while one run is active is rejected with an
    /// "already …" error.
    #[tokio::test]
    async fn prepare_start_rejects_second_active_start() {
        let state = CoreState::new(None).await;
        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
        state
            .prepare_start(8080, shutdown_tx)
            .expect("first start should succeed");

        let (second_tx, _second_rx) = oneshot::channel();
        let error = state
            .prepare_start(8081, second_tx)
            .expect_err("second active start should be rejected");
        assert!(error.contains("already"));
    }

    /// Updating the policy records a successful `PolicyUpdated` audit event
    /// whose details mirror the policy fields.
    #[test]
    fn update_policy_records_audit_event() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
        let state = runtime.block_on(CoreState::new(None));

        state.update_policy_from(
            AuditActor::Runtime,
            "policy".to_string(),
            ProxyPolicy {
                transparent_enabled: true,
                ..Default::default()
            },
        );

        let events = state.recent_audit_events();
        let event = events.last().expect("audit event should exist");
        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
        assert_eq!(event.outcome, AuditOutcome::Success);
        assert_eq!(event.details["transparent_enabled"], true);
    }
2152
2153 #[test]
2154 fn patch_policy_updates_redaction_without_replacing_other_fields() {
2155 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2156 let state = runtime.block_on(CoreState::new(None));
2157 let original_timeout = state.policy_snapshot().request_timeout_ms;
2158
2159 state.patch_policy_from(
2160 AuditActor::Runtime,
2161 "policy.patch".to_string(),
2162 relay_core_api::policy::ProxyPolicyPatch {
2163 redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2164 enabled: Some(true),
2165 redact_bodies: Some(true),
2166 ..Default::default()
2167 }),
2168 },
2169 );
2170
2171 let policy = state.policy_snapshot();
2172 assert_eq!(policy.request_timeout_ms, original_timeout);
2173 assert!(policy.redaction.enabled);
2174 assert!(policy.redaction.redact_bodies);
2175
2176 let events = state.recent_audit_events();
2177 let event = events.last().expect("audit event should exist");
2178 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2179 assert_eq!(event.details["redaction_enabled"], true);
2180 }
2181
2182 #[tokio::test]
2183 async fn metrics_include_audit_and_lagged_event_counters() {
2184 let state = CoreState::new(None).await;
2185
2186 state.update_policy_from(
2187 AuditActor::Runtime,
2188 "policy".to_string(),
2189 ProxyPolicy::default(),
2190 );
2191 let _ = state
2192 .resolve_intercept_with_modifications_from(
2193 AuditActor::Probe,
2194 "missing-flow:request".to_string(),
2195 "drop",
2196 None,
2197 )
2198 .await;
2199
2200 state.record_flow_events_lagged(3);
2201 state.record_audit_events_lagged(5);
2202
2203 let metrics = state.get_metrics().await;
2204 assert_eq!(metrics.audit_events_total, 2);
2205 assert_eq!(metrics.audit_events_failed, 1);
2206 assert_eq!(metrics.flow_events_lagged_total, 3);
2207 assert_eq!(metrics.audit_events_lagged_total, 5);
2208 }
2209
2210 #[tokio::test]
2211 async fn prometheus_metrics_text_contains_observability_fields() {
2212 let state = CoreState::new(None).await;
2213 state.record_flow_events_lagged(2);
2214 state.record_audit_events_lagged(4);
2215
2216 let text = state.get_metrics_prometheus_text().await;
2217 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2218 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2219 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2220 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2221 }
2222
2223 #[tokio::test]
2224 async fn search_flows_uses_store_with_offset_pagination() {
2225 let state = CoreState::new(Some(sqlite_url())).await;
2226 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2227 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2228 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2229
2230 state.upsert_flow(Box::new(flow_a));
2231 state.upsert_flow(Box::new(flow_b));
2232 state.upsert_flow(Box::new(flow_c));
2233
2234 let mut baseline = Vec::new();
2235 for _ in 0..20 {
2236 baseline = state
2237 .search_flows(FlowQuery {
2238 host: Some("api.example.com".to_string()),
2239 path_contains: None,
2240 method: None,
2241 status_min: None,
2242 status_max: None,
2243 has_error: None,
2244 is_websocket: None,
2245 limit: Some(3),
2246 offset: Some(0),
2247 })
2248 .await;
2249 if baseline.len() == 3 {
2250 break;
2251 }
2252 sleep(Duration::from_millis(20)).await;
2253 }
2254
2255 assert_eq!(baseline.len(), 3);
2256 let page = state
2257 .search_flows(FlowQuery {
2258 host: Some("api.example.com".to_string()),
2259 path_contains: None,
2260 method: None,
2261 status_min: None,
2262 status_max: None,
2263 has_error: None,
2264 is_websocket: None,
2265 limit: Some(1),
2266 offset: Some(1),
2267 })
2268 .await;
2269 assert_eq!(page.len(), 1);
2270 assert_eq!(page[0].id, baseline[1].id);
2271 }
2272
2273 #[tokio::test]
2274 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2275 let state = CoreState::new(Some(sqlite_url())).await;
2276 let first_flow = sample_http_flow("persist.example.com", "/first", "GET", 200, 1_700_000_010_000);
2277 let first_id = first_flow.id.to_string();
2278 state.upsert_flow(Box::new(first_flow));
2279 for i in 0..240 {
2280 state.upsert_flow(Box::new(sample_http_flow(
2281 "persist.example.com",
2282 &format!("/{}", i),
2283 "GET",
2284 200,
2285 1_700_000_020_000 + i,
2286 )));
2287 }
2288
2289 sleep(Duration::from_millis(200)).await;
2290
2291 let loaded = state.get_flow(first_id).await;
2292 assert!(loaded.is_some());
2293 }
2294
2295 #[tokio::test]
2296 async fn search_flows_redacts_summary_url_when_enabled() {
2297 let state = CoreState::new(None).await;
2298 state.update_policy_from(
2299 AuditActor::Runtime,
2300 "policy.redaction".to_string(),
2301 ProxyPolicy {
2302 redaction: RedactionPolicy {
2303 enabled: true,
2304 sensitive_query_keys: vec!["token".to_string()],
2305 redact_bodies: false,
2306 ..Default::default()
2307 },
2308 ..Default::default()
2309 },
2310 );
2311 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2312
2313 let mut items = Vec::new();
2314 for _ in 0..20 {
2315 items = state
2316 .search_flows(FlowQuery {
2317 host: Some("api.example.com".to_string()),
2318 path_contains: Some("/private".to_string()),
2319 ..Default::default()
2320 })
2321 .await;
2322 if !items.is_empty() {
2323 break;
2324 }
2325 sleep(Duration::from_millis(20)).await;
2326 }
2327
2328 assert!(!items.is_empty());
2329 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2330 let token = redacted
2331 .query_pairs()
2332 .find(|(k, _)| k == "token")
2333 .map(|(_, v)| v.to_string());
2334 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2335 }
2336
2337 #[tokio::test]
2338 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2339 let state = CoreState::new(Some(sqlite_url())).await;
2340 state.update_policy_from(
2341 AuditActor::Runtime,
2342 "policy.redaction".to_string(),
2343 ProxyPolicy {
2344 redaction: RedactionPolicy {
2345 enabled: true,
2346 sensitive_header_names: vec![
2347 "authorization".to_string(),
2348 "set-cookie".to_string(),
2349 ],
2350 sensitive_query_keys: vec!["token".to_string()],
2351 redact_bodies: true,
2352 },
2353 ..Default::default()
2354 },
2355 );
2356
2357 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2358 let flow_id = flow.id.to_string();
2359 state.upsert_flow(Box::new(flow));
2360 sleep(Duration::from_millis(80)).await;
2361
2362 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2363 let Layer::Http(http) = loaded.layer else {
2364 panic!("expected http layer");
2365 };
2366
2367 let auth = http
2368 .request
2369 .headers
2370 .iter()
2371 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2372 .map(|(_, v)| v.as_str());
2373 assert_eq!(auth, Some("[REDACTED]"));
2374
2375 let req_query_token = http
2376 .request
2377 .query
2378 .iter()
2379 .find(|(k, _)| k == "token")
2380 .map(|(_, v)| v.as_str());
2381 assert_eq!(req_query_token, Some("[REDACTED]"));
2382
2383 let req_body = http
2384 .request
2385 .body
2386 .as_ref()
2387 .map(|b| b.content.as_str());
2388 assert_eq!(req_body, Some("[REDACTED]"));
2389
2390 let response = http.response.expect("response should exist");
2391 let set_cookie = response
2392 .headers
2393 .iter()
2394 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2395 .map(|(_, v)| v.as_str());
2396 assert_eq!(set_cookie, Some("[REDACTED]"));
2397 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2398 assert_eq!(res_body, Some("[REDACTED]"));
2399 }
2400
2401 #[test]
2402 fn redact_flow_update_masks_http_body_when_enabled() {
2403 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2404 let state = runtime.block_on(CoreState::new(None));
2405 state.update_policy_from(
2406 AuditActor::Runtime,
2407 "policy.redaction".to_string(),
2408 ProxyPolicy {
2409 redaction: RedactionPolicy {
2410 enabled: true,
2411 redact_bodies: true,
2412 ..Default::default()
2413 },
2414 ..Default::default()
2415 },
2416 );
2417
2418 let update = FlowUpdate::HttpBody {
2419 flow_id: "f-1".to_string(),
2420 direction: relay_core_api::flow::Direction::ClientToServer,
2421 body: BodyData {
2422 encoding: "utf-8".to_string(),
2423 content: "super-secret".to_string(),
2424 size: 12,
2425 },
2426 };
2427 let redacted = state.redact_flow_update_for_output(update);
2428 match redacted {
2429 FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2430 _ => panic!("expected http body update"),
2431 }
2432 }
2433
2434 #[cfg(feature = "script")]
2435 #[tokio::test]
2436 async fn load_script_from_records_audit_event() {
2437 let state = CoreState::new(None).await;
2438
2439 state
2440 .load_script_from(
2441 AuditActor::Tauri,
2442 "tauri.load_script".to_string(),
2443 "globalThis.onRequestHeaders = (_flow) => {};",
2444 )
2445 .await
2446 .expect("script should load");
2447
2448 let events = state.recent_audit_events();
2449 let event = events.last().expect("audit event should exist");
2450 assert_eq!(event.actor, AuditActor::Tauri);
2451 assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2452 assert_eq!(event.outcome, AuditOutcome::Success);
2453 assert_eq!(event.target, "tauri.load_script");
2454 }
2455}