1use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51 build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
76pub mod rule_engine {
77 pub use relay_core_lib::rule_engine::*;
78}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
81pub struct CoreMetrics {
82 pub flows_total: usize,
83 pub flows_in_memory: usize,
84 pub flows_dropped: usize,
85 pub intercepts_pending: usize,
86 pub ws_pending_messages: usize,
87 pub oldest_intercept_age_ms: Option<u64>,
88 pub oldest_ws_message_age_ms: Option<u64>,
89 pub rule_exec_errors: usize,
90 pub audit_events_total: usize,
91 pub audit_events_failed: usize,
92 pub flow_events_lagged_total: usize,
93 pub audit_events_lagged_total: usize,
94}
95
96impl CoreMetrics {
97 pub fn to_prometheus_text(&self) -> String {
98 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
99 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
100 format!(
101 "relay_core_flows_total {}\n\
102relay_core_flows_in_memory {}\n\
103relay_core_flows_dropped_total {}\n\
104relay_core_intercepts_pending {}\n\
105relay_core_ws_pending_messages {}\n\
106relay_core_oldest_intercept_age_ms {}\n\
107relay_core_oldest_ws_message_age_ms {}\n\
108relay_core_rule_exec_errors_total {}\n\
109relay_core_audit_events_total {}\n\
110relay_core_audit_events_failed_total {}\n\
111relay_core_flow_events_lagged_total {}\n\
112relay_core_audit_events_lagged_total {}\n",
113 self.flows_total,
114 self.flows_in_memory,
115 self.flows_dropped,
116 self.intercepts_pending,
117 self.ws_pending_messages,
118 oldest_intercept_age_ms,
119 oldest_ws_message_age_ms,
120 self.rule_exec_errors,
121 self.audit_events_total,
122 self.audit_events_failed,
123 self.flow_events_lagged_total,
124 self.audit_events_lagged_total,
125 )
126 }
127}
128
129#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
130pub struct CoreStatusSnapshot {
131 pub phase: RuntimeLifecyclePhase,
132 pub running: bool,
133 pub port: Option<u16>,
134 pub uptime: Option<u64>,
135 pub last_error: Option<String>,
136}
137
138#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
139pub struct CoreStatusReport {
140 pub status: CoreStatusSnapshot,
141 pub metrics: CoreMetrics,
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
145pub struct CoreInterceptSnapshot {
146 pub pending_count: usize,
147 pub ws_pending_count: usize,
148}
149
150#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
151pub struct CoreAuditSnapshot {
152 pub events: Vec<AuditEvent>,
153}
154
155#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
156pub struct CoreAuditQuery {
157 pub since_ms: Option<u64>,
158 pub until_ms: Option<u64>,
159 pub actor: Option<AuditActor>,
160 pub kind: Option<AuditEventKind>,
161 pub outcome: Option<AuditOutcome>,
162 pub limit: usize,
163}
164
165impl Default for CoreAuditQuery {
166 fn default() -> Self {
167 Self {
168 since_ms: None,
169 until_ms: None,
170 actor: None,
171 kind: None,
172 outcome: None,
173 limit: 50,
174 }
175 }
176}
177
178#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
179#[serde(rename_all = "snake_case")]
180pub enum RuntimeLifecyclePhase {
181 Created,
182 Starting,
183 Running,
184 Stopping,
185 Stopped,
186 Failed,
187}
188
189impl RuntimeLifecyclePhase {
190 pub fn as_str(&self) -> &'static str {
191 match self {
192 Self::Created => "created",
193 Self::Starting => "starting",
194 Self::Running => "running",
195 Self::Stopping => "stopping",
196 Self::Stopped => "stopped",
197 Self::Failed => "failed",
198 }
199 }
200
201 pub fn is_active(&self) -> bool {
202 matches!(self, Self::Starting | Self::Running | Self::Stopping)
203 }
204}
205
206#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
207pub struct RuntimeLifecycle {
208 pub phase: RuntimeLifecyclePhase,
209 pub port: Option<u16>,
210 pub started_at_ms: Option<u64>,
211 pub last_error: Option<String>,
212}
213
214impl RuntimeLifecycle {
215 pub fn created() -> Self {
216 Self {
217 phase: RuntimeLifecyclePhase::Created,
218 port: None,
219 started_at_ms: None,
220 last_error: None,
221 }
222 }
223
224 pub fn is_active(&self) -> bool {
225 self.phase.is_active()
226 }
227
228 pub fn uptime_seconds(&self) -> Option<u64> {
229 let started_at_ms = self.started_at_ms?;
230 let now_ms = now_unix_ms();
231 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
232 }
233}
234
235pub enum ProxySpawnResult {
236 Started(tokio::task::JoinHandle<()>),
237 AlreadyRunning,
238}
239
/// Result of asking the core to stop the proxy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProxyStopResult {
    /// A shutdown signal was sent and the lifecycle moved to `Stopping`.
    Stopping,
    /// No shutdown sender was registered, so there was nothing to stop.
    NotRunning,
}
245
246impl From<RuntimeLifecycle> for CoreStatusSnapshot {
247 fn from(lifecycle: RuntimeLifecycle) -> Self {
248 Self {
249 running: lifecycle.is_active(),
250 uptime: lifecycle.uptime_seconds(),
251 port: lifecycle.port,
252 last_error: lifecycle.last_error,
253 phase: lifecycle.phase,
254 }
255 }
256}
257
258pub struct CoreState {
259 flow_store: mpsc::Sender<FlowStoreMessage>,
260 intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
261 rule_store: mpsc::Sender<RuleStoreMessage>,
262 store: Option<Store>,
263 #[cfg(feature = "script")]
264 pub script_interceptor: Arc<ScriptInterceptor>,
265 pub policy_tx: watch::Sender<ProxyPolicy>,
266 pub flows_dropped: Arc<AtomicUsize>,
267 audit_events_total: Arc<AtomicUsize>,
268 audit_events_failed: Arc<AtomicUsize>,
269 flow_events_lagged_total: Arc<AtomicUsize>,
270 audit_events_lagged_total: Arc<AtomicUsize>,
271 flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
273 audit_broadcast_tx: broadcast::Sender<AuditEvent>,
274 audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
275 lifecycle_tx: watch::Sender<RuntimeLifecycle>,
276 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
277}
278
279impl CoreState {
280 pub async fn new(db_url: Option<String>) -> Self {
281 const AUDIT_HISTORY_LIMIT: usize = 200;
282
283 let store = if let Some(url) = db_url {
284 match Store::connect(&url).await {
285 Ok(s) => {
286 if let Err(e) = s.init().await {
287 tracing::error!("Failed to init store: {}", e);
288 }
289 Some(s)
290 }
291 Err(e) => {
292 tracing::error!("Failed to connect to store: {}", e);
293 None
294 }
295 }
296 } else {
297 None
298 };
299
300 let (flow_tx, flow_rx) = mpsc::channel(10000);
301 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
302 tokio::spawn(flow_actor.run());
303
304 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
305 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
306 tokio::spawn(intercept_actor.run());
307
308 let (rule_tx, rule_rx) = mpsc::channel(100);
309 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
310 tokio::spawn(rule_actor.run());
311
312 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
313 let (flow_broadcast_tx, _) = broadcast::channel(1000);
314 let (audit_broadcast_tx, _) = broadcast::channel(256);
315 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
316
317 #[cfg(feature = "script")]
318 let script_interceptor = ScriptInterceptor::new()
319 .await
320 .expect("Failed to initialize ScriptInterceptor");
321 Self {
322 flow_store: flow_tx,
323 intercept_broker: intercept_tx,
324 rule_store: rule_tx,
325 store,
326 #[cfg(feature = "script")]
327 script_interceptor: Arc::new(script_interceptor),
328 policy_tx,
329 flows_dropped: Arc::new(AtomicUsize::new(0)),
330 audit_events_total: Arc::new(AtomicUsize::new(0)),
331 audit_events_failed: Arc::new(AtomicUsize::new(0)),
332 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
333 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
334 flow_broadcast_tx,
335 audit_broadcast_tx,
336 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
337 lifecycle_tx,
338 shutdown_tx: Mutex::new(None),
339 }
340 }
341
342 pub async fn get_metrics(&self) -> CoreMetrics {
343 let (flow_tx, flow_rx) = oneshot::channel();
344 let _ = self
345 .flow_store
346 .send(FlowStoreMessage::GetMetrics(flow_tx))
347 .await;
348 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
349
350 let (int_tx, int_rx) = oneshot::channel();
351 let _ = self
352 .intercept_broker
353 .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
354 .await;
355 let (
356 intercepts_pending,
357 ws_pending_messages,
358 oldest_intercept_age_ms,
359 oldest_ws_message_age_ms,
360 ) = int_rx.await.unwrap_or((0, 0, None, None));
361
362 let (rule_tx, rule_rx) = oneshot::channel();
363 let _ = self
364 .rule_store
365 .send(RuleStoreMessage::GetMetrics(rule_tx))
366 .await;
367 let rule_exec_errors = rule_rx.await.unwrap_or(0);
368
369 CoreMetrics {
370 flows_total,
371 flows_in_memory,
372 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
373 intercepts_pending,
374 ws_pending_messages,
375 oldest_intercept_age_ms,
376 oldest_ws_message_age_ms,
377 rule_exec_errors,
378 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
379 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
380 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
381 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
382 }
383 }
384
385 pub async fn get_metrics_prometheus_text(&self) -> String {
386 let mut text = self.get_metrics().await.to_prometheus_text();
387 #[cfg(feature = "script")]
388 {
389 text.push_str(&self.script_interceptor.metrics.prometheus_lines());
390 }
391 text
392 }
393
394 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
395 self.lifecycle().into()
396 }
397
398 pub async fn status_report(&self) -> CoreStatusReport {
399 CoreStatusReport {
400 status: self.status_snapshot(),
401 metrics: self.get_metrics().await,
402 }
403 }
404
405 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
406 let metrics = self.get_metrics().await;
407 CoreInterceptSnapshot {
408 pending_count: metrics.intercepts_pending,
409 ws_pending_count: metrics.ws_pending_messages,
410 }
411 }
412
413 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
414 let events = self.recent_audit_events();
415 let start = events.len().saturating_sub(limit);
416 CoreAuditSnapshot {
417 events: events.into_iter().skip(start).collect(),
418 }
419 }
420
421 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
422 let limit = query.limit.clamp(1, 500);
423 if let Some(store) = &self.store {
424 let rows = store
425 .query_audit_events(
426 query.since_ms,
427 query.until_ms,
428 query.actor.as_ref().map(AuditActor::as_str),
429 query.kind.as_ref().map(AuditEventKind::as_str),
430 query.outcome.as_ref().map(AuditOutcome::as_str),
431 limit,
432 )
433 .await
434 .unwrap_or_default();
435
436 let mut events = Vec::with_capacity(rows.len());
437 for row in rows {
438 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
439 events.push(event);
440 }
441 }
442 return CoreAuditSnapshot { events };
443 }
444
445 let mut events = self.recent_audit_events();
446 events.reverse();
447 let filtered = events
448 .into_iter()
449 .filter(|event| {
450 query
451 .since_ms
452 .map(|v| event.timestamp_ms >= v)
453 .unwrap_or(true)
454 })
455 .filter(|event| {
456 query
457 .until_ms
458 .map(|v| event.timestamp_ms <= v)
459 .unwrap_or(true)
460 })
461 .filter(|event| {
462 query
463 .actor
464 .as_ref()
465 .map(|v| &event.actor == v)
466 .unwrap_or(true)
467 })
468 .filter(|event| {
469 query
470 .kind
471 .as_ref()
472 .map(|v| &event.kind == v)
473 .unwrap_or(true)
474 })
475 .filter(|event| {
476 query
477 .outcome
478 .as_ref()
479 .map(|v| &event.outcome == v)
480 .unwrap_or(true)
481 })
482 .take(limit)
483 .collect();
484 CoreAuditSnapshot { events: filtered }
485 }
486
487 pub async fn get_flow(&self, id: String) -> Option<Flow> {
488 let (tx, rx) = oneshot::channel();
489 if let Err(e) = self
490 .flow_store
491 .send(FlowStoreMessage::GetFlow {
492 id: id.clone(),
493 respond_to: tx,
494 })
495 .await
496 {
497 error!("Failed to send GetFlow request: {}", e);
498 if let Some(store) = &self.store {
499 return store
500 .load_flow(&id)
501 .await
502 .ok()
503 .flatten()
504 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
505 }
506 return None;
507 }
508 let flow = rx
509 .await
510 .map_err(|e| {
511 error!("Failed to receive Flow response: {}", e);
512 e
513 })
514 .unwrap_or(None);
515 if let Some(flow) = flow {
516 return Some(redact_flow(flow, &self.current_redaction_policy()));
517 }
518 if let Some(store) = &self.store {
519 return store
520 .load_flow(&id)
521 .await
522 .ok()
523 .flatten()
524 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
525 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
526 }
527 None
528 }
529
530 pub async fn get_rules(&self) -> Vec<Rule> {
531 let (tx, rx) = oneshot::channel();
532 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
533 error!("Failed to send GetRules request: {}", e);
534 return Vec::new();
535 }
536 rx.await
537 .map_err(|e| {
538 error!("Failed to receive Rules response: {}", e);
539 e
540 })
541 .unwrap_or_default()
542 }
543
544 pub async fn set_rules(&self, rules: Vec<Rule>) {
545 let _ = self
546 .set_rules_from(
547 AuditActor::Runtime,
548 "rules.replace",
549 "rule_set".to_string(),
550 json!({}),
551 rules,
552 )
553 .await;
554 }
555
556 pub async fn upsert_rule_from(
557 &self,
558 actor: AuditActor,
559 operation: &str,
560 target: String,
561 details: serde_json::Value,
562 rule: Rule,
563 ) -> Result<(), String> {
564 let rule_id = rule.id.clone();
565 let mut rules = self.get_rules().await;
566 rules.retain(|existing| existing.id != rule_id);
567 rules.push(rule);
568 self.set_rules_from(actor, operation, target, details, rules)
569 .await
570 }
571
572 pub async fn delete_rule_from(
573 &self,
574 actor: AuditActor,
575 operation: &str,
576 target: String,
577 details: serde_json::Value,
578 rule_id: &str,
579 ) -> Result<bool, String> {
580 let mut rules = self.get_rules().await;
581 let before = rules.len();
582 rules.retain(|rule| rule.id != rule_id);
583 if rules.len() == before {
584 return Ok(false);
585 }
586 self.set_rules_from(actor, operation, target, details, rules)
587 .await
588 .map(|_| true)
589 }
590
591 pub async fn create_mock_response_rule_from(
592 &self,
593 actor: AuditActor,
594 target: String,
595 details: serde_json::Value,
596 config: MockResponseRuleConfig,
597 ) -> Result<String, String> {
598 let rule_id = config.rule_id.clone();
599 let rule = build_mock_response_rule(config);
600 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
601 .await
602 .map(|_| rule_id)
603 }
604
605 pub async fn create_intercept_rule_from(
606 &self,
607 actor: AuditActor,
608 target: String,
609 details: serde_json::Value,
610 config: InterceptRuleConfig,
611 ) -> Result<String, String> {
612 let rule_id = config.rule_id.clone();
613 let mut rules = self.get_rules().await;
614 rules.extend(build_intercept_rules(config));
615 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
616 .await
617 .map(|_| rule_id)
618 }
619
620 pub async fn upsert_legacy_intercept_rule_from(
621 &self,
622 actor: AuditActor,
623 target: String,
624 details: serde_json::Value,
625 rule: InterceptRule,
626 ) -> Result<(), String> {
627 let rule_id = rule.id.clone();
628 let mut rules = self.get_rules().await;
629 rules.retain(|existing| {
630 existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
631 });
632 rules.extend(rule.to_rules());
633 self.set_rules_from(
634 actor,
635 "rule.intercept_legacy_upsert",
636 target,
637 details,
638 rules,
639 )
640 .await
641 }
642
643 pub async fn set_rules_from(
644 &self,
645 actor: AuditActor,
646 operation: &str,
647 target: String,
648 details: serde_json::Value,
649 rules: Vec<Rule>,
650 ) -> Result<(), String> {
651 let rule_count = rules.len();
652 if let Err(e) = self
653 .rule_store
654 .send(RuleStoreMessage::SetRules(rules))
655 .await
656 {
657 error!("Failed to send SetRules request: {}", e);
658 self.record_audit_event(AuditEvent::new(
659 actor,
660 AuditEventKind::RuleChanged,
661 target,
662 AuditOutcome::Failed,
663 json!({
664 "operation": operation,
665 "rule_count": rule_count,
666 "details": details,
667 "error": e.to_string()
668 }),
669 ));
670 return Err(e.to_string());
671 }
672
673 self.record_audit_event(AuditEvent::new(
674 actor,
675 AuditEventKind::RuleChanged,
676 target,
677 AuditOutcome::Success,
678 json!({
679 "operation": operation,
680 "rule_count": rule_count,
681 "details": details
682 }),
683 ));
684 Ok(())
685 }
686
687 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
688 let mut new_rules = Vec::new();
689 for rule in rules {
690 new_rules.extend(rule.to_rules());
691 }
692 self.set_rules(new_rules).await;
693 }
694
695 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
696 let (tx, rx) = oneshot::channel();
697 if let Err(e) = self
698 .rule_store
699 .send(RuleStoreMessage::GetRuleEngine(tx))
700 .await
701 {
702 error!("Failed to send GetRuleEngine request: {}", e);
703 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
704 }
705 rx.await
706 .map_err(|e| {
707 error!("Failed to receive RuleEngine response: {}", e);
708 e
709 })
710 .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
711 }
712
713 pub fn update_policy(&self, policy: ProxyPolicy) {
714 self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
715 }
716
717 pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
718 let mut policy = self.policy_snapshot();
719 policy.apply_patch(patch);
720 self.update_policy_from(actor, target, policy);
721 }
722
723 pub fn policy_snapshot(&self) -> ProxyPolicy {
724 self.policy_tx.borrow().clone()
725 }
726
727 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
728 let details = json!({
729 "strict_http_semantics": policy.strict_http_semantics,
730 "request_timeout_ms": policy.request_timeout_ms,
731 "max_body_size": policy.max_body_size,
732 "transparent_enabled": policy.transparent_enabled,
733 "redaction_enabled": policy.redaction.enabled,
734 "redact_bodies": policy.redaction.redact_bodies
735 });
736
737 self.policy_tx.send_replace(policy);
738 self.record_audit_event(AuditEvent::new(
739 actor,
740 AuditEventKind::PolicyUpdated,
741 target,
742 AuditOutcome::Success,
743 details,
744 ));
745 }
746
747 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
748 if let Err(e) = self
749 .intercept_broker
750 .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
751 .await
752 {
753 error!("Failed to send RegisterIntercept request: {}", e);
754 }
755 }
756
757 pub async fn resolve_intercept(
758 &self,
759 key: String,
760 result: InterceptionResult,
761 ) -> Result<(), String> {
762 let (tx, rx) = oneshot::channel();
763 if let Err(e) = self
764 .intercept_broker
765 .send(InterceptBrokerMessage::ResolveIntercept {
766 key,
767 result,
768 respond_to: tx,
769 })
770 .await
771 {
772 error!("Failed to send ResolveIntercept request: {}", e);
773 return Err(e.to_string());
774 }
775 rx.await.map_err(|_| "Actor dropped".to_string())?
776 }
777
778 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
779 let (tx, rx) = oneshot::channel();
780 if let Err(e) = self
781 .intercept_broker
782 .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
783 key,
784 respond_to: tx,
785 })
786 .await
787 {
788 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
789 return None;
790 }
791 rx.await
792 .map_err(|e| {
793 error!("Failed to receive WebSocketMessage response: {}", e);
794 e
795 })
796 .unwrap_or(None)
797 }
798
799 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
800 if let Err(e) = self
801 .intercept_broker
802 .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
803 .await
804 {
805 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
806 }
807 }
808
809 pub async fn is_intercept_pending(&self, key: String) -> bool {
810 let (tx, rx) = oneshot::channel();
811 if let Err(e) = self
812 .intercept_broker
813 .send(InterceptBrokerMessage::GetPendingIntercept {
814 key,
815 respond_to: tx,
816 })
817 .await
818 {
819 error!("Failed to send GetPendingIntercept request: {}", e);
820 return false;
821 }
822 rx.await
823 .map_err(|e| {
824 error!("Failed to receive PendingIntercept response: {}", e);
825 e
826 })
827 .unwrap_or(false)
828 }
829
830 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
831 let (tx, rx) = oneshot::channel();
832 if let Err(e) = self
833 .intercept_broker
834 .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
835 flow_id,
836 respond_to: tx,
837 })
838 .await
839 {
840 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
841 return false;
842 }
843 rx.await
844 .map_err(|e| {
845 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
846 e
847 })
848 .unwrap_or(false)
849 }
850
851 pub fn upsert_flow(&self, flow: Box<Flow>) {
852 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
853 error!("FlowStore dropped flow: {}", e);
855 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
856 }
857 }
858
859 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
860 if let Err(e) = self
861 .flow_store
862 .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
863 {
864 error!("FlowStore dropped WS message: {}", e);
865 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
866 }
867 }
868
869 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
870 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
871 flow_id,
872 body,
873 direction,
874 }) {
875 error!("FlowStore dropped HTTP body: {}", e);
876 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
877 }
878 }
879
880 pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
882 self.flow_broadcast_tx.subscribe()
883 }
884
885 pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
886 self.audit_broadcast_tx.subscribe()
887 }
888
889 fn current_redaction_policy(&self) -> RedactionPolicy {
890 self.policy_tx.borrow().redaction.clone()
891 }
892
893 pub fn record_flow_events_lagged(&self, skipped: u64) {
894 self.flow_events_lagged_total
895 .fetch_add(skipped as usize, Ordering::Relaxed);
896 }
897
898 pub fn record_audit_events_lagged(&self, skipped: u64) {
899 self.audit_events_lagged_total
900 .fetch_add(skipped as usize, Ordering::Relaxed);
901 }
902
903 pub fn lifecycle(&self) -> RuntimeLifecycle {
904 self.lifecycle_tx.borrow().clone()
905 }
906
907 pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
908 self.lifecycle_tx.subscribe()
909 }
910
911 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
912 let current = self.lifecycle();
913 if current.is_active() {
914 return Err(format!(
915 "Proxy is already {} on port {}",
916 current.phase.as_str(),
917 current.port.unwrap_or(port)
918 ));
919 }
920
921 let mut guard = self
922 .shutdown_tx
923 .lock()
924 .map_err(|_| "shutdown state poisoned".to_string())?;
925 *guard = Some(shutdown_tx);
926 drop(guard);
927
928 self.update_lifecycle(RuntimeLifecycle {
929 phase: RuntimeLifecyclePhase::Starting,
930 port: Some(port),
931 started_at_ms: None,
932 last_error: None,
933 });
934 Ok(())
935 }
936
937 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
938 let mut guard = self
939 .shutdown_tx
940 .lock()
941 .map_err(|_| "shutdown state poisoned".to_string())?;
942 let Some(tx) = guard.take() else {
943 return Ok(ProxyStopResult::NotRunning);
944 };
945 drop(guard);
946
947 let current = self.lifecycle();
948 self.update_lifecycle(RuntimeLifecycle {
949 phase: RuntimeLifecyclePhase::Stopping,
950 port: current.port,
951 started_at_ms: current.started_at_ms,
952 last_error: current.last_error,
953 });
954 let _ = tx.send(());
955 Ok(ProxyStopResult::Stopping)
956 }
957
958 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
959 self.audit_history
960 .lock()
961 .map(|events| events.iter().cloned().collect())
962 .unwrap_or_default()
963 }
964
965 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
967 let redaction = self.current_redaction_policy();
968 if let Some(store) = &self.store {
969 return store
970 .query_flow_summaries(&query)
971 .await
972 .unwrap_or_default()
973 .into_iter()
974 .map(|summary| redact_flow_summary(summary, &redaction))
975 .collect();
976 }
977 let (tx, rx) = oneshot::channel();
978 if let Err(e) = self
979 .flow_store
980 .send(FlowStoreMessage::SearchFlows {
981 query,
982 respond_to: tx,
983 })
984 .await
985 {
986 error!("Failed to send SearchFlows request: {}", e);
987 return Vec::new();
988 }
989 rx.await
990 .unwrap_or_default()
991 .into_iter()
992 .map(|summary| redact_flow_summary(summary, &redaction))
993 .collect()
994 }
995
996 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
997 let redaction = self.current_redaction_policy();
998 redact_flow_update(update, &redaction)
999 }
1000
1001 pub async fn resolve_intercept_with_modifications(
1011 &self,
1012 key: String,
1013 action: &str,
1014 mods: Option<relay_core_api::modification::FlowModification>,
1015 ) -> Result<(), String> {
1016 self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1017 .await
1018 }
1019
1020 pub async fn resolve_intercept_with_modifications_from(
1021 &self,
1022 actor: AuditActor,
1023 key: String,
1024 action: &str,
1025 mods: Option<relay_core_api::modification::FlowModification>,
1026 ) -> Result<(), String> {
1027 let modified_fields = modification_field_names(mods.as_ref());
1028 let result = match action {
1029 "drop" => InterceptionResult::Drop,
1030 _ => match mods {
1031 None => InterceptionResult::Continue,
1032 Some(m) => {
1033 let parts: Vec<&str> = key.splitn(4, ':').collect();
1034 match parts.as_slice() {
1035 [flow_id, phase] => {
1036 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1037 modification::apply_flow_modification(&flow, phase, m)
1038 } else {
1039 InterceptionResult::Continue
1040 }
1041 }
1042 [_, "ws_msg", _] => {
1045 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1046 modification::apply_ws_modification(&msg, m)
1047 } else {
1048 InterceptionResult::Continue
1049 }
1050 }
1051 _ => InterceptionResult::Continue,
1052 }
1053 }
1054 },
1055 };
1056 let outcome = self.resolve_intercept(key.clone(), result).await;
1057 let audit_outcome = if outcome.is_ok() {
1058 AuditOutcome::Success
1059 } else {
1060 AuditOutcome::Failed
1061 };
1062 let error_message = outcome.as_ref().err().cloned();
1063
1064 self.record_audit_event(AuditEvent::new(
1065 actor,
1066 AuditEventKind::InterceptResolved,
1067 key,
1068 audit_outcome,
1069 json!({
1070 "action": action,
1071 "has_modifications": !modified_fields.is_empty(),
1072 "modified_fields": modified_fields,
1073 "error": error_message
1074 }),
1075 ));
1076 outcome
1077 }
1078
/// Loads `script` into the script interceptor and records a `ScriptReloaded`
/// audit event carrying the script's byte length and any error text.
///
/// # Errors
///
/// Returns the interceptor's load error rendered as a string.
#[cfg(feature = "script")]
pub async fn load_script_from(
    &self,
    actor: AuditActor,
    target: String,
    script: &str,
) -> Result<(), String> {
    let result = match self.script_interceptor.load_script(script).await {
        Ok(loaded) => Ok(loaded),
        Err(e) => Err(e.to_string()),
    };

    let (outcome, error_message) = match &result {
        Ok(_) => (AuditOutcome::Success, None),
        Err(message) => (AuditOutcome::Failed, Some(message.clone())),
    };

    self.record_audit_event(AuditEvent::new(
        actor,
        AuditEventKind::ScriptReloaded,
        target,
        outcome,
        json!({
            "script_bytes": script.len(),
            "error": error_message
        }),
    ));

    result
}
1112
/// Forwards `env_allow` to the script interceptor's `set_env_allow`.
#[cfg(feature = "script")]
pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
    let interceptor = &self.script_interceptor;
    interceptor.set_env_allow(env_allow).await;
}
1118
1119 fn record_audit_event(&self, event: AuditEvent) {
1120 const AUDIT_HISTORY_LIMIT: usize = 200;
1121 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1122 if event.outcome == AuditOutcome::Failed {
1123 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1124 }
1125
1126 let details = event.details.to_string();
1127 tracing::info!(
1128 target: "relay_core_audit",
1129 event_id = %event.id,
1130 actor = %event.actor.as_str(),
1131 kind = %event.kind.as_str(),
1132 target = %event.target,
1133 outcome = %event.outcome.as_str(),
1134 details = %details
1135 );
1136
1137 if let Ok(mut history) = self.audit_history.lock() {
1138 if history.len() >= AUDIT_HISTORY_LIMIT {
1139 history.pop_front();
1140 }
1141 history.push_back(event.clone());
1142 }
1143
1144 if let Some(store) = self.store.clone() {
1145 let event_json = serde_json::to_value(&event).unwrap_or_default();
1146 let event_id = event.id.clone();
1147 let timestamp_ms = event.timestamp_ms;
1148 let actor = event.actor.as_str().to_string();
1149 let kind = event.kind.as_str().to_string();
1150 let target = event.target.clone();
1151 let outcome = event.outcome.as_str().to_string();
1152 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1153 handle.spawn(async move {
1154 if let Err(e) = store
1155 .save_audit_event(AuditEventRecord {
1156 id: &event_id,
1157 timestamp_ms,
1158 actor: &actor,
1159 kind: &kind,
1160 target: &target,
1161 outcome: &outcome,
1162 content: &event_json,
1163 })
1164 .await
1165 {
1166 tracing::error!("Failed to persist audit event: {}", e);
1167 }
1168 });
1169 }
1170 }
1171
1172 let _ = self.audit_broadcast_tx.send(event);
1173 }
1174
1175 fn transition_to_running(&self, port: u16) {
1176 self.update_lifecycle(RuntimeLifecycle {
1177 phase: RuntimeLifecyclePhase::Running,
1178 port: Some(port),
1179 started_at_ms: Some(now_unix_ms()),
1180 last_error: None,
1181 });
1182 }
1183
1184 fn transition_to_stopped(&self) {
1185 if let Ok(mut guard) = self.shutdown_tx.lock() {
1186 *guard = None;
1187 }
1188
1189 self.update_lifecycle(RuntimeLifecycle {
1190 phase: RuntimeLifecyclePhase::Stopped,
1191 port: None,
1192 started_at_ms: None,
1193 last_error: None,
1194 });
1195 }
1196
1197 fn transition_to_failed(&self, port: u16, error: String) {
1198 if let Ok(mut guard) = self.shutdown_tx.lock() {
1199 *guard = None;
1200 }
1201
1202 self.update_lifecycle(RuntimeLifecycle {
1203 phase: RuntimeLifecyclePhase::Failed,
1204 port: Some(port),
1205 started_at_ms: None,
1206 last_error: Some(error),
1207 });
1208 }
1209
1210 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1211 tracing::info!(
1212 target: "relay_core_lifecycle",
1213 phase = %lifecycle.phase.as_str(),
1214 port = ?lifecycle.port,
1215 started_at_ms = ?lifecycle.started_at_ms,
1216 last_error = ?lifecycle.last_error
1217 );
1218 self.lifecycle_tx.send_replace(lifecycle);
1219 }
1220
1221 pub fn spawn_proxy(
1222 self: &Arc<Self>,
1223 config: ProxyConfig,
1224 sink: mpsc::Sender<FlowUpdate>,
1225 extra_interceptor: Option<Arc<dyn Interceptor>>,
1226 ) -> Result<ProxySpawnResult, String> {
1227 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1228 match self.prepare_start(config.port, shutdown_tx) {
1229 Ok(()) => {}
1230 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1231 Err(error) => return Err(error),
1232 }
1233 let state = self.clone();
1234 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1235 if let Err(error) = state
1236 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1237 .await
1238 {
1239 error!("Proxy failed: {}", error);
1240 }
1241 })))
1242 }
1243
1244 pub async fn start_proxy(
1245 self: &Arc<Self>,
1246 config: ProxyConfig,
1247 sink: mpsc::Sender<FlowUpdate>,
1248 extra_interceptor: Option<Arc<dyn Interceptor>>,
1249 ) -> Result<(), String> {
1250 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1251 self.prepare_start(config.port, shutdown_tx)?;
1252 self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1253 .await
1254 }
1255
1256 async fn run_proxy(
1257 self: &Arc<Self>,
1258 config: ProxyConfig,
1259 sink: mpsc::Sender<FlowUpdate>,
1260 extra_interceptor: Option<Arc<dyn Interceptor>>,
1261 shutdown_rx: oneshot::Receiver<()>,
1262 ) -> Result<(), String> {
1263 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1264 let state = self.clone();
1265
1266 if let Some(parent) = config.ca_cert_path.parent()
1267 && !parent.exists()
1268 {
1269 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1270 }
1271
1272 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1273 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1274 let ca = Arc::new(ca);
1275
1276 #[cfg(feature = "script")]
1277 let script_interceptor = self.script_interceptor.clone();
1278
1279 #[cfg(feature = "script")]
1280 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1281
1282 #[cfg(not(feature = "script"))]
1283 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1284
1285 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1286 self.clone(),
1287 )));
1288
1289 if let Some(interceptor) = extra_interceptor {
1290 interceptors.push(interceptor);
1291 }
1292
1293 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1294 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1295
1296 tokio::spawn(async move {
1297 while let Some(update) = proxy_rx.recv().await {
1298 match update.clone() {
1299 FlowUpdate::Full(flow) => {
1300 state.upsert_flow(flow);
1301 }
1302 FlowUpdate::WebSocketMessage { flow_id, message } => {
1303 state.append_ws_message(flow_id, message);
1304 }
1305 FlowUpdate::HttpBody {
1306 flow_id,
1307 direction,
1308 body,
1309 } => {
1310 state.update_http_body(flow_id, body, direction);
1311 }
1312 }
1313
1314 let _ = state.flow_broadcast_tx.send(update.clone());
1315
1316 if sink.try_send(update).is_err() {
1317 relay_core_lib::metrics::inc_flows_dropped();
1318 }
1319 }
1320 });
1321
1322 if let Some(udp_port) = config.udp_tproxy_port {
1323 let udp_proxy_tx = proxy_tx.clone();
1324 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1325 tokio::spawn(async move {
1326 match tokio::net::UdpSocket::bind(udp_addr).await {
1327 Ok(socket) => {
1328 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1329 if let Err(e) = proxy.run(udp_proxy_tx).await {
1330 error!("UDP TPROXY failed: {}", e);
1331 }
1332 }
1333 Err(e) => {
1334 error!("Failed to bind UDP TPROXY socket: {}", e);
1335 }
1336 }
1337 });
1338 }
1339
1340 let listener = match TcpListener::bind(addr).await {
1341 Ok(listener) => listener,
1342 Err(e) => {
1343 if let Ok(mut guard) = self.shutdown_tx.lock() {
1344 *guard = None;
1345 }
1346 let message = format!("Failed to bind to address {}: {}", addr, e);
1347 self.transition_to_failed(config.port, message.clone());
1348 return Err(message);
1349 }
1350 };
1351
1352 self.transition_to_running(config.port);
1353 let shutdown_rx = Some(shutdown_rx);
1354
1355 if config.transparent {
1356 let policy = ProxyPolicy {
1357 transparent_enabled: true,
1358 ..Default::default()
1359 };
1360 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1361 let policy_rx = self.policy_tx.subscribe();
1362
1363 let provider: Arc<dyn OriginalDstProvider> = {
1364 let mut addrs = BTreeSet::new();
1365 if let Ok(local) = listener.local_addr() {
1366 addrs.insert(local);
1367 }
1368
1369 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1370 {
1371 Arc::new(LinuxOriginalDstProvider::new(addrs))
1372 }
1373 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1374 {
1375 match MacOsOriginalDstProvider::new(addrs.clone()) {
1376 Ok(provider) => Arc::new(provider),
1377 Err(e) => {
1378 error!("Failed to initialize macOS PF provider: {}", e);
1379 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1380 }
1381 }
1382 }
1383 #[cfg(target_os = "windows")]
1384 {
1385 let filter =
1386 "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1387 .to_string();
1388 let port = config.port;
1389 tokio::spawn(async move {
1390 relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1391 .await;
1392 });
1393
1394 Arc::new(WindowsOriginalDstProvider::new(addrs))
1395 }
1396 #[cfg(not(any(
1397 all(target_os = "linux", feature = "transparent-linux"),
1398 all(target_os = "macos", feature = "transparent-macos"),
1399 target_os = "windows"
1400 )))]
1401 {
1402 Arc::new(NoOpOriginalDstProvider::new(addrs))
1403 }
1404 };
1405
1406 let source = TransparentTcpCaptureSource::new(listener, provider);
1407 let result = relay_core_lib::start_proxy(
1408 source,
1409 proxy_tx,
1410 interceptor,
1411 ca,
1412 policy_rx,
1413 None,
1414 shutdown_rx,
1415 )
1416 .await
1417 .map_err(|e| e.to_string());
1418 if let Err(error) = &result {
1419 self.transition_to_failed(config.port, error.clone());
1420 } else {
1421 self.transition_to_stopped();
1422 }
1423 result
1424 } else {
1425 let source = TcpCaptureSource::new(listener);
1426 let policy = ProxyPolicy::default();
1427 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1428 let policy_rx = self.policy_tx.subscribe();
1429
1430 let result = relay_core_lib::start_proxy(
1431 source,
1432 proxy_tx,
1433 interceptor,
1434 ca,
1435 policy_rx,
1436 None,
1437 shutdown_rx,
1438 )
1439 .await
1440 .map_err(|e| e.to_string());
1441 if let Err(error) = &result {
1442 self.transition_to_failed(config.port, error.clone());
1443 } else {
1444 self.transition_to_stopped();
1445 }
1446 result
1447 }
1448 }
1449}
1450
1451fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1452 match update {
1453 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1454 FlowUpdate::WebSocketMessage {
1455 flow_id,
1456 mut message,
1457 } => {
1458 message.content = redact_body(message.content, redaction);
1459 FlowUpdate::WebSocketMessage { flow_id, message }
1460 }
1461 FlowUpdate::HttpBody {
1462 flow_id,
1463 direction,
1464 body,
1465 } => FlowUpdate::HttpBody {
1466 flow_id,
1467 direction,
1468 body: redact_body(body, redaction),
1469 },
1470 }
1471}
1472
1473fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1474 if !redaction.enabled {
1475 return flow;
1476 }
1477 match &mut flow.layer {
1478 Layer::Http(http) => {
1479 redact_http_request(&mut http.request, redaction);
1480 if let Some(response) = &mut http.response {
1481 redact_headers(&mut response.headers, redaction);
1482 response.body = response
1483 .body
1484 .take()
1485 .map(|body| redact_body(body, redaction));
1486 }
1487 }
1488 Layer::WebSocket(ws) => {
1489 redact_http_request(&mut ws.handshake_request, redaction);
1490 redact_headers(&mut ws.handshake_response.headers, redaction);
1491 ws.handshake_response.body = ws
1492 .handshake_response
1493 .body
1494 .take()
1495 .map(|body| redact_body(body, redaction));
1496 for message in &mut ws.messages {
1497 message.content = redact_body(message.content.clone(), redaction);
1498 }
1499 }
1500 _ => {}
1501 }
1502 flow
1503}
1504
1505fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1506 if !redaction.enabled {
1507 return summary;
1508 }
1509 summary.url = redact_url_string(&summary.url, redaction);
1510 summary
1511}
1512
1513fn redact_http_request(
1514 request: &mut relay_core_api::flow::HttpRequest,
1515 redaction: &RedactionPolicy,
1516) {
1517 redact_headers(&mut request.headers, redaction);
1518 redact_query_pairs(&mut request.query, redaction);
1519 request.url = redact_url(&request.url, redaction);
1520 request.body = request.body.take().map(|body| redact_body(body, redaction));
1521}
1522
1523fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1524 if !redaction.enabled {
1525 return;
1526 }
1527 let sensitive = redaction_set(&redaction.sensitive_header_names);
1528 for (name, value) in headers.iter_mut() {
1529 if sensitive.contains(&name.to_ascii_lowercase()) {
1530 *value = "[REDACTED]".to_string();
1531 }
1532 }
1533}
1534
1535fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1536 if !redaction.enabled {
1537 return;
1538 }
1539 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1540 for (name, value) in query.iter_mut() {
1541 if sensitive.contains(&name.to_ascii_lowercase()) {
1542 *value = "[REDACTED]".to_string();
1543 }
1544 }
1545}
1546
1547fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1548 if !redaction.enabled {
1549 return url.clone();
1550 }
1551 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1552 let pairs: Vec<(String, String)> = url
1553 .query_pairs()
1554 .map(|(k, v)| {
1555 let key = k.to_string();
1556 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1557 "[REDACTED]".to_string()
1558 } else {
1559 v.to_string()
1560 };
1561 (key, value)
1562 })
1563 .collect();
1564 let mut next = url.clone();
1565 if pairs.is_empty() {
1566 return next;
1567 }
1568 next.query_pairs_mut().clear();
1569 for (k, v) in pairs {
1570 next.query_pairs_mut().append_pair(&k, &v);
1571 }
1572 next
1573}
1574
1575fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1576 match url::Url::parse(input) {
1577 Ok(url) => redact_url(&url, redaction).to_string(),
1578 Err(_) => input.to_string(),
1579 }
1580}
1581
1582fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1583 if redaction.enabled && redaction.redact_bodies {
1584 body.content = "[REDACTED]".to_string();
1585 }
1586 body
1587}
1588
/// Builds a lookup set containing the ASCII-lowercased form of each entry in
/// `values`; entries that differ only by ASCII case collapse into one.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::new();
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1595
/// Settings needed to start the proxy.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Location of the CA certificate file (loaded, or created if missing, at startup).
    pub ca_cert_path: std::path::PathBuf,
    /// Location of the CA private key file (loaded, or created if missing, at startup).
    pub ca_key_path: std::path::PathBuf,
    /// Whether the proxy runs with the transparent capture source.
    pub transparent: bool,
    /// Port for the UDP proxy socket; `None` disables the UDP proxy.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a configuration for a standard (non-transparent) proxy with no
    /// UDP proxy port.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent: false,
            udp_tproxy_port: None,
        }
    }

    /// Creates a configuration whose CA files are `ca_cert.pem` and
    /// `ca_key.pem` inside `app_data_dir`, creating that directory (and its
    /// parents) when it does not exist yet.
    ///
    /// # Errors
    ///
    /// Returns the I/O error text when the directory cannot be created.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let app_data_dir = app_data_dir.into();
        if !app_data_dir.exists() {
            std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
        }

        Ok(Self::new(
            port,
            app_data_dir.join("ca_cert.pem"),
            app_data_dir.join("ca_key.pem"),
        ))
    }

    /// Returns the configuration with `transparent` set to the given value.
    pub fn with_transparent(mut self, transparent: bool) -> Self {
        self.transparent = transparent;
        self
    }

    /// Returns the configuration with `udp_tproxy_port` set to the given value.
    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
        self.udp_tproxy_port = udp_tproxy_port;
        self
    }
}
1646
/// Returns the current time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating the 128-bit
/// millisecond count.
fn now_unix_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}
1653
1654fn modification_field_names(
1655 mods: Option<&relay_core_api::modification::FlowModification>,
1656) -> Vec<&'static str> {
1657 let Some(mods) = mods else {
1658 return Vec::new();
1659 };
1660
1661 let mut fields = Vec::new();
1662 if mods.method.is_some() {
1663 fields.push("method");
1664 }
1665 if mods.url.is_some() {
1666 fields.push("url");
1667 }
1668 if mods.request_headers.is_some() {
1669 fields.push("request_headers");
1670 }
1671 if mods.request_body.is_some() {
1672 fields.push("request_body");
1673 }
1674 if mods.status_code.is_some() {
1675 fields.push("status_code");
1676 }
1677 if mods.response_headers.is_some() {
1678 fields.push("response_headers");
1679 }
1680 if mods.response_body.is_some() {
1681 fields.push("response_body");
1682 }
1683 if mods.message_content.is_some() {
1684 fields.push("message_content");
1685 }
1686 fields
1687}
1688
1689#[cfg(test)]
1690mod tests {
1691 use super::*;
1692 use chrono::Utc;
1693 use relay_core_api::flow::{
1694 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1695 ResponseTiming, TransportProtocol,
1696 };
1697 use std::sync::atomic::{AtomicU64, Ordering};
1698 use tokio::time::{Duration, sleep};
1699 use url::Url;
1700 use uuid::Uuid;
1701
1702 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1703
1704 fn sqlite_url() -> String {
1705 let nanos = SystemTime::now()
1706 .duration_since(UNIX_EPOCH)
1707 .expect("clock drift")
1708 .as_nanos();
1709 let pid = std::process::id();
1710 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1711 let db_dir = std::env::current_dir()
1712 .expect("cwd")
1713 .join("target")
1714 .join("test-dbs");
1715 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1716 let db_path = db_dir.join(format!(
1717 "relay-core-runtime-test-{}-{}-{}.db",
1718 pid, nanos, seq
1719 ));
1720 format!("sqlite://{}?mode=rwc", db_path.display())
1721 }
1722
1723 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1724 let start_time =
1725 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1726 let request_url =
1727 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1728 Flow {
1729 id: Uuid::new_v4(),
1730 start_time,
1731 end_time: Some(start_time),
1732 network: NetworkInfo {
1733 client_ip: "127.0.0.1".to_string(),
1734 client_port: 12000,
1735 server_ip: "127.0.0.1".to_string(),
1736 server_port: 8080,
1737 protocol: TransportProtocol::TCP,
1738 tls: false,
1739 tls_version: None,
1740 sni: None,
1741 },
1742 layer: Layer::Http(HttpLayer {
1743 request: HttpRequest {
1744 method: method.to_string(),
1745 url: request_url,
1746 version: "HTTP/1.1".to_string(),
1747 headers: vec![],
1748 cookies: vec![],
1749 query: vec![],
1750 body: None,
1751 },
1752 response: Some(HttpResponse {
1753 status,
1754 status_text: "OK".to_string(),
1755 version: "HTTP/1.1".to_string(),
1756 headers: vec![],
1757 cookies: vec![],
1758 body: None,
1759 timing: ResponseTiming {
1760 time_to_first_byte: None,
1761 time_to_last_byte: None,
1762 connect_time_ms: None,
1763 ssl_time_ms: None,
1764 },
1765 }),
1766 error: None,
1767 }),
1768 tags: vec![],
1769 meta: std::collections::HashMap::new(),
1770 }
1771 }
1772
1773 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1774 let start_time =
1775 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1776 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1777 .expect("url should parse");
1778 Flow {
1779 id: Uuid::new_v4(),
1780 start_time,
1781 end_time: Some(start_time),
1782 network: NetworkInfo {
1783 client_ip: "127.0.0.1".to_string(),
1784 client_port: 12000,
1785 server_ip: "127.0.0.1".to_string(),
1786 server_port: 8080,
1787 protocol: TransportProtocol::TCP,
1788 tls: false,
1789 tls_version: None,
1790 sni: None,
1791 },
1792 layer: Layer::Http(HttpLayer {
1793 request: HttpRequest {
1794 method: "GET".to_string(),
1795 url: request_url,
1796 version: "HTTP/1.1".to_string(),
1797 headers: vec![
1798 (
1799 "Authorization".to_string(),
1800 "Bearer secret-token".to_string(),
1801 ),
1802 ("X-Normal".to_string(), "visible".to_string()),
1803 ],
1804 cookies: vec![],
1805 query: vec![
1806 ("token".to_string(), "abc123".to_string()),
1807 ("ok".to_string(), "1".to_string()),
1808 ],
1809 body: Some(BodyData {
1810 encoding: "utf-8".to_string(),
1811 content: "secret request body".to_string(),
1812 size: 19,
1813 }),
1814 },
1815 response: Some(HttpResponse {
1816 status: 200,
1817 status_text: "OK".to_string(),
1818 version: "HTTP/1.1".to_string(),
1819 headers: vec![
1820 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1821 ("X-Response".to_string(), "visible".to_string()),
1822 ],
1823 cookies: vec![],
1824 body: Some(BodyData {
1825 encoding: "utf-8".to_string(),
1826 content: "secret response body".to_string(),
1827 size: 20,
1828 }),
1829 timing: ResponseTiming {
1830 time_to_first_byte: None,
1831 time_to_last_byte: None,
1832 connect_time_ms: None,
1833 ssl_time_ms: None,
1834 },
1835 }),
1836 error: None,
1837 }),
1838 tags: vec![],
1839 meta: std::collections::HashMap::new(),
1840 }
1841 }
1842
1843 #[tokio::test]
1844 async fn set_rules_from_records_audit_event() {
1845 let state = CoreState::new(None).await;
1846
1847 state
1848 .set_rules_from(
1849 AuditActor::Http,
1850 "rule.upsert",
1851 "rule-1".to_string(),
1852 json!({ "route": "/api/v1/rules" }),
1853 Vec::new(),
1854 )
1855 .await
1856 .expect("set rules should succeed");
1857
1858 let events = state.recent_audit_events();
1859 let event = events.last().expect("audit event should exist");
1860 assert_eq!(event.actor, AuditActor::Http);
1861 assert_eq!(event.kind, AuditEventKind::RuleChanged);
1862 assert_eq!(event.outcome, AuditOutcome::Success);
1863 assert_eq!(event.target, "rule-1");
1864 assert_eq!(event.details["operation"], "rule.upsert");
1865 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1866 }
1867
1868 #[tokio::test]
1869 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1870 let state = CoreState::new(None).await;
1871
1872 state
1873 .upsert_rule_from(
1874 AuditActor::Probe,
1875 "rule.upsert",
1876 "rule-1".to_string(),
1877 json!({ "tool": "set_rule" }),
1878 Rule {
1879 id: "rule-1".to_string(),
1880 name: "first".to_string(),
1881 active: true,
1882 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1883 priority: 1,
1884 termination: relay_core_lib::rule::RuleTermination::Continue,
1885 filter: relay_core_lib::rule::Filter::Url(
1886 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1887 ),
1888 actions: vec![],
1889 constraints: None,
1890 },
1891 )
1892 .await
1893 .expect("initial upsert should succeed");
1894
1895 state
1896 .upsert_rule_from(
1897 AuditActor::Probe,
1898 "rule.upsert",
1899 "rule-1".to_string(),
1900 json!({ "tool": "set_rule" }),
1901 Rule {
1902 id: "rule-1".to_string(),
1903 name: "second".to_string(),
1904 active: true,
1905 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1906 priority: 2,
1907 termination: relay_core_lib::rule::RuleTermination::Continue,
1908 filter: relay_core_lib::rule::Filter::Url(
1909 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1910 ),
1911 actions: vec![],
1912 constraints: None,
1913 },
1914 )
1915 .await
1916 .expect("replacement upsert should succeed");
1917
1918 let rules = state.get_rules().await;
1919 assert_eq!(rules.len(), 1);
1920 assert_eq!(rules[0].name, "second");
1921 let event = state
1922 .recent_audit_events()
1923 .last()
1924 .cloned()
1925 .expect("audit event");
1926 assert_eq!(event.details["operation"], "rule.upsert");
1927 assert_eq!(event.details["details"]["tool"], "set_rule");
1928 }
1929
1930 #[tokio::test]
1931 async fn delete_rule_from_returns_false_when_rule_missing() {
1932 let state = CoreState::new(None).await;
1933
1934 let deleted = state
1935 .delete_rule_from(
1936 AuditActor::Http,
1937 "rule.delete",
1938 "missing".to_string(),
1939 json!({ "route": "/api/v1/rules/{id}" }),
1940 "missing",
1941 )
1942 .await
1943 .expect("delete should not fail");
1944
1945 assert!(!deleted);
1946 assert!(state.recent_audit_events().is_empty());
1947 }
1948
1949 #[tokio::test]
1950 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
1951 let state = CoreState::new(None).await;
1952
1953 let rule_id = state
1954 .create_mock_response_rule_from(
1955 AuditActor::Http,
1956 "api-mock-1".to_string(),
1957 json!({ "route": "/api/v1/mock", "status": 201 }),
1958 MockResponseRuleConfig {
1959 rule_id: "api-mock-1".to_string(),
1960 url_pattern: "example.com".to_string(),
1961 name: "api-mock:example.com".to_string(),
1962 status: 201,
1963 content_type: "application/json".to_string(),
1964 body: "{\"ok\":true}".to_string(),
1965 },
1966 )
1967 .await
1968 .expect("mock rule should be created");
1969
1970 assert_eq!(rule_id, "api-mock-1");
1971 let rules = state.get_rules().await;
1972 assert_eq!(rules.len(), 1);
1973 assert_eq!(rules[0].id, "api-mock-1");
1974 let event = state
1975 .recent_audit_events()
1976 .last()
1977 .cloned()
1978 .expect("audit event");
1979 assert_eq!(event.details["operation"], "rule.mock_create");
1980 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
1981 }
1982
1983 #[tokio::test]
1984 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
1985 let state = CoreState::new(None).await;
1986
1987 let rule_id = state
1988 .create_intercept_rule_from(
1989 AuditActor::Http,
1990 "intercept-1".to_string(),
1991 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
1992 InterceptRuleConfig {
1993 rule_id: "intercept-1".to_string(),
1994 active: true,
1995 url_pattern: "example.com".to_string(),
1996 method: None,
1997 phase: "request".to_string(),
1998 name: "api-intercept:example.com".to_string(),
1999 priority: 100,
2000 termination: relay_core_lib::rule::RuleTermination::Stop,
2001 },
2002 )
2003 .await
2004 .expect("intercept rule should be created");
2005
2006 assert_eq!(rule_id, "intercept-1");
2007 let rules = state.get_rules().await;
2008 assert_eq!(rules.len(), 1);
2009 assert_eq!(rules[0].id, "intercept-1");
2010 assert_eq!(rules[0].name, "api-intercept:example.com");
2011 let event = state
2012 .recent_audit_events()
2013 .last()
2014 .cloned()
2015 .expect("audit event");
2016 assert_eq!(event.details["operation"], "rule.intercept_create");
2017 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2018 }
2019
2020 #[tokio::test]
2021 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2022 let state = CoreState::new(None).await;
2023
2024 state
2025 .upsert_legacy_intercept_rule_from(
2026 AuditActor::Tauri,
2027 "legacy-1".to_string(),
2028 json!({ "command": "set_intercept_rule" }),
2029 InterceptRule {
2030 id: "legacy-1".to_string(),
2031 active: true,
2032 url_pattern: "example.com".to_string(),
2033 method: None,
2034 phase: "both".to_string(),
2035 },
2036 )
2037 .await
2038 .expect("initial family upsert should succeed");
2039 assert_eq!(state.get_rules().await.len(), 2);
2040
2041 state
2042 .upsert_legacy_intercept_rule_from(
2043 AuditActor::Tauri,
2044 "legacy-1".to_string(),
2045 json!({ "command": "set_intercept_rule" }),
2046 InterceptRule {
2047 id: "legacy-1".to_string(),
2048 active: true,
2049 url_pattern: "example.org".to_string(),
2050 method: Some("POST".to_string()),
2051 phase: "request".to_string(),
2052 },
2053 )
2054 .await
2055 .expect("replacement family upsert should succeed");
2056
2057 let rules = state.get_rules().await;
2058 assert_eq!(rules.len(), 1);
2059 assert_eq!(rules[0].id, "legacy-1");
2060 let event = state
2061 .recent_audit_events()
2062 .last()
2063 .cloned()
2064 .expect("audit event");
2065 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2066 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2067 }
2068
2069 #[tokio::test]
2070 async fn resolve_intercept_failure_records_failed_audit_event() {
2071 let state = CoreState::new(None).await;
2072
2073 let result = state
2074 .resolve_intercept_with_modifications_from(
2075 AuditActor::Probe,
2076 "missing-flow:request".to_string(),
2077 "drop",
2078 None,
2079 )
2080 .await;
2081
2082 assert!(result.is_err());
2083 let events = state.recent_audit_events();
2084 let event = events.last().expect("audit event should exist");
2085 assert_eq!(event.actor, AuditActor::Probe);
2086 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2087 assert_eq!(event.outcome, AuditOutcome::Failed);
2088 assert_eq!(event.details["action"], "drop");
2089 assert!(
2090 event.details["error"]
2091 .as_str()
2092 .unwrap_or_default()
2093 .contains("Interception not found")
2094 );
2095 }
2096
2097 #[tokio::test]
2098 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2099 let state = CoreState::new(None).await;
2100 let (shutdown_tx, shutdown_rx) = oneshot::channel();
2101
2102 state
2103 .prepare_start(8080, shutdown_tx)
2104 .expect("prepare start should succeed");
2105 let lifecycle = state.lifecycle();
2106 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2107 assert_eq!(lifecycle.port, Some(8080));
2108 assert!(lifecycle.started_at_ms.is_none());
2109 assert!(lifecycle.last_error.is_none());
2110
2111 assert_eq!(
2112 state.stop_proxy().expect("stop should succeed"),
2113 ProxyStopResult::Stopping
2114 );
2115 let lifecycle = state.lifecycle();
2116 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2117 assert_eq!(lifecycle.port, Some(8080));
2118 assert!(shutdown_rx.await.is_ok());
2119 }
2120
2121 #[tokio::test]
2122 async fn status_snapshot_derives_runtime_facing_fields() {
2123 let state = CoreState::new(None).await;
2124 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2125 state
2126 .prepare_start(8080, shutdown_tx)
2127 .expect("prepare start should succeed");
2128
2129 let status = state.status_snapshot();
2130 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2131 assert!(status.running);
2132 assert_eq!(status.port, Some(8080));
2133 assert!(status.uptime.is_none());
2134 assert!(status.last_error.is_none());
2135 }
2136
2137 #[tokio::test]
2138 async fn status_report_combines_status_and_metrics() {
2139 let state = CoreState::new(None).await;
2140 let report = state.status_report().await;
2141
2142 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2143 assert!(!report.status.running);
2144 assert_eq!(report.metrics.intercepts_pending, 0);
2145 assert_eq!(report.metrics.ws_pending_messages, 0);
2146 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2147 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2148 assert_eq!(report.metrics.audit_events_total, 0);
2149 assert_eq!(report.metrics.audit_events_failed, 0);
2150 assert_eq!(report.metrics.flow_events_lagged_total, 0);
2151 assert_eq!(report.metrics.audit_events_lagged_total, 0);
2152 }
2153
2154 #[test]
2155 fn proxy_config_new_and_transport_setters_preserve_values() {
2156 let config = ProxyConfig::new(
2157 8080,
2158 std::path::PathBuf::from("/tmp/ca_cert.pem"),
2159 std::path::PathBuf::from("/tmp/ca_key.pem"),
2160 )
2161 .with_transparent(true)
2162 .with_udp_tproxy_port(Some(15000));
2163
2164 assert_eq!(config.port, 8080);
2165 assert_eq!(
2166 config.ca_cert_path,
2167 std::path::PathBuf::from("/tmp/ca_cert.pem")
2168 );
2169 assert_eq!(
2170 config.ca_key_path,
2171 std::path::PathBuf::from("/tmp/ca_key.pem")
2172 );
2173 assert!(config.transparent);
2174 assert_eq!(config.udp_tproxy_port, Some(15000));
2175 }
2176
2177 #[test]
2178 fn proxy_config_from_app_data_dir_creates_default_paths() {
2179 let unique = SystemTime::now()
2180 .duration_since(UNIX_EPOCH)
2181 .expect("clock drift")
2182 .as_nanos();
2183 let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2184
2185 let config =
2186 ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2187
2188 assert!(dir.exists());
2189 assert_eq!(config.port, 8899);
2190 assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2191 assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2192 assert!(!config.transparent);
2193 assert!(config.udp_tproxy_port.is_none());
2194 }
2195
2196 #[tokio::test]
2197 async fn intercept_snapshot_maps_pending_counts() {
2198 let state = CoreState::new(None).await;
2199 let snapshot = state.intercept_snapshot().await;
2200
2201 assert_eq!(snapshot.pending_count, 0);
2202 assert_eq!(snapshot.ws_pending_count, 0);
2203 }
2204
2205 #[tokio::test]
2206 async fn audit_snapshot_returns_latest_events_in_order() {
2207 let state = CoreState::new(None).await;
2208 state.record_audit_event(AuditEvent::new(
2209 AuditActor::Runtime,
2210 AuditEventKind::RuleChanged,
2211 "first",
2212 AuditOutcome::Success,
2213 json!({ "index": 1 }),
2214 ));
2215 state.record_audit_event(AuditEvent::new(
2216 AuditActor::Http,
2217 AuditEventKind::PolicyUpdated,
2218 "second",
2219 AuditOutcome::Success,
2220 json!({ "index": 2 }),
2221 ));
2222
2223 let snapshot = state.audit_snapshot(1);
2224
2225 assert_eq!(snapshot.events.len(), 1);
2226 assert_eq!(snapshot.events[0].target, "second");
2227 assert_eq!(snapshot.events[0].details["index"], 2);
2228 }
2229
2230 #[tokio::test]
2231 async fn query_audit_snapshot_filters_in_memory_events() {
2232 let state = CoreState::new(None).await;
2233 state.record_audit_event(AuditEvent::new(
2234 AuditActor::Http,
2235 AuditEventKind::RuleChanged,
2236 "rule-1",
2237 AuditOutcome::Success,
2238 json!({ "idx": 1 }),
2239 ));
2240 state.record_audit_event(AuditEvent::new(
2241 AuditActor::Probe,
2242 AuditEventKind::PolicyUpdated,
2243 "policy",
2244 AuditOutcome::Failed,
2245 json!({ "idx": 2 }),
2246 ));
2247
2248 let snapshot = state
2249 .query_audit_snapshot(CoreAuditQuery {
2250 actor: Some(AuditActor::Probe),
2251 kind: Some(AuditEventKind::PolicyUpdated),
2252 outcome: Some(AuditOutcome::Failed),
2253 limit: 10,
2254 ..Default::default()
2255 })
2256 .await;
2257
2258 assert_eq!(snapshot.events.len(), 1);
2259 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2260 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2261 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2262 }
2263
2264 #[tokio::test]
2265 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2266 let state = CoreState::new(Some(sqlite_url())).await;
2267 state.update_policy_from(
2268 AuditActor::Http,
2269 "policy".to_string(),
2270 ProxyPolicy {
2271 transparent_enabled: true,
2272 ..Default::default()
2273 },
2274 );
2275
2276 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2277 for _ in 0..10 {
2278 snapshot = state
2279 .query_audit_snapshot(CoreAuditQuery {
2280 actor: Some(AuditActor::Http),
2281 kind: Some(AuditEventKind::PolicyUpdated),
2282 limit: 10,
2283 ..Default::default()
2284 })
2285 .await;
2286 if !snapshot.events.is_empty() {
2287 break;
2288 }
2289 sleep(Duration::from_millis(20)).await;
2290 }
2291
2292 assert!(!snapshot.events.is_empty());
2293 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2294 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2295 }
2296
2297 #[tokio::test]
2298 async fn prepare_start_rejects_second_active_start() {
2299 let state = CoreState::new(None).await;
2300 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2301 state
2302 .prepare_start(8080, shutdown_tx)
2303 .expect("first start should succeed");
2304
2305 let (second_tx, _second_rx) = oneshot::channel();
2306 let error = state
2307 .prepare_start(8081, second_tx)
2308 .expect_err("second active start should be rejected");
2309 assert!(error.contains("already"));
2310 }
2311
/// Updating the policy must append a successful `PolicyUpdated` audit event
/// whose details carry the new `transparent_enabled` value.
///
/// This is a plain `#[test]`: the runtime is used only to construct the state,
/// and `update_policy_from` is then called outside any `block_on`.
#[test]
fn update_policy_records_audit_event() {
    let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = runtime.block_on(async { CoreState::new(None).await });

    let updated_policy = ProxyPolicy {
        transparent_enabled: true,
        ..Default::default()
    };
    state.update_policy_from(AuditActor::Runtime, "policy".to_string(), updated_policy);

    let audit_log = state.recent_audit_events();
    let Some(newest) = audit_log.last() else {
        panic!("audit event should exist");
    };
    assert_eq!(newest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(newest.outcome, AuditOutcome::Success);
    assert_eq!(newest.details["transparent_enabled"], true);
}
2332
/// A patch that touches only redaction settings must flip those settings,
/// leave `request_timeout_ms` unchanged, and record a `PolicyUpdated` audit
/// event reporting `redaction_enabled`.
#[test]
fn patch_policy_updates_redaction_without_replacing_other_fields() {
    use relay_core_api::policy::{ProxyPolicyPatch, RedactionPolicyPatch};

    let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = runtime.block_on(async { CoreState::new(None).await });

    // Capture a field the patch does not mention, to prove it survives.
    let timeout_before = state.policy_snapshot().request_timeout_ms;

    let redaction_patch = RedactionPolicyPatch {
        enabled: Some(true),
        redact_bodies: Some(true),
        ..Default::default()
    };
    state.patch_policy_from(
        AuditActor::Runtime,
        "policy.patch".to_string(),
        ProxyPolicyPatch {
            redaction: Some(redaction_patch),
        },
    );

    let patched = state.policy_snapshot();
    assert_eq!(patched.request_timeout_ms, timeout_before);
    assert!(patched.redaction.enabled);
    assert!(patched.redaction.redact_bodies);

    let audit_log = state.recent_audit_events();
    let Some(newest) = audit_log.last() else {
        panic!("audit event should exist");
    };
    assert_eq!(newest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(newest.details["redaction_enabled"], true);
}
2361
2362 #[tokio::test]
2363 async fn metrics_include_audit_and_lagged_event_counters() {
2364 let state = CoreState::new(None).await;
2365
2366 state.update_policy_from(
2367 AuditActor::Runtime,
2368 "policy".to_string(),
2369 ProxyPolicy::default(),
2370 );
2371 let _ = state
2372 .resolve_intercept_with_modifications_from(
2373 AuditActor::Probe,
2374 "missing-flow:request".to_string(),
2375 "drop",
2376 None,
2377 )
2378 .await;
2379
2380 state.record_flow_events_lagged(3);
2381 state.record_audit_events_lagged(5);
2382
2383 let metrics = state.get_metrics().await;
2384 assert_eq!(metrics.audit_events_total, 2);
2385 assert_eq!(metrics.audit_events_failed, 1);
2386 assert_eq!(metrics.flow_events_lagged_total, 3);
2387 assert_eq!(metrics.audit_events_lagged_total, 5);
2388 }
2389
2390 #[tokio::test]
2391 async fn prometheus_metrics_text_contains_observability_fields() {
2392 let state = CoreState::new(None).await;
2393 state.record_flow_events_lagged(2);
2394 state.record_audit_events_lagged(4);
2395
2396 let text = state.get_metrics_prometheus_text().await;
2397 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2398 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2399 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2400 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2401 }
2402
2403 #[tokio::test]
2404 async fn search_flows_uses_store_with_offset_pagination() {
2405 let state = CoreState::new(Some(sqlite_url())).await;
2406 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2407 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2408 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2409
2410 state.upsert_flow(Box::new(flow_a));
2411 state.upsert_flow(Box::new(flow_b));
2412 state.upsert_flow(Box::new(flow_c));
2413
2414 let mut baseline = Vec::new();
2415 for _ in 0..20 {
2416 baseline = state
2417 .search_flows(FlowQuery {
2418 host: Some("api.example.com".to_string()),
2419 path_contains: None,
2420 method: None,
2421 status_min: None,
2422 status_max: None,
2423 has_error: None,
2424 is_websocket: None,
2425 limit: Some(3),
2426 offset: Some(0),
2427 })
2428 .await;
2429 if baseline.len() == 3 {
2430 break;
2431 }
2432 sleep(Duration::from_millis(20)).await;
2433 }
2434
2435 assert_eq!(baseline.len(), 3);
2436 let page = state
2437 .search_flows(FlowQuery {
2438 host: Some("api.example.com".to_string()),
2439 path_contains: None,
2440 method: None,
2441 status_min: None,
2442 status_max: None,
2443 has_error: None,
2444 is_websocket: None,
2445 limit: Some(1),
2446 offset: Some(1),
2447 })
2448 .await;
2449 assert_eq!(page.len(), 1);
2450 assert_eq!(page[0].id, baseline[1].id);
2451 }
2452
2453 #[tokio::test]
2454 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2455 let state = CoreState::new(Some(sqlite_url())).await;
2456 let first_flow = sample_http_flow(
2457 "persist.example.com",
2458 "/first",
2459 "GET",
2460 200,
2461 1_700_000_010_000,
2462 );
2463 let first_id = first_flow.id.to_string();
2464 state.upsert_flow(Box::new(first_flow));
2465 for i in 0..240 {
2466 state.upsert_flow(Box::new(sample_http_flow(
2467 "persist.example.com",
2468 &format!("/{}", i),
2469 "GET",
2470 200,
2471 1_700_000_020_000 + i,
2472 )));
2473 }
2474
2475 sleep(Duration::from_millis(200)).await;
2476
2477 let loaded = state.get_flow(first_id).await;
2478 assert!(loaded.is_some());
2479 }
2480
2481 #[tokio::test]
2482 async fn search_flows_redacts_summary_url_when_enabled() {
2483 let state = CoreState::new(None).await;
2484 state.update_policy_from(
2485 AuditActor::Runtime,
2486 "policy.redaction".to_string(),
2487 ProxyPolicy {
2488 redaction: RedactionPolicy {
2489 enabled: true,
2490 sensitive_query_keys: vec!["token".to_string()],
2491 redact_bodies: false,
2492 ..Default::default()
2493 },
2494 ..Default::default()
2495 },
2496 );
2497 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2498
2499 let mut items = Vec::new();
2500 for _ in 0..20 {
2501 items = state
2502 .search_flows(FlowQuery {
2503 host: Some("api.example.com".to_string()),
2504 path_contains: Some("/private".to_string()),
2505 ..Default::default()
2506 })
2507 .await;
2508 if !items.is_empty() {
2509 break;
2510 }
2511 sleep(Duration::from_millis(20)).await;
2512 }
2513
2514 assert!(!items.is_empty());
2515 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2516 let token = redacted
2517 .query_pairs()
2518 .find(|(k, _)| k == "token")
2519 .map(|(_, v)| v.to_string());
2520 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2521 }
2522
2523 #[tokio::test]
2524 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2525 let state = CoreState::new(Some(sqlite_url())).await;
2526 state.update_policy_from(
2527 AuditActor::Runtime,
2528 "policy.redaction".to_string(),
2529 ProxyPolicy {
2530 redaction: RedactionPolicy {
2531 enabled: true,
2532 sensitive_header_names: vec![
2533 "authorization".to_string(),
2534 "set-cookie".to_string(),
2535 ],
2536 sensitive_query_keys: vec!["token".to_string()],
2537 redact_bodies: true,
2538 },
2539 ..Default::default()
2540 },
2541 );
2542
2543 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2544 let flow_id = flow.id.to_string();
2545 state.upsert_flow(Box::new(flow));
2546 sleep(Duration::from_millis(80)).await;
2547
2548 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2549 let Layer::Http(http) = loaded.layer else {
2550 panic!("expected http layer");
2551 };
2552
2553 let auth = http
2554 .request
2555 .headers
2556 .iter()
2557 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2558 .map(|(_, v)| v.as_str());
2559 assert_eq!(auth, Some("[REDACTED]"));
2560
2561 let req_query_token = http
2562 .request
2563 .query
2564 .iter()
2565 .find(|(k, _)| k == "token")
2566 .map(|(_, v)| v.as_str());
2567 assert_eq!(req_query_token, Some("[REDACTED]"));
2568
2569 let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2570 assert_eq!(req_body, Some("[REDACTED]"));
2571
2572 let response = http.response.expect("response should exist");
2573 let set_cookie = response
2574 .headers
2575 .iter()
2576 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2577 .map(|(_, v)| v.as_str());
2578 assert_eq!(set_cookie, Some("[REDACTED]"));
2579 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2580 assert_eq!(res_body, Some("[REDACTED]"));
2581 }
2582
/// With body redaction enabled, passing an `HttpBody` update through
/// `redact_flow_update_for_output` must replace its content with `[REDACTED]`.
#[test]
fn redact_flow_update_masks_http_body_when_enabled() {
    use relay_core_api::flow::Direction;

    let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = runtime.block_on(async { CoreState::new(None).await });

    let redaction = RedactionPolicy {
        enabled: true,
        redact_bodies: true,
        ..Default::default()
    };
    state.update_policy_from(
        AuditActor::Runtime,
        "policy.redaction".to_string(),
        ProxyPolicy {
            redaction,
            ..Default::default()
        },
    );

    let secret_body = BodyData {
        encoding: "utf-8".to_string(),
        content: "super-secret".to_string(),
        size: 12,
    };
    let update = FlowUpdate::HttpBody {
        flow_id: "f-1".to_string(),
        direction: Direction::ClientToServer,
        body: secret_body,
    };

    // The variant must be preserved; only the body content is checked.
    let FlowUpdate::HttpBody { body, .. } = state.redact_flow_update_for_output(update) else {
        panic!("expected http body update");
    };
    assert_eq!(body.content, "[REDACTED]");
}
2615
/// Loading a script on behalf of the Tauri actor must succeed and leave a
/// successful `ScriptReloaded` audit event carrying the supplied actor and
/// target. Only compiled when the `script` feature is enabled.
#[cfg(feature = "script")]
#[tokio::test]
async fn load_script_from_records_audit_event() {
    let state = CoreState::new(None).await;

    // Minimal script: defines a no-op request-headers hook.
    let script_source = "globalThis.onRequestHeaders = (_flow) => {};";
    state
        .load_script_from(
            AuditActor::Tauri,
            "tauri.load_script".to_string(),
            script_source,
        )
        .await
        .expect("script should load");

    let audit_log = state.recent_audit_events();
    let Some(newest) = audit_log.last() else {
        panic!("audit event should exist");
    };
    assert_eq!(newest.actor, AuditActor::Tauri);
    assert_eq!(newest.kind, AuditEventKind::ScriptReloaded);
    assert_eq!(newest.outcome, AuditOutcome::Success);
    assert_eq!(newest.target, "tauri.load_script");
}
2637}