1use std::sync::Arc;
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::collections::{BTreeSet, HashSet, VecDeque};
26use std::sync::Mutex;
27use std::time::{SystemTime, UNIX_EPOCH};
28use tokio::sync::{broadcast, mpsc, oneshot, watch};
29use relay_core_api::flow::{Flow, WebSocketMessage, FlowUpdate, BodyData, Direction, Layer};
30#[cfg(feature = "script")]
31use relay_core_script::ScriptInterceptor;
32use relay_core_lib::interceptor::{Interceptor, CompositeInterceptor};
33use relay_core_lib::tls::CertificateAuthority;
34use relay_core_lib::capture::{TcpCaptureSource, TransparentTcpCaptureSource, OriginalDstProvider};
35use relay_core_lib::capture::udp::UdpProxy;
36#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
37use relay_core_lib::capture::LinuxOriginalDstProvider;
38#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
39use relay_core_lib::capture::MacOsOriginalDstProvider;
40#[cfg(target_os = "windows")]
41use relay_core_lib::capture::WindowsOriginalDstProvider;
42use tokio::net::TcpListener;
43use std::net::SocketAddr;
44use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
45
46use tracing::error;
47
48use relay_core_lib::rule::Rule;
49use relay_core_lib::rule::engine::RuleEngine;
50use crate::rule::{
51 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
52 build_mock_response_rule,
53};
54use relay_core_storage::store::{AuditEventRecord, Store};
55use relay_core_api::modification::{FlowQuery, FlowSummary};
56use serde_json::json;
57use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
58
59pub mod audit;
60pub mod rule;
61pub mod actors;
62pub mod interceptors;
63pub mod modification;
64pub mod services;
65
66pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
76pub mod rule_engine {
77 pub use relay_core_lib::rule_engine::*;
78}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
81pub struct CoreMetrics {
82 pub flows_total: usize,
83 pub flows_in_memory: usize,
84 pub flows_dropped: usize,
85 pub intercepts_pending: usize,
86 pub ws_pending_messages: usize,
87 pub oldest_intercept_age_ms: Option<u64>,
88 pub oldest_ws_message_age_ms: Option<u64>,
89 pub rule_exec_errors: usize,
90 pub audit_events_total: usize,
91 pub audit_events_failed: usize,
92 pub flow_events_lagged_total: usize,
93 pub audit_events_lagged_total: usize,
94}
95
96impl CoreMetrics {
97 pub fn to_prometheus_text(&self) -> String {
98 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
99 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
100 format!(
101 "relay_core_flows_total {}\n\
102relay_core_flows_in_memory {}\n\
103relay_core_flows_dropped_total {}\n\
104relay_core_intercepts_pending {}\n\
105relay_core_ws_pending_messages {}\n\
106relay_core_oldest_intercept_age_ms {}\n\
107relay_core_oldest_ws_message_age_ms {}\n\
108relay_core_rule_exec_errors_total {}\n\
109relay_core_audit_events_total {}\n\
110relay_core_audit_events_failed_total {}\n\
111relay_core_flow_events_lagged_total {}\n\
112relay_core_audit_events_lagged_total {}\n",
113 self.flows_total,
114 self.flows_in_memory,
115 self.flows_dropped,
116 self.intercepts_pending,
117 self.ws_pending_messages,
118 oldest_intercept_age_ms,
119 oldest_ws_message_age_ms,
120 self.rule_exec_errors,
121 self.audit_events_total,
122 self.audit_events_failed,
123 self.flow_events_lagged_total,
124 self.audit_events_lagged_total,
125 )
126 }
127}
128
129#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
130pub struct CoreStatusSnapshot {
131 pub phase: RuntimeLifecyclePhase,
132 pub running: bool,
133 pub port: Option<u16>,
134 pub uptime: Option<u64>,
135 pub last_error: Option<String>,
136}
137
138#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
139pub struct CoreStatusReport {
140 pub status: CoreStatusSnapshot,
141 pub metrics: CoreMetrics,
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
145pub struct CoreInterceptSnapshot {
146 pub pending_count: usize,
147 pub ws_pending_count: usize,
148}
149
150#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
151pub struct CoreAuditSnapshot {
152 pub events: Vec<AuditEvent>,
153}
154
155#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
156pub struct CoreAuditQuery {
157 pub since_ms: Option<u64>,
158 pub until_ms: Option<u64>,
159 pub actor: Option<AuditActor>,
160 pub kind: Option<AuditEventKind>,
161 pub outcome: Option<AuditOutcome>,
162 pub limit: usize,
163}
164
165impl Default for CoreAuditQuery {
166 fn default() -> Self {
167 Self {
168 since_ms: None,
169 until_ms: None,
170 actor: None,
171 kind: None,
172 outcome: None,
173 limit: 50,
174 }
175 }
176}
177
178#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
179#[serde(rename_all = "snake_case")]
180pub enum RuntimeLifecyclePhase {
181 Created,
182 Starting,
183 Running,
184 Stopping,
185 Stopped,
186 Failed,
187}
188
189impl RuntimeLifecyclePhase {
190 pub fn as_str(&self) -> &'static str {
191 match self {
192 Self::Created => "created",
193 Self::Starting => "starting",
194 Self::Running => "running",
195 Self::Stopping => "stopping",
196 Self::Stopped => "stopped",
197 Self::Failed => "failed",
198 }
199 }
200
201 pub fn is_active(&self) -> bool {
202 matches!(self, Self::Starting | Self::Running | Self::Stopping)
203 }
204}
205
206#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
207pub struct RuntimeLifecycle {
208 pub phase: RuntimeLifecyclePhase,
209 pub port: Option<u16>,
210 pub started_at_ms: Option<u64>,
211 pub last_error: Option<String>,
212}
213
214impl RuntimeLifecycle {
215 pub fn created() -> Self {
216 Self {
217 phase: RuntimeLifecyclePhase::Created,
218 port: None,
219 started_at_ms: None,
220 last_error: None,
221 }
222 }
223
224 pub fn is_active(&self) -> bool {
225 self.phase.is_active()
226 }
227
228 pub fn uptime_seconds(&self) -> Option<u64> {
229 let started_at_ms = self.started_at_ms?;
230 let now_ms = now_unix_ms();
231 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
232 }
233}
234
235pub enum ProxySpawnResult {
236 Started(tokio::task::JoinHandle<()>),
237 AlreadyRunning,
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq)]
241pub enum ProxyStopResult {
242 Stopping,
243 NotRunning,
244}
245
246impl From<RuntimeLifecycle> for CoreStatusSnapshot {
247 fn from(lifecycle: RuntimeLifecycle) -> Self {
248 Self {
249 running: lifecycle.is_active(),
250 uptime: lifecycle.uptime_seconds(),
251 port: lifecycle.port,
252 last_error: lifecycle.last_error,
253 phase: lifecycle.phase,
254 }
255 }
256}
257
258pub struct CoreState {
259 flow_store: mpsc::Sender<FlowStoreMessage>,
260 intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
261 rule_store: mpsc::Sender<RuleStoreMessage>,
262 store: Option<Store>,
263 #[cfg(feature = "script")]
264 pub script_interceptor: Arc<ScriptInterceptor>,
265 pub policy_tx: watch::Sender<ProxyPolicy>,
266 pub flows_dropped: Arc<AtomicUsize>,
267 audit_events_total: Arc<AtomicUsize>,
268 audit_events_failed: Arc<AtomicUsize>,
269 flow_events_lagged_total: Arc<AtomicUsize>,
270 audit_events_lagged_total: Arc<AtomicUsize>,
271 flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
273 audit_broadcast_tx: broadcast::Sender<AuditEvent>,
274 audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
275 lifecycle_tx: watch::Sender<RuntimeLifecycle>,
276 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
277}
278
279impl CoreState {
280 pub async fn new(db_url: Option<String>) -> Self {
281 const AUDIT_HISTORY_LIMIT: usize = 200;
282
283 let store = if let Some(url) = db_url {
284 match Store::connect(&url).await {
285 Ok(s) => {
286 if let Err(e) = s.init().await {
287 tracing::error!("Failed to init store: {}", e);
288 }
289 Some(s)
290 },
291 Err(e) => {
292 tracing::error!("Failed to connect to store: {}", e);
293 None
294 }
295 }
296 } else {
297 None
298 };
299
300 let (flow_tx, flow_rx) = mpsc::channel(10000);
301 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
302 tokio::spawn(flow_actor.run());
303
304 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
305 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
306 tokio::spawn(intercept_actor.run());
307
308 let (rule_tx, rule_rx) = mpsc::channel(100);
309 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
310 tokio::spawn(rule_actor.run());
311
312 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
313 let (flow_broadcast_tx, _) = broadcast::channel(1000);
314 let (audit_broadcast_tx, _) = broadcast::channel(256);
315 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
316
317 #[cfg(feature = "script")]
318 let script_interceptor = ScriptInterceptor::new().await.expect("Failed to initialize ScriptInterceptor");
319 Self {
320 flow_store: flow_tx,
321 intercept_broker: intercept_tx,
322 rule_store: rule_tx,
323 store,
324 #[cfg(feature = "script")]
325 script_interceptor: Arc::new(script_interceptor),
326 policy_tx,
327 flows_dropped: Arc::new(AtomicUsize::new(0)),
328 audit_events_total: Arc::new(AtomicUsize::new(0)),
329 audit_events_failed: Arc::new(AtomicUsize::new(0)),
330 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
331 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
332 flow_broadcast_tx,
333 audit_broadcast_tx,
334 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
335 lifecycle_tx,
336 shutdown_tx: Mutex::new(None),
337 }
338 }
339
340 pub async fn get_metrics(&self) -> CoreMetrics {
341 let (flow_tx, flow_rx) = oneshot::channel();
342 let _ = self.flow_store.send(FlowStoreMessage::GetMetrics(flow_tx)).await;
343 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
344
345 let (int_tx, int_rx) = oneshot::channel();
346 let _ = self.intercept_broker.send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx }).await;
347 let (intercepts_pending, ws_pending_messages, oldest_intercept_age_ms, oldest_ws_message_age_ms) =
348 int_rx.await.unwrap_or((0, 0, None, None));
349
350 let (rule_tx, rule_rx) = oneshot::channel();
351 let _ = self.rule_store.send(RuleStoreMessage::GetMetrics(rule_tx)).await;
352 let rule_exec_errors = rule_rx.await.unwrap_or(0);
353
354 CoreMetrics {
355 flows_total,
356 flows_in_memory,
357 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
358 intercepts_pending,
359 ws_pending_messages,
360 oldest_intercept_age_ms,
361 oldest_ws_message_age_ms,
362 rule_exec_errors,
363 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
364 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
365 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
366 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
367 }
368 }
369
370 pub async fn get_metrics_prometheus_text(&self) -> String {
371 self.get_metrics().await.to_prometheus_text()
372 }
373
374 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
375 self.lifecycle().into()
376 }
377
378 pub async fn status_report(&self) -> CoreStatusReport {
379 CoreStatusReport {
380 status: self.status_snapshot(),
381 metrics: self.get_metrics().await,
382 }
383 }
384
385 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
386 let metrics = self.get_metrics().await;
387 CoreInterceptSnapshot {
388 pending_count: metrics.intercepts_pending,
389 ws_pending_count: metrics.ws_pending_messages,
390 }
391 }
392
393 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
394 let events = self.recent_audit_events();
395 let start = events.len().saturating_sub(limit);
396 CoreAuditSnapshot {
397 events: events.into_iter().skip(start).collect(),
398 }
399 }
400
401 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
402 let limit = query.limit.clamp(1, 500);
403 if let Some(store) = &self.store {
404 let rows = store
405 .query_audit_events(
406 query.since_ms,
407 query.until_ms,
408 query.actor.as_ref().map(AuditActor::as_str),
409 query.kind.as_ref().map(AuditEventKind::as_str),
410 query.outcome.as_ref().map(AuditOutcome::as_str),
411 limit,
412 )
413 .await
414 .unwrap_or_default();
415
416 let mut events = Vec::with_capacity(rows.len());
417 for row in rows {
418 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
419 events.push(event);
420 }
421 }
422 return CoreAuditSnapshot { events };
423 }
424
425 let mut events = self.recent_audit_events();
426 events.reverse();
427 let filtered = events
428 .into_iter()
429 .filter(|event| {
430 query
431 .since_ms
432 .map(|v| event.timestamp_ms >= v)
433 .unwrap_or(true)
434 })
435 .filter(|event| {
436 query
437 .until_ms
438 .map(|v| event.timestamp_ms <= v)
439 .unwrap_or(true)
440 })
441 .filter(|event| {
442 query
443 .actor
444 .as_ref()
445 .map(|v| &event.actor == v)
446 .unwrap_or(true)
447 })
448 .filter(|event| query.kind.as_ref().map(|v| &event.kind == v).unwrap_or(true))
449 .filter(|event| {
450 query
451 .outcome
452 .as_ref()
453 .map(|v| &event.outcome == v)
454 .unwrap_or(true)
455 })
456 .take(limit)
457 .collect();
458 CoreAuditSnapshot { events: filtered }
459 }
460
461 pub async fn get_flow(&self, id: String) -> Option<Flow> {
462 let (tx, rx) = oneshot::channel();
463 if let Err(e) = self.flow_store.send(FlowStoreMessage::GetFlow { id: id.clone(), respond_to: tx }).await {
464 error!("Failed to send GetFlow request: {}", e);
465 if let Some(store) = &self.store {
466 return store
467 .load_flow(&id)
468 .await
469 .ok()
470 .flatten()
471 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
472 }
473 return None;
474 }
475 let flow = rx.await.map_err(|e| {
476 error!("Failed to receive Flow response: {}", e);
477 e
478 }).unwrap_or(None);
479 if let Some(flow) = flow {
480 return Some(redact_flow(flow, &self.current_redaction_policy()));
481 }
482 if let Some(store) = &self.store {
483 return store
484 .load_flow(&id)
485 .await
486 .ok()
487 .flatten()
488 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
489 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
490 }
491 None
492 }
493
494 pub async fn get_rules(&self) -> Vec<Rule> {
495 let (tx, rx) = oneshot::channel();
496 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
497 error!("Failed to send GetRules request: {}", e);
498 return Vec::new();
499 }
500 rx.await.map_err(|e| {
501 error!("Failed to receive Rules response: {}", e);
502 e
503 }).unwrap_or_default()
504 }
505
506 pub async fn set_rules(&self, rules: Vec<Rule>) {
507 let _ = self
508 .set_rules_from(
509 AuditActor::Runtime,
510 "rules.replace",
511 "rule_set".to_string(),
512 json!({}),
513 rules,
514 )
515 .await;
516 }
517
518 pub async fn upsert_rule_from(
519 &self,
520 actor: AuditActor,
521 operation: &str,
522 target: String,
523 details: serde_json::Value,
524 rule: Rule,
525 ) -> Result<(), String> {
526 let rule_id = rule.id.clone();
527 let mut rules = self.get_rules().await;
528 rules.retain(|existing| existing.id != rule_id);
529 rules.push(rule);
530 self.set_rules_from(actor, operation, target, details, rules).await
531 }
532
533 pub async fn delete_rule_from(
534 &self,
535 actor: AuditActor,
536 operation: &str,
537 target: String,
538 details: serde_json::Value,
539 rule_id: &str,
540 ) -> Result<bool, String> {
541 let mut rules = self.get_rules().await;
542 let before = rules.len();
543 rules.retain(|rule| rule.id != rule_id);
544 if rules.len() == before {
545 return Ok(false);
546 }
547 self.set_rules_from(actor, operation, target, details, rules)
548 .await
549 .map(|_| true)
550 }
551
552 pub async fn create_mock_response_rule_from(
553 &self,
554 actor: AuditActor,
555 target: String,
556 details: serde_json::Value,
557 config: MockResponseRuleConfig,
558 ) -> Result<String, String> {
559 let rule_id = config.rule_id.clone();
560 let rule = build_mock_response_rule(config);
561 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
562 .await
563 .map(|_| rule_id)
564 }
565
566 pub async fn create_intercept_rule_from(
567 &self,
568 actor: AuditActor,
569 target: String,
570 details: serde_json::Value,
571 config: InterceptRuleConfig,
572 ) -> Result<String, String> {
573 let rule_id = config.rule_id.clone();
574 let mut rules = self.get_rules().await;
575 rules.extend(build_intercept_rules(config));
576 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
577 .await
578 .map(|_| rule_id)
579 }
580
581 pub async fn upsert_legacy_intercept_rule_from(
582 &self,
583 actor: AuditActor,
584 target: String,
585 details: serde_json::Value,
586 rule: InterceptRule,
587 ) -> Result<(), String> {
588 let rule_id = rule.id.clone();
589 let mut rules = self.get_rules().await;
590 rules.retain(|existing| existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id)));
591 rules.extend(rule.to_rules());
592 self.set_rules_from(actor, "rule.intercept_legacy_upsert", target, details, rules)
593 .await
594 }
595
596 pub async fn set_rules_from(
597 &self,
598 actor: AuditActor,
599 operation: &str,
600 target: String,
601 details: serde_json::Value,
602 rules: Vec<Rule>,
603 ) -> Result<(), String> {
604 let rule_count = rules.len();
605 if let Err(e) = self.rule_store.send(RuleStoreMessage::SetRules(rules)).await {
606 error!("Failed to send SetRules request: {}", e);
607 self.record_audit_event(AuditEvent::new(
608 actor,
609 AuditEventKind::RuleChanged,
610 target,
611 AuditOutcome::Failed,
612 json!({
613 "operation": operation,
614 "rule_count": rule_count,
615 "details": details,
616 "error": e.to_string()
617 }),
618 ));
619 return Err(e.to_string());
620 }
621
622 self.record_audit_event(AuditEvent::new(
623 actor,
624 AuditEventKind::RuleChanged,
625 target,
626 AuditOutcome::Success,
627 json!({
628 "operation": operation,
629 "rule_count": rule_count,
630 "details": details
631 }),
632 ));
633 Ok(())
634 }
635
636 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
637 let mut new_rules = Vec::new();
638 for rule in rules {
639 new_rules.extend(rule.to_rules());
640 }
641 self.set_rules(new_rules).await;
642 }
643
644 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
645 let (tx, rx) = oneshot::channel();
646 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRuleEngine(tx)).await {
647 error!("Failed to send GetRuleEngine request: {}", e);
648 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
649 }
650 rx.await.map_err(|e| {
651 error!("Failed to receive RuleEngine response: {}", e);
652 e
653 }).unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
654 }
655
656 pub fn update_policy(&self, policy: ProxyPolicy) {
657 self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
658 }
659
660 pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
661 let mut policy = self.policy_snapshot();
662 policy.apply_patch(patch);
663 self.update_policy_from(actor, target, policy);
664 }
665
666 pub fn policy_snapshot(&self) -> ProxyPolicy {
667 self.policy_tx.borrow().clone()
668 }
669
670 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
671 let details = json!({
672 "strict_http_semantics": policy.strict_http_semantics,
673 "request_timeout_ms": policy.request_timeout_ms,
674 "max_body_size": policy.max_body_size,
675 "transparent_enabled": policy.transparent_enabled,
676 "redaction_enabled": policy.redaction.enabled,
677 "redact_bodies": policy.redaction.redact_bodies
678 });
679
680 self.policy_tx.send_replace(policy);
681 self.record_audit_event(AuditEvent::new(
682 actor,
683 AuditEventKind::PolicyUpdated,
684 target,
685 AuditOutcome::Success,
686 details,
687 ));
688 }
689
690 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
691 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::RegisterIntercept { key, tx }).await {
692 error!("Failed to send RegisterIntercept request: {}", e);
693 }
694 }
695
696 pub async fn resolve_intercept(&self, key: String, result: InterceptionResult) -> Result<(), String> {
697 let (tx, rx) = oneshot::channel();
698 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::ResolveIntercept { key, result, respond_to: tx }).await {
699 error!("Failed to send ResolveIntercept request: {}", e);
700 return Err(e.to_string());
701 }
702 rx.await.map_err(|_| "Actor dropped".to_string())?
703 }
704
705 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
706 let (tx, rx) = oneshot::channel();
707 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingWebSocketMessage { key, respond_to: tx }).await {
708 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
709 return None;
710 }
711 rx.await.map_err(|e| {
712 error!("Failed to receive WebSocketMessage response: {}", e);
713 e
714 }).unwrap_or(None)
715 }
716
717 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
718 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message }).await {
719 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
720 }
721 }
722
723 pub async fn is_intercept_pending(&self, key: String) -> bool {
724 let (tx, rx) = oneshot::channel();
725 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingIntercept { key, respond_to: tx }).await {
726 error!("Failed to send GetPendingIntercept request: {}", e);
727 return false;
728 }
729 rx.await.map_err(|e| {
730 error!("Failed to receive PendingIntercept response: {}", e);
731 e
732 }).unwrap_or(false)
733 }
734
735 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
736 let (tx, rx) = oneshot::channel();
737 if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingInterceptByFlowId { flow_id, respond_to: tx }).await {
738 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
739 return false;
740 }
741 rx.await.map_err(|e| {
742 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
743 e
744 }).unwrap_or(false)
745 }
746
747 pub fn upsert_flow(&self, flow: Box<Flow>) {
748 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
749 error!("FlowStore dropped flow: {}", e);
751 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
752 }
753 }
754
755 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
756 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message }) {
757 error!("FlowStore dropped WS message: {}", e);
758 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
759 }
760 }
761
762 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
763 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody { flow_id, body, direction }) {
764 error!("FlowStore dropped HTTP body: {}", e);
765 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
766 }
767 }
768
769 pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
771 self.flow_broadcast_tx.subscribe()
772 }
773
774 pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
775 self.audit_broadcast_tx.subscribe()
776 }
777
778 fn current_redaction_policy(&self) -> RedactionPolicy {
779 self.policy_tx.borrow().redaction.clone()
780 }
781
782 pub fn record_flow_events_lagged(&self, skipped: u64) {
783 self.flow_events_lagged_total
784 .fetch_add(skipped as usize, Ordering::Relaxed);
785 }
786
787 pub fn record_audit_events_lagged(&self, skipped: u64) {
788 self.audit_events_lagged_total
789 .fetch_add(skipped as usize, Ordering::Relaxed);
790 }
791
792 pub fn lifecycle(&self) -> RuntimeLifecycle {
793 self.lifecycle_tx.borrow().clone()
794 }
795
796 pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
797 self.lifecycle_tx.subscribe()
798 }
799
800 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
801 let current = self.lifecycle();
802 if current.is_active() {
803 return Err(format!(
804 "Proxy is already {} on port {}",
805 current.phase.as_str(),
806 current.port.unwrap_or(port)
807 ));
808 }
809
810 let mut guard = self.shutdown_tx.lock().map_err(|_| "shutdown state poisoned".to_string())?;
811 *guard = Some(shutdown_tx);
812 drop(guard);
813
814 self.update_lifecycle(RuntimeLifecycle {
815 phase: RuntimeLifecyclePhase::Starting,
816 port: Some(port),
817 started_at_ms: None,
818 last_error: None,
819 });
820 Ok(())
821 }
822
823 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
824 let mut guard = self.shutdown_tx.lock().map_err(|_| "shutdown state poisoned".to_string())?;
825 let Some(tx) = guard.take() else {
826 return Ok(ProxyStopResult::NotRunning);
827 };
828 drop(guard);
829
830 let current = self.lifecycle();
831 self.update_lifecycle(RuntimeLifecycle {
832 phase: RuntimeLifecyclePhase::Stopping,
833 port: current.port,
834 started_at_ms: current.started_at_ms,
835 last_error: current.last_error,
836 });
837 let _ = tx.send(());
838 Ok(ProxyStopResult::Stopping)
839 }
840
841 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
842 self.audit_history
843 .lock()
844 .map(|events| events.iter().cloned().collect())
845 .unwrap_or_default()
846 }
847
848 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
850 let redaction = self.current_redaction_policy();
851 if let Some(store) = &self.store {
852 return store
853 .query_flow_summaries(&query)
854 .await
855 .unwrap_or_default()
856 .into_iter()
857 .map(|summary| redact_flow_summary(summary, &redaction))
858 .collect();
859 }
860 let (tx, rx) = oneshot::channel();
861 if let Err(e) = self.flow_store.send(FlowStoreMessage::SearchFlows { query, respond_to: tx }).await {
862 error!("Failed to send SearchFlows request: {}", e);
863 return Vec::new();
864 }
865 rx.await
866 .unwrap_or_default()
867 .into_iter()
868 .map(|summary| redact_flow_summary(summary, &redaction))
869 .collect()
870 }
871
872 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
873 let redaction = self.current_redaction_policy();
874 redact_flow_update(update, &redaction)
875 }
876
877 pub async fn resolve_intercept_with_modifications(
887 &self,
888 key: String,
889 action: &str,
890 mods: Option<relay_core_api::modification::FlowModification>,
891 ) -> Result<(), String> {
892 self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
893 .await
894 }
895
896 pub async fn resolve_intercept_with_modifications_from(
897 &self,
898 actor: AuditActor,
899 key: String,
900 action: &str,
901 mods: Option<relay_core_api::modification::FlowModification>,
902 ) -> Result<(), String> {
903 let modified_fields = modification_field_names(mods.as_ref());
904 let result = match action {
905 "drop" => InterceptionResult::Drop,
906 _ => match mods {
907 None => InterceptionResult::Continue,
908 Some(m) => {
909 let parts: Vec<&str> = key.splitn(4, ':').collect();
910 match parts.as_slice() {
911 [flow_id, phase] => {
912 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
913 modification::apply_flow_modification(&flow, phase, m)
914 } else {
915 InterceptionResult::Continue
916 }
917 }
918 [_, "ws_msg", _] => {
921 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
922 modification::apply_ws_modification(&msg, m)
923 } else {
924 InterceptionResult::Continue
925 }
926 }
927 _ => InterceptionResult::Continue,
928 }
929 }
930 },
931 };
932 let outcome = self.resolve_intercept(key.clone(), result).await;
933 let audit_outcome = if outcome.is_ok() {
934 AuditOutcome::Success
935 } else {
936 AuditOutcome::Failed
937 };
938 let error_message = outcome.as_ref().err().cloned();
939
940 self.record_audit_event(AuditEvent::new(
941 actor,
942 AuditEventKind::InterceptResolved,
943 key,
944 audit_outcome,
945 json!({
946 "action": action,
947 "has_modifications": !modified_fields.is_empty(),
948 "modified_fields": modified_fields,
949 "error": error_message
950 }),
951 ));
952 outcome
953 }
954
955 #[cfg(feature = "script")]
956 pub async fn load_script_from(
957 &self,
958 actor: AuditActor,
959 target: String,
960 script: &str,
961 ) -> Result<(), String> {
962 let result = self
963 .script_interceptor
964 .load_script(script)
965 .await
966 .map_err(|e| e.to_string());
967
968 let outcome = if result.is_ok() {
969 AuditOutcome::Success
970 } else {
971 AuditOutcome::Failed
972 };
973 let error_message = result.as_ref().err().cloned();
974
975 self.record_audit_event(AuditEvent::new(
976 actor,
977 AuditEventKind::ScriptReloaded,
978 target,
979 outcome,
980 json!({
981 "script_bytes": script.len(),
982 "error": error_message
983 }),
984 ));
985
986 result
987 }
988
989 fn record_audit_event(&self, event: AuditEvent) {
990 const AUDIT_HISTORY_LIMIT: usize = 200;
991 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
992 if event.outcome == AuditOutcome::Failed {
993 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
994 }
995
996 let details = event.details.to_string();
997 tracing::info!(
998 target: "relay_core_audit",
999 event_id = %event.id,
1000 actor = %event.actor.as_str(),
1001 kind = %event.kind.as_str(),
1002 target = %event.target,
1003 outcome = %event.outcome.as_str(),
1004 details = %details
1005 );
1006
1007 if let Ok(mut history) = self.audit_history.lock() {
1008 if history.len() >= AUDIT_HISTORY_LIMIT {
1009 history.pop_front();
1010 }
1011 history.push_back(event.clone());
1012 }
1013
1014 if let Some(store) = self.store.clone() {
1015 let event_json = serde_json::to_value(&event).unwrap_or_default();
1016 let event_id = event.id.clone();
1017 let timestamp_ms = event.timestamp_ms;
1018 let actor = event.actor.as_str().to_string();
1019 let kind = event.kind.as_str().to_string();
1020 let target = event.target.clone();
1021 let outcome = event.outcome.as_str().to_string();
1022 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1023 handle.spawn(async move {
1024 if let Err(e) = store
1025 .save_audit_event(AuditEventRecord {
1026 id: &event_id,
1027 timestamp_ms,
1028 actor: &actor,
1029 kind: &kind,
1030 target: &target,
1031 outcome: &outcome,
1032 content: &event_json,
1033 })
1034 .await
1035 {
1036 tracing::error!("Failed to persist audit event: {}", e);
1037 }
1038 });
1039 }
1040 }
1041
1042 let _ = self.audit_broadcast_tx.send(event);
1043 }
1044
1045 fn transition_to_running(&self, port: u16) {
1046 self.update_lifecycle(RuntimeLifecycle {
1047 phase: RuntimeLifecyclePhase::Running,
1048 port: Some(port),
1049 started_at_ms: Some(now_unix_ms()),
1050 last_error: None,
1051 });
1052 }
1053
1054 fn transition_to_stopped(&self) {
1055 if let Ok(mut guard) = self.shutdown_tx.lock() {
1056 *guard = None;
1057 }
1058
1059 self.update_lifecycle(RuntimeLifecycle {
1060 phase: RuntimeLifecyclePhase::Stopped,
1061 port: None,
1062 started_at_ms: None,
1063 last_error: None,
1064 });
1065 }
1066
1067 fn transition_to_failed(&self, port: u16, error: String) {
1068 if let Ok(mut guard) = self.shutdown_tx.lock() {
1069 *guard = None;
1070 }
1071
1072 self.update_lifecycle(RuntimeLifecycle {
1073 phase: RuntimeLifecyclePhase::Failed,
1074 port: Some(port),
1075 started_at_ms: None,
1076 last_error: Some(error),
1077 });
1078 }
1079
1080 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1081 tracing::info!(
1082 target: "relay_core_lifecycle",
1083 phase = %lifecycle.phase.as_str(),
1084 port = ?lifecycle.port,
1085 started_at_ms = ?lifecycle.started_at_ms,
1086 last_error = ?lifecycle.last_error
1087 );
1088 self.lifecycle_tx.send_replace(lifecycle);
1089 }
1090
1091 pub fn spawn_proxy(
1092 self: &Arc<Self>,
1093 config: ProxyConfig,
1094 sink: mpsc::Sender<FlowUpdate>,
1095 extra_interceptor: Option<Arc<dyn Interceptor>>,
1096 ) -> Result<ProxySpawnResult, String> {
1097 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1098 match self.prepare_start(config.port, shutdown_tx) {
1099 Ok(()) => {}
1100 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1101 Err(error) => return Err(error),
1102 }
1103 let state = self.clone();
1104 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1105 if let Err(error) = state
1106 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1107 .await
1108 {
1109 error!("Proxy failed: {}", error);
1110 }
1111 })))
1112 }
1113
1114 pub async fn start_proxy(
1115 self: &Arc<Self>,
1116 config: ProxyConfig,
1117 sink: mpsc::Sender<FlowUpdate>,
1118 extra_interceptor: Option<Arc<dyn Interceptor>>,
1119 ) -> Result<(), String> {
1120 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1121 self.prepare_start(config.port, shutdown_tx)?;
1122 self.run_proxy(config, sink, extra_interceptor, shutdown_rx).await
1123 }
1124
1125 async fn run_proxy(
1126 self: &Arc<Self>,
1127 config: ProxyConfig,
1128 sink: mpsc::Sender<FlowUpdate>,
1129 extra_interceptor: Option<Arc<dyn Interceptor>>,
1130 shutdown_rx: oneshot::Receiver<()>,
1131 ) -> Result<(), String> {
1132 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1133 let state = self.clone();
1134
1135 if let Some(parent) = config.ca_cert_path.parent()
1136 && !parent.exists() {
1137 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1138 }
1139
1140 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1141 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1142 let ca = Arc::new(ca);
1143
1144 #[cfg(feature = "script")]
1145 let script_interceptor = self.script_interceptor.clone();
1146
1147 #[cfg(feature = "script")]
1148 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1149
1150 #[cfg(not(feature = "script"))]
1151 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1152
1153 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(self.clone())));
1154
1155 if let Some(interceptor) = extra_interceptor {
1156 interceptors.push(interceptor);
1157 }
1158
1159 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1160 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1161
1162 tokio::spawn(async move {
1163 while let Some(update) = proxy_rx.recv().await {
1164 match update.clone() {
1165 FlowUpdate::Full(flow) => {
1166 state.upsert_flow(flow);
1167 }
1168 FlowUpdate::WebSocketMessage { flow_id, message } => {
1169 state.append_ws_message(flow_id, message);
1170 }
1171 FlowUpdate::HttpBody {
1172 flow_id,
1173 direction,
1174 body,
1175 } => {
1176 state.update_http_body(flow_id, body, direction);
1177 }
1178 }
1179
1180 let _ = state.flow_broadcast_tx.send(update.clone());
1181
1182 if sink.try_send(update).is_err() {
1183 relay_core_lib::metrics::inc_flows_dropped();
1184 }
1185 }
1186 });
1187
1188 if let Some(udp_port) = config.udp_tproxy_port {
1189 let udp_proxy_tx = proxy_tx.clone();
1190 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1191 tokio::spawn(async move {
1192 match tokio::net::UdpSocket::bind(udp_addr).await {
1193 Ok(socket) => {
1194 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1195 if let Err(e) = proxy.run(udp_proxy_tx).await {
1196 error!("UDP TPROXY failed: {}", e);
1197 }
1198 }
1199 Err(e) => {
1200 error!("Failed to bind UDP TPROXY socket: {}", e);
1201 }
1202 }
1203 });
1204 }
1205
1206 let listener = match TcpListener::bind(addr).await {
1207 Ok(listener) => listener,
1208 Err(e) => {
1209 if let Ok(mut guard) = self.shutdown_tx.lock() {
1210 *guard = None;
1211 }
1212 let message = format!("Failed to bind to address {}: {}", addr, e);
1213 self.transition_to_failed(config.port, message.clone());
1214 return Err(message);
1215 }
1216 };
1217
1218 self.transition_to_running(config.port);
1219 let shutdown_rx = Some(shutdown_rx);
1220
1221 if config.transparent {
1222 let policy = ProxyPolicy {
1223 transparent_enabled: true,
1224 ..Default::default()
1225 };
1226 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1227 let policy_rx = self.policy_tx.subscribe();
1228
1229 let provider: Arc<dyn OriginalDstProvider> = {
1230 let mut addrs = BTreeSet::new();
1231 if let Ok(local) = listener.local_addr() {
1232 addrs.insert(local);
1233 }
1234
1235 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1236 {
1237 Arc::new(LinuxOriginalDstProvider::new(addrs))
1238 }
1239 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1240 {
1241 match MacOsOriginalDstProvider::new(addrs.clone()) {
1242 Ok(provider) => Arc::new(provider),
1243 Err(e) => {
1244 error!("Failed to initialize macOS PF provider: {}", e);
1245 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1246 }
1247 }
1248 }
1249 #[cfg(target_os = "windows")]
1250 {
1251 let filter = "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)".to_string();
1252 let port = config.port;
1253 tokio::spawn(async move {
1254 relay_core_lib::capture::windows::start_windivert_capture(filter, port).await;
1255 });
1256
1257 Arc::new(WindowsOriginalDstProvider::new(addrs))
1258 }
1259 #[cfg(not(any(
1260 all(target_os = "linux", feature = "transparent-linux"),
1261 all(target_os = "macos", feature = "transparent-macos"),
1262 target_os = "windows"
1263 )))]
1264 {
1265 Arc::new(NoOpOriginalDstProvider::new(addrs))
1266 }
1267 };
1268
1269 let source = TransparentTcpCaptureSource::new(listener, provider);
1270 let result = relay_core_lib::start_proxy(
1271 source,
1272 proxy_tx,
1273 interceptor,
1274 ca,
1275 policy_rx,
1276 None,
1277 shutdown_rx,
1278 )
1279 .await
1280 .map_err(|e| e.to_string());
1281 if let Err(error) = &result {
1282 self.transition_to_failed(config.port, error.clone());
1283 } else {
1284 self.transition_to_stopped();
1285 }
1286 result
1287 } else {
1288 let source = TcpCaptureSource::new(listener);
1289 let policy = ProxyPolicy::default();
1290 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1291 let policy_rx = self.policy_tx.subscribe();
1292
1293 let result = relay_core_lib::start_proxy(
1294 source,
1295 proxy_tx,
1296 interceptor,
1297 ca,
1298 policy_rx,
1299 None,
1300 shutdown_rx,
1301 )
1302 .await
1303 .map_err(|e| e.to_string());
1304 if let Err(error) = &result {
1305 self.transition_to_failed(config.port, error.clone());
1306 } else {
1307 self.transition_to_stopped();
1308 }
1309 result
1310 }
1311 }
1312}
1313
1314fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1315 match update {
1316 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1317 FlowUpdate::WebSocketMessage { flow_id, mut message } => {
1318 message.content = redact_body(message.content, redaction);
1319 FlowUpdate::WebSocketMessage { flow_id, message }
1320 }
1321 FlowUpdate::HttpBody {
1322 flow_id,
1323 direction,
1324 body,
1325 } => FlowUpdate::HttpBody {
1326 flow_id,
1327 direction,
1328 body: redact_body(body, redaction),
1329 },
1330 }
1331}
1332
1333fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1334 if !redaction.enabled {
1335 return flow;
1336 }
1337 match &mut flow.layer {
1338 Layer::Http(http) => {
1339 redact_http_request(&mut http.request, redaction);
1340 if let Some(response) = &mut http.response {
1341 redact_headers(&mut response.headers, redaction);
1342 response.body = response.body.take().map(|body| redact_body(body, redaction));
1343 }
1344 }
1345 Layer::WebSocket(ws) => {
1346 redact_http_request(&mut ws.handshake_request, redaction);
1347 redact_headers(&mut ws.handshake_response.headers, redaction);
1348 ws.handshake_response.body = ws
1349 .handshake_response
1350 .body
1351 .take()
1352 .map(|body| redact_body(body, redaction));
1353 for message in &mut ws.messages {
1354 message.content = redact_body(message.content.clone(), redaction);
1355 }
1356 }
1357 _ => {}
1358 }
1359 flow
1360}
1361
1362fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1363 if !redaction.enabled {
1364 return summary;
1365 }
1366 summary.url = redact_url_string(&summary.url, redaction);
1367 summary
1368}
1369
1370fn redact_http_request(request: &mut relay_core_api::flow::HttpRequest, redaction: &RedactionPolicy) {
1371 redact_headers(&mut request.headers, redaction);
1372 redact_query_pairs(&mut request.query, redaction);
1373 request.url = redact_url(&request.url, redaction);
1374 request.body = request.body.take().map(|body| redact_body(body, redaction));
1375}
1376
1377fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1378 if !redaction.enabled {
1379 return;
1380 }
1381 let sensitive = redaction_set(&redaction.sensitive_header_names);
1382 for (name, value) in headers.iter_mut() {
1383 if sensitive.contains(&name.to_ascii_lowercase()) {
1384 *value = "[REDACTED]".to_string();
1385 }
1386 }
1387}
1388
1389fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1390 if !redaction.enabled {
1391 return;
1392 }
1393 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1394 for (name, value) in query.iter_mut() {
1395 if sensitive.contains(&name.to_ascii_lowercase()) {
1396 *value = "[REDACTED]".to_string();
1397 }
1398 }
1399}
1400
1401fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1402 if !redaction.enabled {
1403 return url.clone();
1404 }
1405 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1406 let pairs: Vec<(String, String)> = url
1407 .query_pairs()
1408 .map(|(k, v)| {
1409 let key = k.to_string();
1410 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1411 "[REDACTED]".to_string()
1412 } else {
1413 v.to_string()
1414 };
1415 (key, value)
1416 })
1417 .collect();
1418 let mut next = url.clone();
1419 if pairs.is_empty() {
1420 return next;
1421 }
1422 next.query_pairs_mut().clear();
1423 for (k, v) in pairs {
1424 next.query_pairs_mut().append_pair(&k, &v);
1425 }
1426 next
1427}
1428
1429fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1430 match url::Url::parse(input) {
1431 Ok(url) => redact_url(&url, redaction).to_string(),
1432 Err(_) => input.to_string(),
1433 }
1434}
1435
1436fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1437 if redaction.enabled && redaction.redact_bodies {
1438 body.content = "[REDACTED]".to_string();
1439 }
1440 body
1441}
1442
1443fn redaction_set(values: &[String]) -> HashSet<String> {
1444 values
1445 .iter()
1446 .map(|value| value.to_ascii_lowercase())
1447 .collect()
1448}
1449
1450#[derive(Clone)]
1451pub struct ProxyConfig {
1452 pub port: u16,
1453 pub ca_cert_path: std::path::PathBuf,
1454 pub ca_key_path: std::path::PathBuf,
1455 pub transparent: bool,
1456 pub udp_tproxy_port: Option<u16>,
1457}
1458
1459impl ProxyConfig {
1460 pub fn new(
1461 port: u16,
1462 ca_cert_path: std::path::PathBuf,
1463 ca_key_path: std::path::PathBuf,
1464 ) -> Self {
1465 Self {
1466 port,
1467 ca_cert_path,
1468 ca_key_path,
1469 transparent: false,
1470 udp_tproxy_port: None,
1471 }
1472 }
1473
1474 pub fn from_app_data_dir(
1475 app_data_dir: impl Into<std::path::PathBuf>,
1476 port: u16,
1477 ) -> Result<Self, String> {
1478 let app_data_dir = app_data_dir.into();
1479 if !app_data_dir.exists() {
1480 std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1481 }
1482
1483 Ok(Self::new(
1484 port,
1485 app_data_dir.join("ca_cert.pem"),
1486 app_data_dir.join("ca_key.pem"),
1487 ))
1488 }
1489
1490 pub fn with_transparent(mut self, transparent: bool) -> Self {
1491 self.transparent = transparent;
1492 self
1493 }
1494
1495 pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1496 self.udp_tproxy_port = udp_tproxy_port;
1497 self
1498 }
1499}
1500
1501fn now_unix_ms() -> u64 {
1502 SystemTime::now()
1503 .duration_since(UNIX_EPOCH)
1504 .unwrap_or_default()
1505 .as_millis() as u64
1506}
1507
1508fn modification_field_names(
1509 mods: Option<&relay_core_api::modification::FlowModification>,
1510) -> Vec<&'static str> {
1511 let Some(mods) = mods else {
1512 return Vec::new();
1513 };
1514
1515 let mut fields = Vec::new();
1516 if mods.method.is_some() {
1517 fields.push("method");
1518 }
1519 if mods.url.is_some() {
1520 fields.push("url");
1521 }
1522 if mods.request_headers.is_some() {
1523 fields.push("request_headers");
1524 }
1525 if mods.request_body.is_some() {
1526 fields.push("request_body");
1527 }
1528 if mods.status_code.is_some() {
1529 fields.push("status_code");
1530 }
1531 if mods.response_headers.is_some() {
1532 fields.push("response_headers");
1533 }
1534 if mods.response_body.is_some() {
1535 fields.push("response_body");
1536 }
1537 if mods.message_content.is_some() {
1538 fields.push("message_content");
1539 }
1540 fields
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545 use super::*;
1546 use chrono::Utc;
1547 use relay_core_api::flow::{
1548 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1549 ResponseTiming, TransportProtocol,
1550 };
1551 use std::sync::atomic::{AtomicU64, Ordering};
1552 use tokio::time::{Duration, sleep};
1553 use url::Url;
1554 use uuid::Uuid;
1555
1556 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1557
1558 fn sqlite_url() -> String {
1559 let nanos = SystemTime::now()
1560 .duration_since(UNIX_EPOCH)
1561 .expect("clock drift")
1562 .as_nanos();
1563 let pid = std::process::id();
1564 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1565 let db_dir = std::env::current_dir()
1566 .expect("cwd")
1567 .join("target")
1568 .join("test-dbs");
1569 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1570 let db_path = db_dir.join(format!("relay-core-runtime-test-{}-{}-{}.db", pid, nanos, seq));
1571 format!("sqlite://{}?mode=rwc", db_path.display())
1572 }
1573
1574 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1575 let start_time = chrono::DateTime::<Utc>::from_timestamp_millis(ts)
1576 .expect("timestamp should be valid");
1577 let request_url =
1578 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1579 Flow {
1580 id: Uuid::new_v4(),
1581 start_time,
1582 end_time: Some(start_time),
1583 network: NetworkInfo {
1584 client_ip: "127.0.0.1".to_string(),
1585 client_port: 12000,
1586 server_ip: "127.0.0.1".to_string(),
1587 server_port: 8080,
1588 protocol: TransportProtocol::TCP,
1589 tls: false,
1590 tls_version: None,
1591 sni: None,
1592 },
1593 layer: Layer::Http(HttpLayer {
1594 request: HttpRequest {
1595 method: method.to_string(),
1596 url: request_url,
1597 version: "HTTP/1.1".to_string(),
1598 headers: vec![],
1599 cookies: vec![],
1600 query: vec![],
1601 body: None,
1602 },
1603 response: Some(HttpResponse {
1604 status,
1605 status_text: "OK".to_string(),
1606 version: "HTTP/1.1".to_string(),
1607 headers: vec![],
1608 cookies: vec![],
1609 body: None,
1610 timing: ResponseTiming {
1611 time_to_first_byte: None,
1612 time_to_last_byte: None,
1613 },
1614 }),
1615 error: None,
1616 }),
1617 tags: vec![],
1618 meta: std::collections::HashMap::new(),
1619 }
1620 }
1621
1622 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1623 let start_time = chrono::DateTime::<Utc>::from_timestamp_millis(ts)
1624 .expect("timestamp should be valid");
1625 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1626 .expect("url should parse");
1627 Flow {
1628 id: Uuid::new_v4(),
1629 start_time,
1630 end_time: Some(start_time),
1631 network: NetworkInfo {
1632 client_ip: "127.0.0.1".to_string(),
1633 client_port: 12000,
1634 server_ip: "127.0.0.1".to_string(),
1635 server_port: 8080,
1636 protocol: TransportProtocol::TCP,
1637 tls: false,
1638 tls_version: None,
1639 sni: None,
1640 },
1641 layer: Layer::Http(HttpLayer {
1642 request: HttpRequest {
1643 method: "GET".to_string(),
1644 url: request_url,
1645 version: "HTTP/1.1".to_string(),
1646 headers: vec![
1647 ("Authorization".to_string(), "Bearer secret-token".to_string()),
1648 ("X-Normal".to_string(), "visible".to_string()),
1649 ],
1650 cookies: vec![],
1651 query: vec![
1652 ("token".to_string(), "abc123".to_string()),
1653 ("ok".to_string(), "1".to_string()),
1654 ],
1655 body: Some(BodyData {
1656 encoding: "utf-8".to_string(),
1657 content: "secret request body".to_string(),
1658 size: 19,
1659 }),
1660 },
1661 response: Some(HttpResponse {
1662 status: 200,
1663 status_text: "OK".to_string(),
1664 version: "HTTP/1.1".to_string(),
1665 headers: vec![
1666 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1667 ("X-Response".to_string(), "visible".to_string()),
1668 ],
1669 cookies: vec![],
1670 body: Some(BodyData {
1671 encoding: "utf-8".to_string(),
1672 content: "secret response body".to_string(),
1673 size: 20,
1674 }),
1675 timing: ResponseTiming {
1676 time_to_first_byte: None,
1677 time_to_last_byte: None,
1678 },
1679 }),
1680 error: None,
1681 }),
1682 tags: vec![],
1683 meta: std::collections::HashMap::new(),
1684 }
1685 }
1686
1687 #[tokio::test]
1688 async fn set_rules_from_records_audit_event() {
1689 let state = CoreState::new(None).await;
1690
1691 state
1692 .set_rules_from(
1693 AuditActor::Http,
1694 "rule.upsert",
1695 "rule-1".to_string(),
1696 json!({ "route": "/api/v1/rules" }),
1697 Vec::new(),
1698 )
1699 .await
1700 .expect("set rules should succeed");
1701
1702 let events = state.recent_audit_events();
1703 let event = events.last().expect("audit event should exist");
1704 assert_eq!(event.actor, AuditActor::Http);
1705 assert_eq!(event.kind, AuditEventKind::RuleChanged);
1706 assert_eq!(event.outcome, AuditOutcome::Success);
1707 assert_eq!(event.target, "rule-1");
1708 assert_eq!(event.details["operation"], "rule.upsert");
1709 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1710 }
1711
1712 #[tokio::test]
1713 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1714 let state = CoreState::new(None).await;
1715
1716 state
1717 .upsert_rule_from(
1718 AuditActor::Probe,
1719 "rule.upsert",
1720 "rule-1".to_string(),
1721 json!({ "tool": "set_rule" }),
1722 Rule {
1723 id: "rule-1".to_string(),
1724 name: "first".to_string(),
1725 active: true,
1726 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1727 priority: 1,
1728 termination: relay_core_lib::rule::RuleTermination::Continue,
1729 filter: relay_core_lib::rule::Filter::Url(
1730 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1731 ),
1732 actions: vec![],
1733 constraints: None,
1734 },
1735 )
1736 .await
1737 .expect("initial upsert should succeed");
1738
1739 state
1740 .upsert_rule_from(
1741 AuditActor::Probe,
1742 "rule.upsert",
1743 "rule-1".to_string(),
1744 json!({ "tool": "set_rule" }),
1745 Rule {
1746 id: "rule-1".to_string(),
1747 name: "second".to_string(),
1748 active: true,
1749 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1750 priority: 2,
1751 termination: relay_core_lib::rule::RuleTermination::Continue,
1752 filter: relay_core_lib::rule::Filter::Url(
1753 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1754 ),
1755 actions: vec![],
1756 constraints: None,
1757 },
1758 )
1759 .await
1760 .expect("replacement upsert should succeed");
1761
1762 let rules = state.get_rules().await;
1763 assert_eq!(rules.len(), 1);
1764 assert_eq!(rules[0].name, "second");
1765 let event = state.recent_audit_events().last().cloned().expect("audit event");
1766 assert_eq!(event.details["operation"], "rule.upsert");
1767 assert_eq!(event.details["details"]["tool"], "set_rule");
1768 }
1769
1770 #[tokio::test]
1771 async fn delete_rule_from_returns_false_when_rule_missing() {
1772 let state = CoreState::new(None).await;
1773
1774 let deleted = state
1775 .delete_rule_from(
1776 AuditActor::Http,
1777 "rule.delete",
1778 "missing".to_string(),
1779 json!({ "route": "/api/v1/rules/{id}" }),
1780 "missing",
1781 )
1782 .await
1783 .expect("delete should not fail");
1784
1785 assert!(!deleted);
1786 assert!(state.recent_audit_events().is_empty());
1787 }
1788
1789 #[tokio::test]
1790 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
1791 let state = CoreState::new(None).await;
1792
1793 let rule_id = state
1794 .create_mock_response_rule_from(
1795 AuditActor::Http,
1796 "api-mock-1".to_string(),
1797 json!({ "route": "/api/v1/mock", "status": 201 }),
1798 MockResponseRuleConfig {
1799 rule_id: "api-mock-1".to_string(),
1800 url_pattern: "example.com".to_string(),
1801 name: "api-mock:example.com".to_string(),
1802 status: 201,
1803 content_type: "application/json".to_string(),
1804 body: "{\"ok\":true}".to_string(),
1805 },
1806 )
1807 .await
1808 .expect("mock rule should be created");
1809
1810 assert_eq!(rule_id, "api-mock-1");
1811 let rules = state.get_rules().await;
1812 assert_eq!(rules.len(), 1);
1813 assert_eq!(rules[0].id, "api-mock-1");
1814 let event = state.recent_audit_events().last().cloned().expect("audit event");
1815 assert_eq!(event.details["operation"], "rule.mock_create");
1816 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
1817 }
1818
1819 #[tokio::test]
1820 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
1821 let state = CoreState::new(None).await;
1822
1823 let rule_id = state
1824 .create_intercept_rule_from(
1825 AuditActor::Http,
1826 "intercept-1".to_string(),
1827 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
1828 InterceptRuleConfig {
1829 rule_id: "intercept-1".to_string(),
1830 active: true,
1831 url_pattern: "example.com".to_string(),
1832 method: None,
1833 phase: "request".to_string(),
1834 name: "api-intercept:example.com".to_string(),
1835 priority: 100,
1836 termination: relay_core_lib::rule::RuleTermination::Stop,
1837 },
1838 )
1839 .await
1840 .expect("intercept rule should be created");
1841
1842 assert_eq!(rule_id, "intercept-1");
1843 let rules = state.get_rules().await;
1844 assert_eq!(rules.len(), 1);
1845 assert_eq!(rules[0].id, "intercept-1");
1846 assert_eq!(rules[0].name, "api-intercept:example.com");
1847 let event = state.recent_audit_events().last().cloned().expect("audit event");
1848 assert_eq!(event.details["operation"], "rule.intercept_create");
1849 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
1850 }
1851
1852 #[tokio::test]
1853 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
1854 let state = CoreState::new(None).await;
1855
1856 state
1857 .upsert_legacy_intercept_rule_from(
1858 AuditActor::Tauri,
1859 "legacy-1".to_string(),
1860 json!({ "command": "set_intercept_rule" }),
1861 InterceptRule {
1862 id: "legacy-1".to_string(),
1863 active: true,
1864 url_pattern: "example.com".to_string(),
1865 method: None,
1866 phase: "both".to_string(),
1867 },
1868 )
1869 .await
1870 .expect("initial family upsert should succeed");
1871 assert_eq!(state.get_rules().await.len(), 2);
1872
1873 state
1874 .upsert_legacy_intercept_rule_from(
1875 AuditActor::Tauri,
1876 "legacy-1".to_string(),
1877 json!({ "command": "set_intercept_rule" }),
1878 InterceptRule {
1879 id: "legacy-1".to_string(),
1880 active: true,
1881 url_pattern: "example.org".to_string(),
1882 method: Some("POST".to_string()),
1883 phase: "request".to_string(),
1884 },
1885 )
1886 .await
1887 .expect("replacement family upsert should succeed");
1888
1889 let rules = state.get_rules().await;
1890 assert_eq!(rules.len(), 1);
1891 assert_eq!(rules[0].id, "legacy-1");
1892 let event = state.recent_audit_events().last().cloned().expect("audit event");
1893 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
1894 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
1895 }
1896
1897 #[tokio::test]
1898 async fn resolve_intercept_failure_records_failed_audit_event() {
1899 let state = CoreState::new(None).await;
1900
1901 let result = state
1902 .resolve_intercept_with_modifications_from(
1903 AuditActor::Probe,
1904 "missing-flow:request".to_string(),
1905 "drop",
1906 None,
1907 )
1908 .await;
1909
1910 assert!(result.is_err());
1911 let events = state.recent_audit_events();
1912 let event = events.last().expect("audit event should exist");
1913 assert_eq!(event.actor, AuditActor::Probe);
1914 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
1915 assert_eq!(event.outcome, AuditOutcome::Failed);
1916 assert_eq!(event.details["action"], "drop");
1917 assert!(event.details["error"].as_str().unwrap_or_default().contains("Interception not found"));
1918 }
1919
1920 #[tokio::test]
1921 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
1922 let state = CoreState::new(None).await;
1923 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1924
1925 state
1926 .prepare_start(8080, shutdown_tx)
1927 .expect("prepare start should succeed");
1928 let lifecycle = state.lifecycle();
1929 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
1930 assert_eq!(lifecycle.port, Some(8080));
1931 assert!(lifecycle.started_at_ms.is_none());
1932 assert!(lifecycle.last_error.is_none());
1933
1934 assert_eq!(
1935 state.stop_proxy().expect("stop should succeed"),
1936 ProxyStopResult::Stopping
1937 );
1938 let lifecycle = state.lifecycle();
1939 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
1940 assert_eq!(lifecycle.port, Some(8080));
1941 assert!(shutdown_rx.await.is_ok());
1942 }
1943
1944 #[tokio::test]
1945 async fn status_snapshot_derives_runtime_facing_fields() {
1946 let state = CoreState::new(None).await;
1947 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
1948 state
1949 .prepare_start(8080, shutdown_tx)
1950 .expect("prepare start should succeed");
1951
1952 let status = state.status_snapshot();
1953 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
1954 assert!(status.running);
1955 assert_eq!(status.port, Some(8080));
1956 assert!(status.uptime.is_none());
1957 assert!(status.last_error.is_none());
1958 }
1959
1960 #[tokio::test]
1961 async fn status_report_combines_status_and_metrics() {
1962 let state = CoreState::new(None).await;
1963 let report = state.status_report().await;
1964
1965 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
1966 assert!(!report.status.running);
1967 assert_eq!(report.metrics.intercepts_pending, 0);
1968 assert_eq!(report.metrics.ws_pending_messages, 0);
1969 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
1970 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
1971 assert_eq!(report.metrics.audit_events_total, 0);
1972 assert_eq!(report.metrics.audit_events_failed, 0);
1973 assert_eq!(report.metrics.flow_events_lagged_total, 0);
1974 assert_eq!(report.metrics.audit_events_lagged_total, 0);
1975 }
1976
1977 #[test]
1978 fn proxy_config_new_and_transport_setters_preserve_values() {
1979 let config = ProxyConfig::new(
1980 8080,
1981 std::path::PathBuf::from("/tmp/ca_cert.pem"),
1982 std::path::PathBuf::from("/tmp/ca_key.pem"),
1983 )
1984 .with_transparent(true)
1985 .with_udp_tproxy_port(Some(15000));
1986
1987 assert_eq!(config.port, 8080);
1988 assert_eq!(config.ca_cert_path, std::path::PathBuf::from("/tmp/ca_cert.pem"));
1989 assert_eq!(config.ca_key_path, std::path::PathBuf::from("/tmp/ca_key.pem"));
1990 assert!(config.transparent);
1991 assert_eq!(config.udp_tproxy_port, Some(15000));
1992 }
1993
1994 #[test]
1995 fn proxy_config_from_app_data_dir_creates_default_paths() {
1996 let unique = SystemTime::now()
1997 .duration_since(UNIX_EPOCH)
1998 .expect("clock drift")
1999 .as_nanos();
2000 let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2001
2002 let config = ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2003
2004 assert!(dir.exists());
2005 assert_eq!(config.port, 8899);
2006 assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2007 assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2008 assert!(!config.transparent);
2009 assert!(config.udp_tproxy_port.is_none());
2010 }
2011
2012 #[tokio::test]
2013 async fn intercept_snapshot_maps_pending_counts() {
2014 let state = CoreState::new(None).await;
2015 let snapshot = state.intercept_snapshot().await;
2016
2017 assert_eq!(snapshot.pending_count, 0);
2018 assert_eq!(snapshot.ws_pending_count, 0);
2019 }
2020
2021 #[tokio::test]
2022 async fn audit_snapshot_returns_latest_events_in_order() {
2023 let state = CoreState::new(None).await;
2024 state.record_audit_event(AuditEvent::new(
2025 AuditActor::Runtime,
2026 AuditEventKind::RuleChanged,
2027 "first",
2028 AuditOutcome::Success,
2029 json!({ "index": 1 }),
2030 ));
2031 state.record_audit_event(AuditEvent::new(
2032 AuditActor::Http,
2033 AuditEventKind::PolicyUpdated,
2034 "second",
2035 AuditOutcome::Success,
2036 json!({ "index": 2 }),
2037 ));
2038
2039 let snapshot = state.audit_snapshot(1);
2040
2041 assert_eq!(snapshot.events.len(), 1);
2042 assert_eq!(snapshot.events[0].target, "second");
2043 assert_eq!(snapshot.events[0].details["index"], 2);
2044 }
2045
2046 #[tokio::test]
2047 async fn query_audit_snapshot_filters_in_memory_events() {
2048 let state = CoreState::new(None).await;
2049 state.record_audit_event(AuditEvent::new(
2050 AuditActor::Http,
2051 AuditEventKind::RuleChanged,
2052 "rule-1",
2053 AuditOutcome::Success,
2054 json!({ "idx": 1 }),
2055 ));
2056 state.record_audit_event(AuditEvent::new(
2057 AuditActor::Probe,
2058 AuditEventKind::PolicyUpdated,
2059 "policy",
2060 AuditOutcome::Failed,
2061 json!({ "idx": 2 }),
2062 ));
2063
2064 let snapshot = state
2065 .query_audit_snapshot(CoreAuditQuery {
2066 actor: Some(AuditActor::Probe),
2067 kind: Some(AuditEventKind::PolicyUpdated),
2068 outcome: Some(AuditOutcome::Failed),
2069 limit: 10,
2070 ..Default::default()
2071 })
2072 .await;
2073
2074 assert_eq!(snapshot.events.len(), 1);
2075 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2076 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2077 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2078 }
2079
2080 #[tokio::test]
2081 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2082 let state = CoreState::new(Some(sqlite_url())).await;
2083 state.update_policy_from(
2084 AuditActor::Http,
2085 "policy".to_string(),
2086 ProxyPolicy {
2087 transparent_enabled: true,
2088 ..Default::default()
2089 },
2090 );
2091
2092 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2093 for _ in 0..10 {
2094 snapshot = state
2095 .query_audit_snapshot(CoreAuditQuery {
2096 actor: Some(AuditActor::Http),
2097 kind: Some(AuditEventKind::PolicyUpdated),
2098 limit: 10,
2099 ..Default::default()
2100 })
2101 .await;
2102 if !snapshot.events.is_empty() {
2103 break;
2104 }
2105 sleep(Duration::from_millis(20)).await;
2106 }
2107
2108 assert!(!snapshot.events.is_empty());
2109 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2110 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2111 }
2112
2113 #[tokio::test]
2114 async fn prepare_start_rejects_second_active_start() {
2115 let state = CoreState::new(None).await;
2116 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2117 state
2118 .prepare_start(8080, shutdown_tx)
2119 .expect("first start should succeed");
2120
2121 let (second_tx, _second_rx) = oneshot::channel();
2122 let error = state
2123 .prepare_start(8081, second_tx)
2124 .expect_err("second active start should be rejected");
2125 assert!(error.contains("already"));
2126 }
2127
2128 #[test]
2129 fn update_policy_records_audit_event() {
2130 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2131 let state = runtime.block_on(CoreState::new(None));
2132
2133 state.update_policy_from(
2134 AuditActor::Runtime,
2135 "policy".to_string(),
2136 ProxyPolicy {
2137 transparent_enabled: true,
2138 ..Default::default()
2139 },
2140 );
2141
2142 let events = state.recent_audit_events();
2143 let event = events.last().expect("audit event should exist");
2144 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2145 assert_eq!(event.outcome, AuditOutcome::Success);
2146 assert_eq!(event.details["transparent_enabled"], true);
2147 }
2148
2149 #[test]
2150 fn patch_policy_updates_redaction_without_replacing_other_fields() {
2151 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2152 let state = runtime.block_on(CoreState::new(None));
2153 let original_timeout = state.policy_snapshot().request_timeout_ms;
2154
2155 state.patch_policy_from(
2156 AuditActor::Runtime,
2157 "policy.patch".to_string(),
2158 relay_core_api::policy::ProxyPolicyPatch {
2159 redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2160 enabled: Some(true),
2161 redact_bodies: Some(true),
2162 ..Default::default()
2163 }),
2164 },
2165 );
2166
2167 let policy = state.policy_snapshot();
2168 assert_eq!(policy.request_timeout_ms, original_timeout);
2169 assert!(policy.redaction.enabled);
2170 assert!(policy.redaction.redact_bodies);
2171
2172 let events = state.recent_audit_events();
2173 let event = events.last().expect("audit event should exist");
2174 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2175 assert_eq!(event.details["redaction_enabled"], true);
2176 }
2177
2178 #[tokio::test]
2179 async fn metrics_include_audit_and_lagged_event_counters() {
2180 let state = CoreState::new(None).await;
2181
2182 state.update_policy_from(
2183 AuditActor::Runtime,
2184 "policy".to_string(),
2185 ProxyPolicy::default(),
2186 );
2187 let _ = state
2188 .resolve_intercept_with_modifications_from(
2189 AuditActor::Probe,
2190 "missing-flow:request".to_string(),
2191 "drop",
2192 None,
2193 )
2194 .await;
2195
2196 state.record_flow_events_lagged(3);
2197 state.record_audit_events_lagged(5);
2198
2199 let metrics = state.get_metrics().await;
2200 assert_eq!(metrics.audit_events_total, 2);
2201 assert_eq!(metrics.audit_events_failed, 1);
2202 assert_eq!(metrics.flow_events_lagged_total, 3);
2203 assert_eq!(metrics.audit_events_lagged_total, 5);
2204 }
2205
2206 #[tokio::test]
2207 async fn prometheus_metrics_text_contains_observability_fields() {
2208 let state = CoreState::new(None).await;
2209 state.record_flow_events_lagged(2);
2210 state.record_audit_events_lagged(4);
2211
2212 let text = state.get_metrics_prometheus_text().await;
2213 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2214 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2215 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2216 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2217 }
2218
2219 #[tokio::test]
2220 async fn search_flows_uses_store_with_offset_pagination() {
2221 let state = CoreState::new(Some(sqlite_url())).await;
2222 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2223 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2224 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2225
2226 state.upsert_flow(Box::new(flow_a));
2227 state.upsert_flow(Box::new(flow_b));
2228 state.upsert_flow(Box::new(flow_c));
2229
2230 let mut baseline = Vec::new();
2231 for _ in 0..20 {
2232 baseline = state
2233 .search_flows(FlowQuery {
2234 host: Some("api.example.com".to_string()),
2235 path_contains: None,
2236 method: None,
2237 status_min: None,
2238 status_max: None,
2239 has_error: None,
2240 is_websocket: None,
2241 limit: Some(3),
2242 offset: Some(0),
2243 })
2244 .await;
2245 if baseline.len() == 3 {
2246 break;
2247 }
2248 sleep(Duration::from_millis(20)).await;
2249 }
2250
2251 assert_eq!(baseline.len(), 3);
2252 let page = state
2253 .search_flows(FlowQuery {
2254 host: Some("api.example.com".to_string()),
2255 path_contains: None,
2256 method: None,
2257 status_min: None,
2258 status_max: None,
2259 has_error: None,
2260 is_websocket: None,
2261 limit: Some(1),
2262 offset: Some(1),
2263 })
2264 .await;
2265 assert_eq!(page.len(), 1);
2266 assert_eq!(page[0].id, baseline[1].id);
2267 }
2268
2269 #[tokio::test]
2270 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2271 let state = CoreState::new(Some(sqlite_url())).await;
2272 let first_flow = sample_http_flow("persist.example.com", "/first", "GET", 200, 1_700_000_010_000);
2273 let first_id = first_flow.id.to_string();
2274 state.upsert_flow(Box::new(first_flow));
2275 for i in 0..240 {
2276 state.upsert_flow(Box::new(sample_http_flow(
2277 "persist.example.com",
2278 &format!("/{}", i),
2279 "GET",
2280 200,
2281 1_700_000_020_000 + i,
2282 )));
2283 }
2284
2285 sleep(Duration::from_millis(200)).await;
2286
2287 let loaded = state.get_flow(first_id).await;
2288 assert!(loaded.is_some());
2289 }
2290
2291 #[tokio::test]
2292 async fn search_flows_redacts_summary_url_when_enabled() {
2293 let state = CoreState::new(None).await;
2294 state.update_policy_from(
2295 AuditActor::Runtime,
2296 "policy.redaction".to_string(),
2297 ProxyPolicy {
2298 redaction: RedactionPolicy {
2299 enabled: true,
2300 sensitive_query_keys: vec!["token".to_string()],
2301 redact_bodies: false,
2302 ..Default::default()
2303 },
2304 ..Default::default()
2305 },
2306 );
2307 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2308
2309 let mut items = Vec::new();
2310 for _ in 0..20 {
2311 items = state
2312 .search_flows(FlowQuery {
2313 host: Some("api.example.com".to_string()),
2314 path_contains: Some("/private".to_string()),
2315 ..Default::default()
2316 })
2317 .await;
2318 if !items.is_empty() {
2319 break;
2320 }
2321 sleep(Duration::from_millis(20)).await;
2322 }
2323
2324 assert!(!items.is_empty());
2325 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2326 let token = redacted
2327 .query_pairs()
2328 .find(|(k, _)| k == "token")
2329 .map(|(_, v)| v.to_string());
2330 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2331 }
2332
2333 #[tokio::test]
2334 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2335 let state = CoreState::new(Some(sqlite_url())).await;
2336 state.update_policy_from(
2337 AuditActor::Runtime,
2338 "policy.redaction".to_string(),
2339 ProxyPolicy {
2340 redaction: RedactionPolicy {
2341 enabled: true,
2342 sensitive_header_names: vec![
2343 "authorization".to_string(),
2344 "set-cookie".to_string(),
2345 ],
2346 sensitive_query_keys: vec!["token".to_string()],
2347 redact_bodies: true,
2348 },
2349 ..Default::default()
2350 },
2351 );
2352
2353 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2354 let flow_id = flow.id.to_string();
2355 state.upsert_flow(Box::new(flow));
2356 sleep(Duration::from_millis(80)).await;
2357
2358 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2359 let Layer::Http(http) = loaded.layer else {
2360 panic!("expected http layer");
2361 };
2362
2363 let auth = http
2364 .request
2365 .headers
2366 .iter()
2367 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2368 .map(|(_, v)| v.as_str());
2369 assert_eq!(auth, Some("[REDACTED]"));
2370
2371 let req_query_token = http
2372 .request
2373 .query
2374 .iter()
2375 .find(|(k, _)| k == "token")
2376 .map(|(_, v)| v.as_str());
2377 assert_eq!(req_query_token, Some("[REDACTED]"));
2378
2379 let req_body = http
2380 .request
2381 .body
2382 .as_ref()
2383 .map(|b| b.content.as_str());
2384 assert_eq!(req_body, Some("[REDACTED]"));
2385
2386 let response = http.response.expect("response should exist");
2387 let set_cookie = response
2388 .headers
2389 .iter()
2390 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2391 .map(|(_, v)| v.as_str());
2392 assert_eq!(set_cookie, Some("[REDACTED]"));
2393 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2394 assert_eq!(res_body, Some("[REDACTED]"));
2395 }
2396
2397 #[test]
2398 fn redact_flow_update_masks_http_body_when_enabled() {
2399 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2400 let state = runtime.block_on(CoreState::new(None));
2401 state.update_policy_from(
2402 AuditActor::Runtime,
2403 "policy.redaction".to_string(),
2404 ProxyPolicy {
2405 redaction: RedactionPolicy {
2406 enabled: true,
2407 redact_bodies: true,
2408 ..Default::default()
2409 },
2410 ..Default::default()
2411 },
2412 );
2413
2414 let update = FlowUpdate::HttpBody {
2415 flow_id: "f-1".to_string(),
2416 direction: relay_core_api::flow::Direction::ClientToServer,
2417 body: BodyData {
2418 encoding: "utf-8".to_string(),
2419 content: "super-secret".to_string(),
2420 size: 12,
2421 },
2422 };
2423 let redacted = state.redact_flow_update_for_output(update);
2424 match redacted {
2425 FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2426 _ => panic!("expected http body update"),
2427 }
2428 }
2429
2430 #[cfg(feature = "script")]
2431 #[tokio::test]
2432 async fn load_script_from_records_audit_event() {
2433 let state = CoreState::new(None).await;
2434
2435 state
2436 .load_script_from(
2437 AuditActor::Tauri,
2438 "tauri.load_script".to_string(),
2439 "globalThis.onRequestHeaders = (_flow) => {};",
2440 )
2441 .await
2442 .expect("script should load");
2443
2444 let events = state.recent_audit_events();
2445 let event = events.last().expect("audit event should exist");
2446 assert_eq!(event.actor, AuditActor::Tauri);
2447 assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2448 assert_eq!(event.outcome, AuditOutcome::Success);
2449 assert_eq!(event.target, "tauri.load_script");
2450 }
2451}