1use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51 build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
/// Re-exports everything from `relay_core_lib::rule_engine` so callers can
/// reach those items through this crate's `rule_engine` module.
pub mod rule_engine {
    pub use relay_core_lib::rule_engine::*;
}
79
/// Point-in-time counters and gauges describing the core runtime.
///
/// Values are assembled by `CoreState::get_metrics` from the actors' metric
/// replies and from the atomic counters held by `CoreState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreMetrics {
    /// First element of the flow store's metrics reply.
    pub flows_total: usize,
    /// Second element of the flow store's metrics reply.
    pub flows_in_memory: usize,
    /// Number of flow-store messages that could not be enqueued with `try_send`.
    pub flows_dropped: usize,
    /// Pending intercept count reported by the intercept broker.
    pub intercepts_pending: usize,
    /// Pending WebSocket message count reported by the intercept broker.
    pub ws_pending_messages: usize,
    /// Age of the oldest pending intercept in milliseconds, if the broker reports one.
    pub oldest_intercept_age_ms: Option<u64>,
    /// Age of the oldest pending WebSocket message in milliseconds, if the broker reports one.
    pub oldest_ws_message_age_ms: Option<u64>,
    /// Rule execution error count reported by the rule store.
    pub rule_exec_errors: usize,
    /// Number of audit events recorded so far.
    pub audit_events_total: usize,
    /// Number of recorded audit events whose outcome was `Failed`.
    pub audit_events_failed: usize,
    /// Running total passed to `CoreState::record_flow_events_lagged`.
    pub flow_events_lagged_total: usize,
    /// Running total passed to `CoreState::record_audit_events_lagged`.
    pub audit_events_lagged_total: usize,
}
95
96impl CoreMetrics {
97 pub fn to_prometheus_text(&self) -> String {
98 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
99 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
100 format!(
101 "relay_core_flows_total {}\n\
102relay_core_flows_in_memory {}\n\
103relay_core_flows_dropped_total {}\n\
104relay_core_intercepts_pending {}\n\
105relay_core_ws_pending_messages {}\n\
106relay_core_oldest_intercept_age_ms {}\n\
107relay_core_oldest_ws_message_age_ms {}\n\
108relay_core_rule_exec_errors_total {}\n\
109relay_core_audit_events_total {}\n\
110relay_core_audit_events_failed_total {}\n\
111relay_core_flow_events_lagged_total {}\n\
112relay_core_audit_events_lagged_total {}\n",
113 self.flows_total,
114 self.flows_in_memory,
115 self.flows_dropped,
116 self.intercepts_pending,
117 self.ws_pending_messages,
118 oldest_intercept_age_ms,
119 oldest_ws_message_age_ms,
120 self.rule_exec_errors,
121 self.audit_events_total,
122 self.audit_events_failed,
123 self.flow_events_lagged_total,
124 self.audit_events_lagged_total,
125 )
126 }
127}
128
/// Externally visible summary of the runtime lifecycle.
///
/// Built from a [`RuntimeLifecycle`] through the `From` implementation in this file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusSnapshot {
    /// Lifecycle phase at the time of the snapshot.
    pub phase: RuntimeLifecyclePhase,
    /// `true` while the phase is starting, running or stopping.
    pub running: bool,
    /// Port recorded in the lifecycle, if any.
    pub port: Option<u16>,
    /// Whole seconds elapsed since the recorded start time, if one is set.
    pub uptime: Option<u64>,
    /// Most recent error recorded in the lifecycle, if any.
    pub last_error: Option<String>,
}
137
/// Lifecycle status bundled with the current metrics, as returned by
/// `CoreState::status_report`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusReport {
    /// Lifecycle summary.
    pub status: CoreStatusSnapshot,
    /// Metrics gathered alongside the status.
    pub metrics: CoreMetrics,
}
143
/// Counts of work currently held by the intercept broker, as returned by
/// `CoreState::intercept_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreInterceptSnapshot {
    /// Copied from `CoreMetrics::intercepts_pending`.
    pub pending_count: usize,
    /// Copied from `CoreMetrics::ws_pending_messages`.
    pub ws_pending_count: usize,
}
149
/// A list of audit events returned by the audit snapshot/query methods of
/// `CoreState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CoreAuditSnapshot {
    /// The selected audit events.
    pub events: Vec<AuditEvent>,
}
154
/// Filter used by `CoreState::query_audit_snapshot`.
///
/// Every `Option` field is ignored when `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreAuditQuery {
    /// Keep events whose `timestamp_ms` is greater than or equal to this value.
    pub since_ms: Option<u64>,
    /// Keep events whose `timestamp_ms` is less than or equal to this value.
    pub until_ms: Option<u64>,
    /// Keep events recorded for this actor.
    pub actor: Option<AuditActor>,
    /// Keep events of this kind.
    pub kind: Option<AuditEventKind>,
    /// Keep events with this outcome.
    pub outcome: Option<AuditOutcome>,
    /// Maximum number of events to return; `query_audit_snapshot` clamps it to `1..=500`.
    pub limit: usize,
}
164
165impl Default for CoreAuditQuery {
166 fn default() -> Self {
167 Self {
168 since_ms: None,
169 until_ms: None,
170 actor: None,
171 kind: None,
172 outcome: None,
173 limit: 50,
174 }
175 }
176}
177
/// Phases the proxy runtime moves through.
///
/// Serialized in `snake_case`, matching the strings returned by `as_str`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeLifecyclePhase {
    /// Initial phase used by `RuntimeLifecycle::created`.
    Created,
    /// Set by `CoreState::prepare_start` once a start has been accepted.
    Starting,
    /// Set by `CoreState::transition_to_running`.
    Running,
    /// Set by `CoreState::stop_proxy` after a shutdown sender was found.
    Stopping,
    /// Set by `CoreState::transition_to_stopped`.
    Stopped,
    /// Set by `CoreState::transition_to_failed`, together with an error message.
    Failed,
}
188
189impl RuntimeLifecyclePhase {
190 pub fn as_str(&self) -> &'static str {
191 match self {
192 Self::Created => "created",
193 Self::Starting => "starting",
194 Self::Running => "running",
195 Self::Stopping => "stopping",
196 Self::Stopped => "stopped",
197 Self::Failed => "failed",
198 }
199 }
200
201 pub fn is_active(&self) -> bool {
202 matches!(self, Self::Starting | Self::Running | Self::Stopping)
203 }
204}
205
/// Full lifecycle record published through `CoreState`'s lifecycle watch channel.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RuntimeLifecycle {
    /// Current phase.
    pub phase: RuntimeLifecyclePhase,
    /// Port associated with the current or most recent run, if any.
    pub port: Option<u16>,
    /// Start time in milliseconds as produced by `now_unix_ms`, set when the
    /// runtime transitions to running.
    pub started_at_ms: Option<u64>,
    /// Error message recorded when the runtime transitions to failed.
    pub last_error: Option<String>,
}
213
214impl RuntimeLifecycle {
215 pub fn created() -> Self {
216 Self {
217 phase: RuntimeLifecyclePhase::Created,
218 port: None,
219 started_at_ms: None,
220 last_error: None,
221 }
222 }
223
224 pub fn is_active(&self) -> bool {
225 self.phase.is_active()
226 }
227
228 pub fn uptime_seconds(&self) -> Option<u64> {
229 let started_at_ms = self.started_at_ms?;
230 let now_ms = now_unix_ms();
231 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
232 }
233}
234
/// Outcome of `CoreState::spawn_proxy`.
pub enum ProxySpawnResult {
    /// A proxy task was spawned; the handle belongs to that task.
    Started(tokio::task::JoinHandle<()>),
    /// The start was refused because the lifecycle was already active.
    AlreadyRunning,
}
239
/// Outcome of `CoreState::stop_proxy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A shutdown sender was present and the shutdown signal was sent.
    Stopping,
    /// No shutdown sender was stored, so there was nothing to stop.
    NotRunning,
}
245
246impl From<RuntimeLifecycle> for CoreStatusSnapshot {
247 fn from(lifecycle: RuntimeLifecycle) -> Self {
248 Self {
249 running: lifecycle.is_active(),
250 uptime: lifecycle.uptime_seconds(),
251 port: lifecycle.port,
252 last_error: lifecycle.last_error,
253 phase: lifecycle.phase,
254 }
255 }
256}
257
/// Shared runtime state: handles to the background actors, the optional
/// persistent store, policy and lifecycle channels, and audit bookkeeping.
pub struct CoreState {
    /// Mailbox of the flow store actor.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    /// Mailbox of the intercept broker actor.
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    /// Mailbox of the rule store actor.
    rule_store: mpsc::Sender<RuleStoreMessage>,
    /// Persistent store; `None` when no URL was given or the connection failed.
    store: Option<Store>,
    /// Script interceptor, only present with the `script` feature.
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    /// Watch channel holding the current proxy policy.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    /// Count of flow-store messages that could not be enqueued.
    pub flows_dropped: Arc<AtomicUsize>,
    /// Count of audit events recorded.
    audit_events_total: Arc<AtomicUsize>,
    /// Count of recorded audit events whose outcome was `Failed`.
    audit_events_failed: Arc<AtomicUsize>,
    /// Running total reported through `record_flow_events_lagged`.
    flow_events_lagged_total: Arc<AtomicUsize>,
    /// Running total reported through `record_audit_events_lagged`.
    audit_events_lagged_total: Arc<AtomicUsize>,
    /// Broadcast channel handed out by `subscribe_flow_updates`.
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    /// Broadcast channel on which every recorded audit event is sent.
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    /// Bounded in-memory history of recent audit events, oldest first.
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    /// Watch channel holding the current runtime lifecycle.
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    /// Sender used to signal shutdown to the running proxy, if one was started.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
278
279impl CoreState {
280 pub async fn new(db_url: Option<String>) -> Self {
281 const AUDIT_HISTORY_LIMIT: usize = 200;
282
283 let store = if let Some(url) = db_url {
284 match Store::connect(&url).await {
285 Ok(s) => {
286 if let Err(e) = s.init().await {
287 tracing::error!("Failed to init store: {}", e);
288 }
289 Some(s)
290 }
291 Err(e) => {
292 tracing::error!("Failed to connect to store: {}", e);
293 None
294 }
295 }
296 } else {
297 None
298 };
299
300 let (flow_tx, flow_rx) = mpsc::channel(10000);
301 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
302 tokio::spawn(flow_actor.run());
303
304 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
305 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
306 tokio::spawn(intercept_actor.run());
307
308 let (rule_tx, rule_rx) = mpsc::channel(100);
309 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
310 tokio::spawn(rule_actor.run());
311
312 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
313 let (flow_broadcast_tx, _) = broadcast::channel(1000);
314 let (audit_broadcast_tx, _) = broadcast::channel(256);
315 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
316
317 #[cfg(feature = "script")]
318 let script_interceptor = ScriptInterceptor::new()
319 .await
320 .expect("Failed to initialize ScriptInterceptor");
321 Self {
322 flow_store: flow_tx,
323 intercept_broker: intercept_tx,
324 rule_store: rule_tx,
325 store,
326 #[cfg(feature = "script")]
327 script_interceptor: Arc::new(script_interceptor),
328 policy_tx,
329 flows_dropped: Arc::new(AtomicUsize::new(0)),
330 audit_events_total: Arc::new(AtomicUsize::new(0)),
331 audit_events_failed: Arc::new(AtomicUsize::new(0)),
332 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
333 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
334 flow_broadcast_tx,
335 audit_broadcast_tx,
336 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
337 lifecycle_tx,
338 shutdown_tx: Mutex::new(None),
339 }
340 }
341
342 pub async fn get_metrics(&self) -> CoreMetrics {
343 let (flow_tx, flow_rx) = oneshot::channel();
344 let _ = self
345 .flow_store
346 .send(FlowStoreMessage::GetMetrics(flow_tx))
347 .await;
348 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
349
350 let (int_tx, int_rx) = oneshot::channel();
351 let _ = self
352 .intercept_broker
353 .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
354 .await;
355 let (
356 intercepts_pending,
357 ws_pending_messages,
358 oldest_intercept_age_ms,
359 oldest_ws_message_age_ms,
360 ) = int_rx.await.unwrap_or((0, 0, None, None));
361
362 let (rule_tx, rule_rx) = oneshot::channel();
363 let _ = self
364 .rule_store
365 .send(RuleStoreMessage::GetMetrics(rule_tx))
366 .await;
367 let rule_exec_errors = rule_rx.await.unwrap_or(0);
368
369 CoreMetrics {
370 flows_total,
371 flows_in_memory,
372 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
373 intercepts_pending,
374 ws_pending_messages,
375 oldest_intercept_age_ms,
376 oldest_ws_message_age_ms,
377 rule_exec_errors,
378 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
379 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
380 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
381 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
382 }
383 }
384
385 pub async fn get_metrics_prometheus_text(&self) -> String {
386 self.get_metrics().await.to_prometheus_text()
387 }
388
389 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
390 self.lifecycle().into()
391 }
392
393 pub async fn status_report(&self) -> CoreStatusReport {
394 CoreStatusReport {
395 status: self.status_snapshot(),
396 metrics: self.get_metrics().await,
397 }
398 }
399
400 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
401 let metrics = self.get_metrics().await;
402 CoreInterceptSnapshot {
403 pending_count: metrics.intercepts_pending,
404 ws_pending_count: metrics.ws_pending_messages,
405 }
406 }
407
408 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
409 let events = self.recent_audit_events();
410 let start = events.len().saturating_sub(limit);
411 CoreAuditSnapshot {
412 events: events.into_iter().skip(start).collect(),
413 }
414 }
415
416 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
417 let limit = query.limit.clamp(1, 500);
418 if let Some(store) = &self.store {
419 let rows = store
420 .query_audit_events(
421 query.since_ms,
422 query.until_ms,
423 query.actor.as_ref().map(AuditActor::as_str),
424 query.kind.as_ref().map(AuditEventKind::as_str),
425 query.outcome.as_ref().map(AuditOutcome::as_str),
426 limit,
427 )
428 .await
429 .unwrap_or_default();
430
431 let mut events = Vec::with_capacity(rows.len());
432 for row in rows {
433 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
434 events.push(event);
435 }
436 }
437 return CoreAuditSnapshot { events };
438 }
439
440 let mut events = self.recent_audit_events();
441 events.reverse();
442 let filtered = events
443 .into_iter()
444 .filter(|event| {
445 query
446 .since_ms
447 .map(|v| event.timestamp_ms >= v)
448 .unwrap_or(true)
449 })
450 .filter(|event| {
451 query
452 .until_ms
453 .map(|v| event.timestamp_ms <= v)
454 .unwrap_or(true)
455 })
456 .filter(|event| {
457 query
458 .actor
459 .as_ref()
460 .map(|v| &event.actor == v)
461 .unwrap_or(true)
462 })
463 .filter(|event| {
464 query
465 .kind
466 .as_ref()
467 .map(|v| &event.kind == v)
468 .unwrap_or(true)
469 })
470 .filter(|event| {
471 query
472 .outcome
473 .as_ref()
474 .map(|v| &event.outcome == v)
475 .unwrap_or(true)
476 })
477 .take(limit)
478 .collect();
479 CoreAuditSnapshot { events: filtered }
480 }
481
482 pub async fn get_flow(&self, id: String) -> Option<Flow> {
483 let (tx, rx) = oneshot::channel();
484 if let Err(e) = self
485 .flow_store
486 .send(FlowStoreMessage::GetFlow {
487 id: id.clone(),
488 respond_to: tx,
489 })
490 .await
491 {
492 error!("Failed to send GetFlow request: {}", e);
493 if let Some(store) = &self.store {
494 return store
495 .load_flow(&id)
496 .await
497 .ok()
498 .flatten()
499 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
500 }
501 return None;
502 }
503 let flow = rx
504 .await
505 .map_err(|e| {
506 error!("Failed to receive Flow response: {}", e);
507 e
508 })
509 .unwrap_or(None);
510 if let Some(flow) = flow {
511 return Some(redact_flow(flow, &self.current_redaction_policy()));
512 }
513 if let Some(store) = &self.store {
514 return store
515 .load_flow(&id)
516 .await
517 .ok()
518 .flatten()
519 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
520 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
521 }
522 None
523 }
524
525 pub async fn get_rules(&self) -> Vec<Rule> {
526 let (tx, rx) = oneshot::channel();
527 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
528 error!("Failed to send GetRules request: {}", e);
529 return Vec::new();
530 }
531 rx.await
532 .map_err(|e| {
533 error!("Failed to receive Rules response: {}", e);
534 e
535 })
536 .unwrap_or_default()
537 }
538
539 pub async fn set_rules(&self, rules: Vec<Rule>) {
540 let _ = self
541 .set_rules_from(
542 AuditActor::Runtime,
543 "rules.replace",
544 "rule_set".to_string(),
545 json!({}),
546 rules,
547 )
548 .await;
549 }
550
551 pub async fn upsert_rule_from(
552 &self,
553 actor: AuditActor,
554 operation: &str,
555 target: String,
556 details: serde_json::Value,
557 rule: Rule,
558 ) -> Result<(), String> {
559 let rule_id = rule.id.clone();
560 let mut rules = self.get_rules().await;
561 rules.retain(|existing| existing.id != rule_id);
562 rules.push(rule);
563 self.set_rules_from(actor, operation, target, details, rules)
564 .await
565 }
566
567 pub async fn delete_rule_from(
568 &self,
569 actor: AuditActor,
570 operation: &str,
571 target: String,
572 details: serde_json::Value,
573 rule_id: &str,
574 ) -> Result<bool, String> {
575 let mut rules = self.get_rules().await;
576 let before = rules.len();
577 rules.retain(|rule| rule.id != rule_id);
578 if rules.len() == before {
579 return Ok(false);
580 }
581 self.set_rules_from(actor, operation, target, details, rules)
582 .await
583 .map(|_| true)
584 }
585
586 pub async fn create_mock_response_rule_from(
587 &self,
588 actor: AuditActor,
589 target: String,
590 details: serde_json::Value,
591 config: MockResponseRuleConfig,
592 ) -> Result<String, String> {
593 let rule_id = config.rule_id.clone();
594 let rule = build_mock_response_rule(config);
595 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
596 .await
597 .map(|_| rule_id)
598 }
599
600 pub async fn create_intercept_rule_from(
601 &self,
602 actor: AuditActor,
603 target: String,
604 details: serde_json::Value,
605 config: InterceptRuleConfig,
606 ) -> Result<String, String> {
607 let rule_id = config.rule_id.clone();
608 let mut rules = self.get_rules().await;
609 rules.extend(build_intercept_rules(config));
610 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
611 .await
612 .map(|_| rule_id)
613 }
614
615 pub async fn upsert_legacy_intercept_rule_from(
616 &self,
617 actor: AuditActor,
618 target: String,
619 details: serde_json::Value,
620 rule: InterceptRule,
621 ) -> Result<(), String> {
622 let rule_id = rule.id.clone();
623 let mut rules = self.get_rules().await;
624 rules.retain(|existing| {
625 existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
626 });
627 rules.extend(rule.to_rules());
628 self.set_rules_from(
629 actor,
630 "rule.intercept_legacy_upsert",
631 target,
632 details,
633 rules,
634 )
635 .await
636 }
637
638 pub async fn set_rules_from(
639 &self,
640 actor: AuditActor,
641 operation: &str,
642 target: String,
643 details: serde_json::Value,
644 rules: Vec<Rule>,
645 ) -> Result<(), String> {
646 let rule_count = rules.len();
647 if let Err(e) = self
648 .rule_store
649 .send(RuleStoreMessage::SetRules(rules))
650 .await
651 {
652 error!("Failed to send SetRules request: {}", e);
653 self.record_audit_event(AuditEvent::new(
654 actor,
655 AuditEventKind::RuleChanged,
656 target,
657 AuditOutcome::Failed,
658 json!({
659 "operation": operation,
660 "rule_count": rule_count,
661 "details": details,
662 "error": e.to_string()
663 }),
664 ));
665 return Err(e.to_string());
666 }
667
668 self.record_audit_event(AuditEvent::new(
669 actor,
670 AuditEventKind::RuleChanged,
671 target,
672 AuditOutcome::Success,
673 json!({
674 "operation": operation,
675 "rule_count": rule_count,
676 "details": details
677 }),
678 ));
679 Ok(())
680 }
681
682 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
683 let mut new_rules = Vec::new();
684 for rule in rules {
685 new_rules.extend(rule.to_rules());
686 }
687 self.set_rules(new_rules).await;
688 }
689
690 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
691 let (tx, rx) = oneshot::channel();
692 if let Err(e) = self
693 .rule_store
694 .send(RuleStoreMessage::GetRuleEngine(tx))
695 .await
696 {
697 error!("Failed to send GetRuleEngine request: {}", e);
698 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
699 }
700 rx.await
701 .map_err(|e| {
702 error!("Failed to receive RuleEngine response: {}", e);
703 e
704 })
705 .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
706 }
707
708 pub fn update_policy(&self, policy: ProxyPolicy) {
709 self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
710 }
711
712 pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
713 let mut policy = self.policy_snapshot();
714 policy.apply_patch(patch);
715 self.update_policy_from(actor, target, policy);
716 }
717
718 pub fn policy_snapshot(&self) -> ProxyPolicy {
719 self.policy_tx.borrow().clone()
720 }
721
722 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
723 let details = json!({
724 "strict_http_semantics": policy.strict_http_semantics,
725 "request_timeout_ms": policy.request_timeout_ms,
726 "max_body_size": policy.max_body_size,
727 "transparent_enabled": policy.transparent_enabled,
728 "redaction_enabled": policy.redaction.enabled,
729 "redact_bodies": policy.redaction.redact_bodies
730 });
731
732 self.policy_tx.send_replace(policy);
733 self.record_audit_event(AuditEvent::new(
734 actor,
735 AuditEventKind::PolicyUpdated,
736 target,
737 AuditOutcome::Success,
738 details,
739 ));
740 }
741
742 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
743 if let Err(e) = self
744 .intercept_broker
745 .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
746 .await
747 {
748 error!("Failed to send RegisterIntercept request: {}", e);
749 }
750 }
751
752 pub async fn resolve_intercept(
753 &self,
754 key: String,
755 result: InterceptionResult,
756 ) -> Result<(), String> {
757 let (tx, rx) = oneshot::channel();
758 if let Err(e) = self
759 .intercept_broker
760 .send(InterceptBrokerMessage::ResolveIntercept {
761 key,
762 result,
763 respond_to: tx,
764 })
765 .await
766 {
767 error!("Failed to send ResolveIntercept request: {}", e);
768 return Err(e.to_string());
769 }
770 rx.await.map_err(|_| "Actor dropped".to_string())?
771 }
772
773 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
774 let (tx, rx) = oneshot::channel();
775 if let Err(e) = self
776 .intercept_broker
777 .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
778 key,
779 respond_to: tx,
780 })
781 .await
782 {
783 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
784 return None;
785 }
786 rx.await
787 .map_err(|e| {
788 error!("Failed to receive WebSocketMessage response: {}", e);
789 e
790 })
791 .unwrap_or(None)
792 }
793
794 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
795 if let Err(e) = self
796 .intercept_broker
797 .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
798 .await
799 {
800 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
801 }
802 }
803
804 pub async fn is_intercept_pending(&self, key: String) -> bool {
805 let (tx, rx) = oneshot::channel();
806 if let Err(e) = self
807 .intercept_broker
808 .send(InterceptBrokerMessage::GetPendingIntercept {
809 key,
810 respond_to: tx,
811 })
812 .await
813 {
814 error!("Failed to send GetPendingIntercept request: {}", e);
815 return false;
816 }
817 rx.await
818 .map_err(|e| {
819 error!("Failed to receive PendingIntercept response: {}", e);
820 e
821 })
822 .unwrap_or(false)
823 }
824
825 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
826 let (tx, rx) = oneshot::channel();
827 if let Err(e) = self
828 .intercept_broker
829 .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
830 flow_id,
831 respond_to: tx,
832 })
833 .await
834 {
835 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
836 return false;
837 }
838 rx.await
839 .map_err(|e| {
840 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
841 e
842 })
843 .unwrap_or(false)
844 }
845
846 pub fn upsert_flow(&self, flow: Box<Flow>) {
847 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
848 error!("FlowStore dropped flow: {}", e);
850 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
851 }
852 }
853
854 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
855 if let Err(e) = self
856 .flow_store
857 .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
858 {
859 error!("FlowStore dropped WS message: {}", e);
860 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
861 }
862 }
863
864 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
865 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
866 flow_id,
867 body,
868 direction,
869 }) {
870 error!("FlowStore dropped HTTP body: {}", e);
871 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
872 }
873 }
874
875 pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
877 self.flow_broadcast_tx.subscribe()
878 }
879
880 pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
881 self.audit_broadcast_tx.subscribe()
882 }
883
884 fn current_redaction_policy(&self) -> RedactionPolicy {
885 self.policy_tx.borrow().redaction.clone()
886 }
887
888 pub fn record_flow_events_lagged(&self, skipped: u64) {
889 self.flow_events_lagged_total
890 .fetch_add(skipped as usize, Ordering::Relaxed);
891 }
892
893 pub fn record_audit_events_lagged(&self, skipped: u64) {
894 self.audit_events_lagged_total
895 .fetch_add(skipped as usize, Ordering::Relaxed);
896 }
897
898 pub fn lifecycle(&self) -> RuntimeLifecycle {
899 self.lifecycle_tx.borrow().clone()
900 }
901
902 pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
903 self.lifecycle_tx.subscribe()
904 }
905
906 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
907 let current = self.lifecycle();
908 if current.is_active() {
909 return Err(format!(
910 "Proxy is already {} on port {}",
911 current.phase.as_str(),
912 current.port.unwrap_or(port)
913 ));
914 }
915
916 let mut guard = self
917 .shutdown_tx
918 .lock()
919 .map_err(|_| "shutdown state poisoned".to_string())?;
920 *guard = Some(shutdown_tx);
921 drop(guard);
922
923 self.update_lifecycle(RuntimeLifecycle {
924 phase: RuntimeLifecyclePhase::Starting,
925 port: Some(port),
926 started_at_ms: None,
927 last_error: None,
928 });
929 Ok(())
930 }
931
932 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
933 let mut guard = self
934 .shutdown_tx
935 .lock()
936 .map_err(|_| "shutdown state poisoned".to_string())?;
937 let Some(tx) = guard.take() else {
938 return Ok(ProxyStopResult::NotRunning);
939 };
940 drop(guard);
941
942 let current = self.lifecycle();
943 self.update_lifecycle(RuntimeLifecycle {
944 phase: RuntimeLifecyclePhase::Stopping,
945 port: current.port,
946 started_at_ms: current.started_at_ms,
947 last_error: current.last_error,
948 });
949 let _ = tx.send(());
950 Ok(ProxyStopResult::Stopping)
951 }
952
953 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
954 self.audit_history
955 .lock()
956 .map(|events| events.iter().cloned().collect())
957 .unwrap_or_default()
958 }
959
960 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
962 let redaction = self.current_redaction_policy();
963 if let Some(store) = &self.store {
964 return store
965 .query_flow_summaries(&query)
966 .await
967 .unwrap_or_default()
968 .into_iter()
969 .map(|summary| redact_flow_summary(summary, &redaction))
970 .collect();
971 }
972 let (tx, rx) = oneshot::channel();
973 if let Err(e) = self
974 .flow_store
975 .send(FlowStoreMessage::SearchFlows {
976 query,
977 respond_to: tx,
978 })
979 .await
980 {
981 error!("Failed to send SearchFlows request: {}", e);
982 return Vec::new();
983 }
984 rx.await
985 .unwrap_or_default()
986 .into_iter()
987 .map(|summary| redact_flow_summary(summary, &redaction))
988 .collect()
989 }
990
991 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
992 let redaction = self.current_redaction_policy();
993 redact_flow_update(update, &redaction)
994 }
995
996 pub async fn resolve_intercept_with_modifications(
1006 &self,
1007 key: String,
1008 action: &str,
1009 mods: Option<relay_core_api::modification::FlowModification>,
1010 ) -> Result<(), String> {
1011 self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1012 .await
1013 }
1014
1015 pub async fn resolve_intercept_with_modifications_from(
1016 &self,
1017 actor: AuditActor,
1018 key: String,
1019 action: &str,
1020 mods: Option<relay_core_api::modification::FlowModification>,
1021 ) -> Result<(), String> {
1022 let modified_fields = modification_field_names(mods.as_ref());
1023 let result = match action {
1024 "drop" => InterceptionResult::Drop,
1025 _ => match mods {
1026 None => InterceptionResult::Continue,
1027 Some(m) => {
1028 let parts: Vec<&str> = key.splitn(4, ':').collect();
1029 match parts.as_slice() {
1030 [flow_id, phase] => {
1031 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1032 modification::apply_flow_modification(&flow, phase, m)
1033 } else {
1034 InterceptionResult::Continue
1035 }
1036 }
1037 [_, "ws_msg", _] => {
1040 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1041 modification::apply_ws_modification(&msg, m)
1042 } else {
1043 InterceptionResult::Continue
1044 }
1045 }
1046 _ => InterceptionResult::Continue,
1047 }
1048 }
1049 },
1050 };
1051 let outcome = self.resolve_intercept(key.clone(), result).await;
1052 let audit_outcome = if outcome.is_ok() {
1053 AuditOutcome::Success
1054 } else {
1055 AuditOutcome::Failed
1056 };
1057 let error_message = outcome.as_ref().err().cloned();
1058
1059 self.record_audit_event(AuditEvent::new(
1060 actor,
1061 AuditEventKind::InterceptResolved,
1062 key,
1063 audit_outcome,
1064 json!({
1065 "action": action,
1066 "has_modifications": !modified_fields.is_empty(),
1067 "modified_fields": modified_fields,
1068 "error": error_message
1069 }),
1070 ));
1071 outcome
1072 }
1073
1074 #[cfg(feature = "script")]
1075 pub async fn load_script_from(
1076 &self,
1077 actor: AuditActor,
1078 target: String,
1079 script: &str,
1080 ) -> Result<(), String> {
1081 let result = self
1082 .script_interceptor
1083 .load_script(script)
1084 .await
1085 .map_err(|e| e.to_string());
1086
1087 let outcome = if result.is_ok() {
1088 AuditOutcome::Success
1089 } else {
1090 AuditOutcome::Failed
1091 };
1092 let error_message = result.as_ref().err().cloned();
1093
1094 self.record_audit_event(AuditEvent::new(
1095 actor,
1096 AuditEventKind::ScriptReloaded,
1097 target,
1098 outcome,
1099 json!({
1100 "script_bytes": script.len(),
1101 "error": error_message
1102 }),
1103 ));
1104
1105 result
1106 }
1107
1108 fn record_audit_event(&self, event: AuditEvent) {
1109 const AUDIT_HISTORY_LIMIT: usize = 200;
1110 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1111 if event.outcome == AuditOutcome::Failed {
1112 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1113 }
1114
1115 let details = event.details.to_string();
1116 tracing::info!(
1117 target: "relay_core_audit",
1118 event_id = %event.id,
1119 actor = %event.actor.as_str(),
1120 kind = %event.kind.as_str(),
1121 target = %event.target,
1122 outcome = %event.outcome.as_str(),
1123 details = %details
1124 );
1125
1126 if let Ok(mut history) = self.audit_history.lock() {
1127 if history.len() >= AUDIT_HISTORY_LIMIT {
1128 history.pop_front();
1129 }
1130 history.push_back(event.clone());
1131 }
1132
1133 if let Some(store) = self.store.clone() {
1134 let event_json = serde_json::to_value(&event).unwrap_or_default();
1135 let event_id = event.id.clone();
1136 let timestamp_ms = event.timestamp_ms;
1137 let actor = event.actor.as_str().to_string();
1138 let kind = event.kind.as_str().to_string();
1139 let target = event.target.clone();
1140 let outcome = event.outcome.as_str().to_string();
1141 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1142 handle.spawn(async move {
1143 if let Err(e) = store
1144 .save_audit_event(AuditEventRecord {
1145 id: &event_id,
1146 timestamp_ms,
1147 actor: &actor,
1148 kind: &kind,
1149 target: &target,
1150 outcome: &outcome,
1151 content: &event_json,
1152 })
1153 .await
1154 {
1155 tracing::error!("Failed to persist audit event: {}", e);
1156 }
1157 });
1158 }
1159 }
1160
1161 let _ = self.audit_broadcast_tx.send(event);
1162 }
1163
1164 fn transition_to_running(&self, port: u16) {
1165 self.update_lifecycle(RuntimeLifecycle {
1166 phase: RuntimeLifecyclePhase::Running,
1167 port: Some(port),
1168 started_at_ms: Some(now_unix_ms()),
1169 last_error: None,
1170 });
1171 }
1172
1173 fn transition_to_stopped(&self) {
1174 if let Ok(mut guard) = self.shutdown_tx.lock() {
1175 *guard = None;
1176 }
1177
1178 self.update_lifecycle(RuntimeLifecycle {
1179 phase: RuntimeLifecyclePhase::Stopped,
1180 port: None,
1181 started_at_ms: None,
1182 last_error: None,
1183 });
1184 }
1185
1186 fn transition_to_failed(&self, port: u16, error: String) {
1187 if let Ok(mut guard) = self.shutdown_tx.lock() {
1188 *guard = None;
1189 }
1190
1191 self.update_lifecycle(RuntimeLifecycle {
1192 phase: RuntimeLifecyclePhase::Failed,
1193 port: Some(port),
1194 started_at_ms: None,
1195 last_error: Some(error),
1196 });
1197 }
1198
1199 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1200 tracing::info!(
1201 target: "relay_core_lifecycle",
1202 phase = %lifecycle.phase.as_str(),
1203 port = ?lifecycle.port,
1204 started_at_ms = ?lifecycle.started_at_ms,
1205 last_error = ?lifecycle.last_error
1206 );
1207 self.lifecycle_tx.send_replace(lifecycle);
1208 }
1209
1210 pub fn spawn_proxy(
1211 self: &Arc<Self>,
1212 config: ProxyConfig,
1213 sink: mpsc::Sender<FlowUpdate>,
1214 extra_interceptor: Option<Arc<dyn Interceptor>>,
1215 ) -> Result<ProxySpawnResult, String> {
1216 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1217 match self.prepare_start(config.port, shutdown_tx) {
1218 Ok(()) => {}
1219 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1220 Err(error) => return Err(error),
1221 }
1222 let state = self.clone();
1223 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1224 if let Err(error) = state
1225 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1226 .await
1227 {
1228 error!("Proxy failed: {}", error);
1229 }
1230 })))
1231 }
1232
1233 pub async fn start_proxy(
1234 self: &Arc<Self>,
1235 config: ProxyConfig,
1236 sink: mpsc::Sender<FlowUpdate>,
1237 extra_interceptor: Option<Arc<dyn Interceptor>>,
1238 ) -> Result<(), String> {
1239 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1240 self.prepare_start(config.port, shutdown_tx)?;
1241 self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1242 .await
1243 }
1244
1245 async fn run_proxy(
1246 self: &Arc<Self>,
1247 config: ProxyConfig,
1248 sink: mpsc::Sender<FlowUpdate>,
1249 extra_interceptor: Option<Arc<dyn Interceptor>>,
1250 shutdown_rx: oneshot::Receiver<()>,
1251 ) -> Result<(), String> {
1252 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1253 let state = self.clone();
1254
1255 if let Some(parent) = config.ca_cert_path.parent()
1256 && !parent.exists()
1257 {
1258 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1259 }
1260
1261 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1262 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1263 let ca = Arc::new(ca);
1264
1265 #[cfg(feature = "script")]
1266 let script_interceptor = self.script_interceptor.clone();
1267
1268 #[cfg(feature = "script")]
1269 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1270
1271 #[cfg(not(feature = "script"))]
1272 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1273
1274 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1275 self.clone(),
1276 )));
1277
1278 if let Some(interceptor) = extra_interceptor {
1279 interceptors.push(interceptor);
1280 }
1281
1282 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1283 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1284
1285 tokio::spawn(async move {
1286 while let Some(update) = proxy_rx.recv().await {
1287 match update.clone() {
1288 FlowUpdate::Full(flow) => {
1289 state.upsert_flow(flow);
1290 }
1291 FlowUpdate::WebSocketMessage { flow_id, message } => {
1292 state.append_ws_message(flow_id, message);
1293 }
1294 FlowUpdate::HttpBody {
1295 flow_id,
1296 direction,
1297 body,
1298 } => {
1299 state.update_http_body(flow_id, body, direction);
1300 }
1301 }
1302
1303 let _ = state.flow_broadcast_tx.send(update.clone());
1304
1305 if sink.try_send(update).is_err() {
1306 relay_core_lib::metrics::inc_flows_dropped();
1307 }
1308 }
1309 });
1310
1311 if let Some(udp_port) = config.udp_tproxy_port {
1312 let udp_proxy_tx = proxy_tx.clone();
1313 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1314 tokio::spawn(async move {
1315 match tokio::net::UdpSocket::bind(udp_addr).await {
1316 Ok(socket) => {
1317 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1318 if let Err(e) = proxy.run(udp_proxy_tx).await {
1319 error!("UDP TPROXY failed: {}", e);
1320 }
1321 }
1322 Err(e) => {
1323 error!("Failed to bind UDP TPROXY socket: {}", e);
1324 }
1325 }
1326 });
1327 }
1328
1329 let listener = match TcpListener::bind(addr).await {
1330 Ok(listener) => listener,
1331 Err(e) => {
1332 if let Ok(mut guard) = self.shutdown_tx.lock() {
1333 *guard = None;
1334 }
1335 let message = format!("Failed to bind to address {}: {}", addr, e);
1336 self.transition_to_failed(config.port, message.clone());
1337 return Err(message);
1338 }
1339 };
1340
1341 self.transition_to_running(config.port);
1342 let shutdown_rx = Some(shutdown_rx);
1343
1344 if config.transparent {
1345 let policy = ProxyPolicy {
1346 transparent_enabled: true,
1347 ..Default::default()
1348 };
1349 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1350 let policy_rx = self.policy_tx.subscribe();
1351
1352 let provider: Arc<dyn OriginalDstProvider> = {
1353 let mut addrs = BTreeSet::new();
1354 if let Ok(local) = listener.local_addr() {
1355 addrs.insert(local);
1356 }
1357
1358 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1359 {
1360 Arc::new(LinuxOriginalDstProvider::new(addrs))
1361 }
1362 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1363 {
1364 match MacOsOriginalDstProvider::new(addrs.clone()) {
1365 Ok(provider) => Arc::new(provider),
1366 Err(e) => {
1367 error!("Failed to initialize macOS PF provider: {}", e);
1368 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1369 }
1370 }
1371 }
1372 #[cfg(target_os = "windows")]
1373 {
1374 let filter =
1375 "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1376 .to_string();
1377 let port = config.port;
1378 tokio::spawn(async move {
1379 relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1380 .await;
1381 });
1382
1383 Arc::new(WindowsOriginalDstProvider::new(addrs))
1384 }
1385 #[cfg(not(any(
1386 all(target_os = "linux", feature = "transparent-linux"),
1387 all(target_os = "macos", feature = "transparent-macos"),
1388 target_os = "windows"
1389 )))]
1390 {
1391 Arc::new(NoOpOriginalDstProvider::new(addrs))
1392 }
1393 };
1394
1395 let source = TransparentTcpCaptureSource::new(listener, provider);
1396 let result = relay_core_lib::start_proxy(
1397 source,
1398 proxy_tx,
1399 interceptor,
1400 ca,
1401 policy_rx,
1402 None,
1403 shutdown_rx,
1404 )
1405 .await
1406 .map_err(|e| e.to_string());
1407 if let Err(error) = &result {
1408 self.transition_to_failed(config.port, error.clone());
1409 } else {
1410 self.transition_to_stopped();
1411 }
1412 result
1413 } else {
1414 let source = TcpCaptureSource::new(listener);
1415 let policy = ProxyPolicy::default();
1416 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1417 let policy_rx = self.policy_tx.subscribe();
1418
1419 let result = relay_core_lib::start_proxy(
1420 source,
1421 proxy_tx,
1422 interceptor,
1423 ca,
1424 policy_rx,
1425 None,
1426 shutdown_rx,
1427 )
1428 .await
1429 .map_err(|e| e.to_string());
1430 if let Err(error) = &result {
1431 self.transition_to_failed(config.port, error.clone());
1432 } else {
1433 self.transition_to_stopped();
1434 }
1435 result
1436 }
1437 }
1438}
1439
1440fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1441 match update {
1442 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1443 FlowUpdate::WebSocketMessage {
1444 flow_id,
1445 mut message,
1446 } => {
1447 message.content = redact_body(message.content, redaction);
1448 FlowUpdate::WebSocketMessage { flow_id, message }
1449 }
1450 FlowUpdate::HttpBody {
1451 flow_id,
1452 direction,
1453 body,
1454 } => FlowUpdate::HttpBody {
1455 flow_id,
1456 direction,
1457 body: redact_body(body, redaction),
1458 },
1459 }
1460}
1461
1462fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1463 if !redaction.enabled {
1464 return flow;
1465 }
1466 match &mut flow.layer {
1467 Layer::Http(http) => {
1468 redact_http_request(&mut http.request, redaction);
1469 if let Some(response) = &mut http.response {
1470 redact_headers(&mut response.headers, redaction);
1471 response.body = response
1472 .body
1473 .take()
1474 .map(|body| redact_body(body, redaction));
1475 }
1476 }
1477 Layer::WebSocket(ws) => {
1478 redact_http_request(&mut ws.handshake_request, redaction);
1479 redact_headers(&mut ws.handshake_response.headers, redaction);
1480 ws.handshake_response.body = ws
1481 .handshake_response
1482 .body
1483 .take()
1484 .map(|body| redact_body(body, redaction));
1485 for message in &mut ws.messages {
1486 message.content = redact_body(message.content.clone(), redaction);
1487 }
1488 }
1489 _ => {}
1490 }
1491 flow
1492}
1493
1494fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1495 if !redaction.enabled {
1496 return summary;
1497 }
1498 summary.url = redact_url_string(&summary.url, redaction);
1499 summary
1500}
1501
1502fn redact_http_request(
1503 request: &mut relay_core_api::flow::HttpRequest,
1504 redaction: &RedactionPolicy,
1505) {
1506 redact_headers(&mut request.headers, redaction);
1507 redact_query_pairs(&mut request.query, redaction);
1508 request.url = redact_url(&request.url, redaction);
1509 request.body = request.body.take().map(|body| redact_body(body, redaction));
1510}
1511
1512fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1513 if !redaction.enabled {
1514 return;
1515 }
1516 let sensitive = redaction_set(&redaction.sensitive_header_names);
1517 for (name, value) in headers.iter_mut() {
1518 if sensitive.contains(&name.to_ascii_lowercase()) {
1519 *value = "[REDACTED]".to_string();
1520 }
1521 }
1522}
1523
1524fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1525 if !redaction.enabled {
1526 return;
1527 }
1528 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1529 for (name, value) in query.iter_mut() {
1530 if sensitive.contains(&name.to_ascii_lowercase()) {
1531 *value = "[REDACTED]".to_string();
1532 }
1533 }
1534}
1535
1536fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1537 if !redaction.enabled {
1538 return url.clone();
1539 }
1540 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1541 let pairs: Vec<(String, String)> = url
1542 .query_pairs()
1543 .map(|(k, v)| {
1544 let key = k.to_string();
1545 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1546 "[REDACTED]".to_string()
1547 } else {
1548 v.to_string()
1549 };
1550 (key, value)
1551 })
1552 .collect();
1553 let mut next = url.clone();
1554 if pairs.is_empty() {
1555 return next;
1556 }
1557 next.query_pairs_mut().clear();
1558 for (k, v) in pairs {
1559 next.query_pairs_mut().append_pair(&k, &v);
1560 }
1561 next
1562}
1563
1564fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1565 match url::Url::parse(input) {
1566 Ok(url) => redact_url(&url, redaction).to_string(),
1567 Err(_) => input.to_string(),
1568 }
1569}
1570
1571fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1572 if redaction.enabled && redaction.redact_bodies {
1573 body.content = "[REDACTED]".to_string();
1574 }
1575 body
1576}
1577
/// Builds a lookup set holding the ASCII-lowercased form of every entry in
/// `values`; entries that differ only by ASCII case collapse into one.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::with_capacity(values.len());
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1584
/// Settings used to start the proxy.
#[derive(Debug, Clone)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Location of the CA certificate file.
    pub ca_cert_path: std::path::PathBuf,
    /// Location of the CA private key file.
    pub ca_key_path: std::path::PathBuf,
    /// Whether the proxy runs in transparent mode.
    pub transparent: bool,
    /// UDP TPROXY port; `None` leaves the UDP proxy off.
    pub udp_tproxy_port: Option<u16>,
}
1593
1594impl ProxyConfig {
1595 pub fn new(
1596 port: u16,
1597 ca_cert_path: std::path::PathBuf,
1598 ca_key_path: std::path::PathBuf,
1599 ) -> Self {
1600 Self {
1601 port,
1602 ca_cert_path,
1603 ca_key_path,
1604 transparent: false,
1605 udp_tproxy_port: None,
1606 }
1607 }
1608
1609 pub fn from_app_data_dir(
1610 app_data_dir: impl Into<std::path::PathBuf>,
1611 port: u16,
1612 ) -> Result<Self, String> {
1613 let app_data_dir = app_data_dir.into();
1614 if !app_data_dir.exists() {
1615 std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1616 }
1617
1618 Ok(Self::new(
1619 port,
1620 app_data_dir.join("ca_cert.pem"),
1621 app_data_dir.join("ca_key.pem"),
1622 ))
1623 }
1624
1625 pub fn with_transparent(mut self, transparent: bool) -> Self {
1626 self.transparent = transparent;
1627 self
1628 }
1629
1630 pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1631 self.udp_tproxy_port = udp_tproxy_port;
1632 self
1633 }
1634}
1635
/// Current wall-clock time as milliseconds since the Unix epoch. A clock
/// set before the epoch yields `0`.
fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1642
1643fn modification_field_names(
1644 mods: Option<&relay_core_api::modification::FlowModification>,
1645) -> Vec<&'static str> {
1646 let Some(mods) = mods else {
1647 return Vec::new();
1648 };
1649
1650 let mut fields = Vec::new();
1651 if mods.method.is_some() {
1652 fields.push("method");
1653 }
1654 if mods.url.is_some() {
1655 fields.push("url");
1656 }
1657 if mods.request_headers.is_some() {
1658 fields.push("request_headers");
1659 }
1660 if mods.request_body.is_some() {
1661 fields.push("request_body");
1662 }
1663 if mods.status_code.is_some() {
1664 fields.push("status_code");
1665 }
1666 if mods.response_headers.is_some() {
1667 fields.push("response_headers");
1668 }
1669 if mods.response_body.is_some() {
1670 fields.push("response_body");
1671 }
1672 if mods.message_content.is_some() {
1673 fields.push("message_content");
1674 }
1675 fields
1676}
1677
1678#[cfg(test)]
1679mod tests {
1680 use super::*;
1681 use chrono::Utc;
1682 use relay_core_api::flow::{
1683 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1684 ResponseTiming, TransportProtocol,
1685 };
1686 use std::sync::atomic::{AtomicU64, Ordering};
1687 use tokio::time::{Duration, sleep};
1688 use url::Url;
1689 use uuid::Uuid;
1690
1691 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1692
1693 fn sqlite_url() -> String {
1694 let nanos = SystemTime::now()
1695 .duration_since(UNIX_EPOCH)
1696 .expect("clock drift")
1697 .as_nanos();
1698 let pid = std::process::id();
1699 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1700 let db_dir = std::env::current_dir()
1701 .expect("cwd")
1702 .join("target")
1703 .join("test-dbs");
1704 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1705 let db_path = db_dir.join(format!(
1706 "relay-core-runtime-test-{}-{}-{}.db",
1707 pid, nanos, seq
1708 ));
1709 format!("sqlite://{}?mode=rwc", db_path.display())
1710 }
1711
1712 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1713 let start_time =
1714 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1715 let request_url =
1716 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1717 Flow {
1718 id: Uuid::new_v4(),
1719 start_time,
1720 end_time: Some(start_time),
1721 network: NetworkInfo {
1722 client_ip: "127.0.0.1".to_string(),
1723 client_port: 12000,
1724 server_ip: "127.0.0.1".to_string(),
1725 server_port: 8080,
1726 protocol: TransportProtocol::TCP,
1727 tls: false,
1728 tls_version: None,
1729 sni: None,
1730 },
1731 layer: Layer::Http(HttpLayer {
1732 request: HttpRequest {
1733 method: method.to_string(),
1734 url: request_url,
1735 version: "HTTP/1.1".to_string(),
1736 headers: vec![],
1737 cookies: vec![],
1738 query: vec![],
1739 body: None,
1740 },
1741 response: Some(HttpResponse {
1742 status,
1743 status_text: "OK".to_string(),
1744 version: "HTTP/1.1".to_string(),
1745 headers: vec![],
1746 cookies: vec![],
1747 body: None,
1748 timing: ResponseTiming {
1749 time_to_first_byte: None,
1750 time_to_last_byte: None,
1751 connect_time_ms: None,
1752 ssl_time_ms: None,
1753 },
1754 }),
1755 error: None,
1756 }),
1757 tags: vec![],
1758 meta: std::collections::HashMap::new(),
1759 }
1760 }
1761
1762 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1763 let start_time =
1764 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1765 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1766 .expect("url should parse");
1767 Flow {
1768 id: Uuid::new_v4(),
1769 start_time,
1770 end_time: Some(start_time),
1771 network: NetworkInfo {
1772 client_ip: "127.0.0.1".to_string(),
1773 client_port: 12000,
1774 server_ip: "127.0.0.1".to_string(),
1775 server_port: 8080,
1776 protocol: TransportProtocol::TCP,
1777 tls: false,
1778 tls_version: None,
1779 sni: None,
1780 },
1781 layer: Layer::Http(HttpLayer {
1782 request: HttpRequest {
1783 method: "GET".to_string(),
1784 url: request_url,
1785 version: "HTTP/1.1".to_string(),
1786 headers: vec![
1787 (
1788 "Authorization".to_string(),
1789 "Bearer secret-token".to_string(),
1790 ),
1791 ("X-Normal".to_string(), "visible".to_string()),
1792 ],
1793 cookies: vec![],
1794 query: vec![
1795 ("token".to_string(), "abc123".to_string()),
1796 ("ok".to_string(), "1".to_string()),
1797 ],
1798 body: Some(BodyData {
1799 encoding: "utf-8".to_string(),
1800 content: "secret request body".to_string(),
1801 size: 19,
1802 }),
1803 },
1804 response: Some(HttpResponse {
1805 status: 200,
1806 status_text: "OK".to_string(),
1807 version: "HTTP/1.1".to_string(),
1808 headers: vec![
1809 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1810 ("X-Response".to_string(), "visible".to_string()),
1811 ],
1812 cookies: vec![],
1813 body: Some(BodyData {
1814 encoding: "utf-8".to_string(),
1815 content: "secret response body".to_string(),
1816 size: 20,
1817 }),
1818 timing: ResponseTiming {
1819 time_to_first_byte: None,
1820 time_to_last_byte: None,
1821 connect_time_ms: None,
1822 ssl_time_ms: None,
1823 },
1824 }),
1825 error: None,
1826 }),
1827 tags: vec![],
1828 meta: std::collections::HashMap::new(),
1829 }
1830 }
1831
1832 #[tokio::test]
1833 async fn set_rules_from_records_audit_event() {
1834 let state = CoreState::new(None).await;
1835
1836 state
1837 .set_rules_from(
1838 AuditActor::Http,
1839 "rule.upsert",
1840 "rule-1".to_string(),
1841 json!({ "route": "/api/v1/rules" }),
1842 Vec::new(),
1843 )
1844 .await
1845 .expect("set rules should succeed");
1846
1847 let events = state.recent_audit_events();
1848 let event = events.last().expect("audit event should exist");
1849 assert_eq!(event.actor, AuditActor::Http);
1850 assert_eq!(event.kind, AuditEventKind::RuleChanged);
1851 assert_eq!(event.outcome, AuditOutcome::Success);
1852 assert_eq!(event.target, "rule-1");
1853 assert_eq!(event.details["operation"], "rule.upsert");
1854 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1855 }
1856
1857 #[tokio::test]
1858 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1859 let state = CoreState::new(None).await;
1860
1861 state
1862 .upsert_rule_from(
1863 AuditActor::Probe,
1864 "rule.upsert",
1865 "rule-1".to_string(),
1866 json!({ "tool": "set_rule" }),
1867 Rule {
1868 id: "rule-1".to_string(),
1869 name: "first".to_string(),
1870 active: true,
1871 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1872 priority: 1,
1873 termination: relay_core_lib::rule::RuleTermination::Continue,
1874 filter: relay_core_lib::rule::Filter::Url(
1875 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1876 ),
1877 actions: vec![],
1878 constraints: None,
1879 },
1880 )
1881 .await
1882 .expect("initial upsert should succeed");
1883
1884 state
1885 .upsert_rule_from(
1886 AuditActor::Probe,
1887 "rule.upsert",
1888 "rule-1".to_string(),
1889 json!({ "tool": "set_rule" }),
1890 Rule {
1891 id: "rule-1".to_string(),
1892 name: "second".to_string(),
1893 active: true,
1894 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1895 priority: 2,
1896 termination: relay_core_lib::rule::RuleTermination::Continue,
1897 filter: relay_core_lib::rule::Filter::Url(
1898 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1899 ),
1900 actions: vec![],
1901 constraints: None,
1902 },
1903 )
1904 .await
1905 .expect("replacement upsert should succeed");
1906
1907 let rules = state.get_rules().await;
1908 assert_eq!(rules.len(), 1);
1909 assert_eq!(rules[0].name, "second");
1910 let event = state
1911 .recent_audit_events()
1912 .last()
1913 .cloned()
1914 .expect("audit event");
1915 assert_eq!(event.details["operation"], "rule.upsert");
1916 assert_eq!(event.details["details"]["tool"], "set_rule");
1917 }
1918
1919 #[tokio::test]
1920 async fn delete_rule_from_returns_false_when_rule_missing() {
1921 let state = CoreState::new(None).await;
1922
1923 let deleted = state
1924 .delete_rule_from(
1925 AuditActor::Http,
1926 "rule.delete",
1927 "missing".to_string(),
1928 json!({ "route": "/api/v1/rules/{id}" }),
1929 "missing",
1930 )
1931 .await
1932 .expect("delete should not fail");
1933
1934 assert!(!deleted);
1935 assert!(state.recent_audit_events().is_empty());
1936 }
1937
1938 #[tokio::test]
1939 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
1940 let state = CoreState::new(None).await;
1941
1942 let rule_id = state
1943 .create_mock_response_rule_from(
1944 AuditActor::Http,
1945 "api-mock-1".to_string(),
1946 json!({ "route": "/api/v1/mock", "status": 201 }),
1947 MockResponseRuleConfig {
1948 rule_id: "api-mock-1".to_string(),
1949 url_pattern: "example.com".to_string(),
1950 name: "api-mock:example.com".to_string(),
1951 status: 201,
1952 content_type: "application/json".to_string(),
1953 body: "{\"ok\":true}".to_string(),
1954 },
1955 )
1956 .await
1957 .expect("mock rule should be created");
1958
1959 assert_eq!(rule_id, "api-mock-1");
1960 let rules = state.get_rules().await;
1961 assert_eq!(rules.len(), 1);
1962 assert_eq!(rules[0].id, "api-mock-1");
1963 let event = state
1964 .recent_audit_events()
1965 .last()
1966 .cloned()
1967 .expect("audit event");
1968 assert_eq!(event.details["operation"], "rule.mock_create");
1969 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
1970 }
1971
1972 #[tokio::test]
1973 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
1974 let state = CoreState::new(None).await;
1975
1976 let rule_id = state
1977 .create_intercept_rule_from(
1978 AuditActor::Http,
1979 "intercept-1".to_string(),
1980 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
1981 InterceptRuleConfig {
1982 rule_id: "intercept-1".to_string(),
1983 active: true,
1984 url_pattern: "example.com".to_string(),
1985 method: None,
1986 phase: "request".to_string(),
1987 name: "api-intercept:example.com".to_string(),
1988 priority: 100,
1989 termination: relay_core_lib::rule::RuleTermination::Stop,
1990 },
1991 )
1992 .await
1993 .expect("intercept rule should be created");
1994
1995 assert_eq!(rule_id, "intercept-1");
1996 let rules = state.get_rules().await;
1997 assert_eq!(rules.len(), 1);
1998 assert_eq!(rules[0].id, "intercept-1");
1999 assert_eq!(rules[0].name, "api-intercept:example.com");
2000 let event = state
2001 .recent_audit_events()
2002 .last()
2003 .cloned()
2004 .expect("audit event");
2005 assert_eq!(event.details["operation"], "rule.intercept_create");
2006 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2007 }
2008
2009 #[tokio::test]
2010 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2011 let state = CoreState::new(None).await;
2012
2013 state
2014 .upsert_legacy_intercept_rule_from(
2015 AuditActor::Tauri,
2016 "legacy-1".to_string(),
2017 json!({ "command": "set_intercept_rule" }),
2018 InterceptRule {
2019 id: "legacy-1".to_string(),
2020 active: true,
2021 url_pattern: "example.com".to_string(),
2022 method: None,
2023 phase: "both".to_string(),
2024 },
2025 )
2026 .await
2027 .expect("initial family upsert should succeed");
2028 assert_eq!(state.get_rules().await.len(), 2);
2029
2030 state
2031 .upsert_legacy_intercept_rule_from(
2032 AuditActor::Tauri,
2033 "legacy-1".to_string(),
2034 json!({ "command": "set_intercept_rule" }),
2035 InterceptRule {
2036 id: "legacy-1".to_string(),
2037 active: true,
2038 url_pattern: "example.org".to_string(),
2039 method: Some("POST".to_string()),
2040 phase: "request".to_string(),
2041 },
2042 )
2043 .await
2044 .expect("replacement family upsert should succeed");
2045
2046 let rules = state.get_rules().await;
2047 assert_eq!(rules.len(), 1);
2048 assert_eq!(rules[0].id, "legacy-1");
2049 let event = state
2050 .recent_audit_events()
2051 .last()
2052 .cloned()
2053 .expect("audit event");
2054 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2055 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2056 }
2057
2058 #[tokio::test]
2059 async fn resolve_intercept_failure_records_failed_audit_event() {
2060 let state = CoreState::new(None).await;
2061
2062 let result = state
2063 .resolve_intercept_with_modifications_from(
2064 AuditActor::Probe,
2065 "missing-flow:request".to_string(),
2066 "drop",
2067 None,
2068 )
2069 .await;
2070
2071 assert!(result.is_err());
2072 let events = state.recent_audit_events();
2073 let event = events.last().expect("audit event should exist");
2074 assert_eq!(event.actor, AuditActor::Probe);
2075 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2076 assert_eq!(event.outcome, AuditOutcome::Failed);
2077 assert_eq!(event.details["action"], "drop");
2078 assert!(
2079 event.details["error"]
2080 .as_str()
2081 .unwrap_or_default()
2082 .contains("Interception not found")
2083 );
2084 }
2085
2086 #[tokio::test]
2087 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2088 let state = CoreState::new(None).await;
2089 let (shutdown_tx, shutdown_rx) = oneshot::channel();
2090
2091 state
2092 .prepare_start(8080, shutdown_tx)
2093 .expect("prepare start should succeed");
2094 let lifecycle = state.lifecycle();
2095 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2096 assert_eq!(lifecycle.port, Some(8080));
2097 assert!(lifecycle.started_at_ms.is_none());
2098 assert!(lifecycle.last_error.is_none());
2099
2100 assert_eq!(
2101 state.stop_proxy().expect("stop should succeed"),
2102 ProxyStopResult::Stopping
2103 );
2104 let lifecycle = state.lifecycle();
2105 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2106 assert_eq!(lifecycle.port, Some(8080));
2107 assert!(shutdown_rx.await.is_ok());
2108 }
2109
2110 #[tokio::test]
2111 async fn status_snapshot_derives_runtime_facing_fields() {
2112 let state = CoreState::new(None).await;
2113 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2114 state
2115 .prepare_start(8080, shutdown_tx)
2116 .expect("prepare start should succeed");
2117
2118 let status = state.status_snapshot();
2119 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2120 assert!(status.running);
2121 assert_eq!(status.port, Some(8080));
2122 assert!(status.uptime.is_none());
2123 assert!(status.last_error.is_none());
2124 }
2125
2126 #[tokio::test]
2127 async fn status_report_combines_status_and_metrics() {
2128 let state = CoreState::new(None).await;
2129 let report = state.status_report().await;
2130
2131 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2132 assert!(!report.status.running);
2133 assert_eq!(report.metrics.intercepts_pending, 0);
2134 assert_eq!(report.metrics.ws_pending_messages, 0);
2135 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2136 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2137 assert_eq!(report.metrics.audit_events_total, 0);
2138 assert_eq!(report.metrics.audit_events_failed, 0);
2139 assert_eq!(report.metrics.flow_events_lagged_total, 0);
2140 assert_eq!(report.metrics.audit_events_lagged_total, 0);
2141 }
2142
2143 #[test]
2144 fn proxy_config_new_and_transport_setters_preserve_values() {
2145 let config = ProxyConfig::new(
2146 8080,
2147 std::path::PathBuf::from("/tmp/ca_cert.pem"),
2148 std::path::PathBuf::from("/tmp/ca_key.pem"),
2149 )
2150 .with_transparent(true)
2151 .with_udp_tproxy_port(Some(15000));
2152
2153 assert_eq!(config.port, 8080);
2154 assert_eq!(
2155 config.ca_cert_path,
2156 std::path::PathBuf::from("/tmp/ca_cert.pem")
2157 );
2158 assert_eq!(
2159 config.ca_key_path,
2160 std::path::PathBuf::from("/tmp/ca_key.pem")
2161 );
2162 assert!(config.transparent);
2163 assert_eq!(config.udp_tproxy_port, Some(15000));
2164 }
2165
2166 #[test]
2167 fn proxy_config_from_app_data_dir_creates_default_paths() {
2168 let unique = SystemTime::now()
2169 .duration_since(UNIX_EPOCH)
2170 .expect("clock drift")
2171 .as_nanos();
2172 let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2173
2174 let config =
2175 ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2176
2177 assert!(dir.exists());
2178 assert_eq!(config.port, 8899);
2179 assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2180 assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2181 assert!(!config.transparent);
2182 assert!(config.udp_tproxy_port.is_none());
2183 }
2184
2185 #[tokio::test]
2186 async fn intercept_snapshot_maps_pending_counts() {
2187 let state = CoreState::new(None).await;
2188 let snapshot = state.intercept_snapshot().await;
2189
2190 assert_eq!(snapshot.pending_count, 0);
2191 assert_eq!(snapshot.ws_pending_count, 0);
2192 }
2193
2194 #[tokio::test]
2195 async fn audit_snapshot_returns_latest_events_in_order() {
2196 let state = CoreState::new(None).await;
2197 state.record_audit_event(AuditEvent::new(
2198 AuditActor::Runtime,
2199 AuditEventKind::RuleChanged,
2200 "first",
2201 AuditOutcome::Success,
2202 json!({ "index": 1 }),
2203 ));
2204 state.record_audit_event(AuditEvent::new(
2205 AuditActor::Http,
2206 AuditEventKind::PolicyUpdated,
2207 "second",
2208 AuditOutcome::Success,
2209 json!({ "index": 2 }),
2210 ));
2211
2212 let snapshot = state.audit_snapshot(1);
2213
2214 assert_eq!(snapshot.events.len(), 1);
2215 assert_eq!(snapshot.events[0].target, "second");
2216 assert_eq!(snapshot.events[0].details["index"], 2);
2217 }
2218
2219 #[tokio::test]
2220 async fn query_audit_snapshot_filters_in_memory_events() {
2221 let state = CoreState::new(None).await;
2222 state.record_audit_event(AuditEvent::new(
2223 AuditActor::Http,
2224 AuditEventKind::RuleChanged,
2225 "rule-1",
2226 AuditOutcome::Success,
2227 json!({ "idx": 1 }),
2228 ));
2229 state.record_audit_event(AuditEvent::new(
2230 AuditActor::Probe,
2231 AuditEventKind::PolicyUpdated,
2232 "policy",
2233 AuditOutcome::Failed,
2234 json!({ "idx": 2 }),
2235 ));
2236
2237 let snapshot = state
2238 .query_audit_snapshot(CoreAuditQuery {
2239 actor: Some(AuditActor::Probe),
2240 kind: Some(AuditEventKind::PolicyUpdated),
2241 outcome: Some(AuditOutcome::Failed),
2242 limit: 10,
2243 ..Default::default()
2244 })
2245 .await;
2246
2247 assert_eq!(snapshot.events.len(), 1);
2248 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2249 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2250 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2251 }
2252
2253 #[tokio::test]
2254 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2255 let state = CoreState::new(Some(sqlite_url())).await;
2256 state.update_policy_from(
2257 AuditActor::Http,
2258 "policy".to_string(),
2259 ProxyPolicy {
2260 transparent_enabled: true,
2261 ..Default::default()
2262 },
2263 );
2264
2265 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2266 for _ in 0..10 {
2267 snapshot = state
2268 .query_audit_snapshot(CoreAuditQuery {
2269 actor: Some(AuditActor::Http),
2270 kind: Some(AuditEventKind::PolicyUpdated),
2271 limit: 10,
2272 ..Default::default()
2273 })
2274 .await;
2275 if !snapshot.events.is_empty() {
2276 break;
2277 }
2278 sleep(Duration::from_millis(20)).await;
2279 }
2280
2281 assert!(!snapshot.events.is_empty());
2282 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2283 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2284 }
2285
2286 #[tokio::test]
2287 async fn prepare_start_rejects_second_active_start() {
2288 let state = CoreState::new(None).await;
2289 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2290 state
2291 .prepare_start(8080, shutdown_tx)
2292 .expect("first start should succeed");
2293
2294 let (second_tx, _second_rx) = oneshot::channel();
2295 let error = state
2296 .prepare_start(8081, second_tx)
2297 .expect_err("second active start should be rejected");
2298 assert!(error.contains("already"));
2299 }
2300
/// Builds the state on a manually created runtime, replaces the policy from
/// synchronous code, and checks the most recent audit event describes it.
#[test]
fn update_policy_records_audit_event() {
    // The runtime binding is kept for the whole test; only construction of
    // the state is driven through `block_on`.
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = rt.block_on(CoreState::new(None));

    let transparent_policy = ProxyPolicy {
        transparent_enabled: true,
        ..Default::default()
    };
    state.update_policy_from(
        AuditActor::Runtime,
        String::from("policy"),
        transparent_policy,
    );

    let recorded = state.recent_audit_events();
    let newest = recorded.last().expect("audit event should exist");
    assert_eq!(newest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(newest.outcome, AuditOutcome::Success);
    assert_eq!(newest.details["transparent_enabled"], true);
}
2321
/// Applies a redaction-only policy patch and checks that the request timeout
/// is unchanged, the redaction flags are set, and an audit event was recorded.
#[test]
fn patch_policy_updates_redaction_without_replacing_other_fields() {
    use relay_core_api::policy::{ProxyPolicyPatch, RedactionPolicyPatch};

    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = rt.block_on(CoreState::new(None));

    // Captured before patching so it can be compared afterwards.
    let timeout_before = state.policy_snapshot().request_timeout_ms;

    let redaction_patch = RedactionPolicyPatch {
        enabled: Some(true),
        redact_bodies: Some(true),
        ..Default::default()
    };
    state.patch_policy_from(
        AuditActor::Runtime,
        String::from("policy.patch"),
        ProxyPolicyPatch {
            redaction: Some(redaction_patch),
        },
    );

    let patched = state.policy_snapshot();
    assert_eq!(patched.request_timeout_ms, timeout_before);
    assert!(patched.redaction.enabled);
    assert!(patched.redaction.redact_bodies);

    let recorded = state.recent_audit_events();
    let newest = recorded.last().expect("audit event should exist");
    assert_eq!(newest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(newest.details["redaction_enabled"], true);
}
2350
2351 #[tokio::test]
2352 async fn metrics_include_audit_and_lagged_event_counters() {
2353 let state = CoreState::new(None).await;
2354
2355 state.update_policy_from(
2356 AuditActor::Runtime,
2357 "policy".to_string(),
2358 ProxyPolicy::default(),
2359 );
2360 let _ = state
2361 .resolve_intercept_with_modifications_from(
2362 AuditActor::Probe,
2363 "missing-flow:request".to_string(),
2364 "drop",
2365 None,
2366 )
2367 .await;
2368
2369 state.record_flow_events_lagged(3);
2370 state.record_audit_events_lagged(5);
2371
2372 let metrics = state.get_metrics().await;
2373 assert_eq!(metrics.audit_events_total, 2);
2374 assert_eq!(metrics.audit_events_failed, 1);
2375 assert_eq!(metrics.flow_events_lagged_total, 3);
2376 assert_eq!(metrics.audit_events_lagged_total, 5);
2377 }
2378
2379 #[tokio::test]
2380 async fn prometheus_metrics_text_contains_observability_fields() {
2381 let state = CoreState::new(None).await;
2382 state.record_flow_events_lagged(2);
2383 state.record_audit_events_lagged(4);
2384
2385 let text = state.get_metrics_prometheus_text().await;
2386 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2387 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2388 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2389 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2390 }
2391
2392 #[tokio::test]
2393 async fn search_flows_uses_store_with_offset_pagination() {
2394 let state = CoreState::new(Some(sqlite_url())).await;
2395 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2396 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2397 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2398
2399 state.upsert_flow(Box::new(flow_a));
2400 state.upsert_flow(Box::new(flow_b));
2401 state.upsert_flow(Box::new(flow_c));
2402
2403 let mut baseline = Vec::new();
2404 for _ in 0..20 {
2405 baseline = state
2406 .search_flows(FlowQuery {
2407 host: Some("api.example.com".to_string()),
2408 path_contains: None,
2409 method: None,
2410 status_min: None,
2411 status_max: None,
2412 has_error: None,
2413 is_websocket: None,
2414 limit: Some(3),
2415 offset: Some(0),
2416 })
2417 .await;
2418 if baseline.len() == 3 {
2419 break;
2420 }
2421 sleep(Duration::from_millis(20)).await;
2422 }
2423
2424 assert_eq!(baseline.len(), 3);
2425 let page = state
2426 .search_flows(FlowQuery {
2427 host: Some("api.example.com".to_string()),
2428 path_contains: None,
2429 method: None,
2430 status_min: None,
2431 status_max: None,
2432 has_error: None,
2433 is_websocket: None,
2434 limit: Some(1),
2435 offset: Some(1),
2436 })
2437 .await;
2438 assert_eq!(page.len(), 1);
2439 assert_eq!(page[0].id, baseline[1].id);
2440 }
2441
2442 #[tokio::test]
2443 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2444 let state = CoreState::new(Some(sqlite_url())).await;
2445 let first_flow = sample_http_flow(
2446 "persist.example.com",
2447 "/first",
2448 "GET",
2449 200,
2450 1_700_000_010_000,
2451 );
2452 let first_id = first_flow.id.to_string();
2453 state.upsert_flow(Box::new(first_flow));
2454 for i in 0..240 {
2455 state.upsert_flow(Box::new(sample_http_flow(
2456 "persist.example.com",
2457 &format!("/{}", i),
2458 "GET",
2459 200,
2460 1_700_000_020_000 + i,
2461 )));
2462 }
2463
2464 sleep(Duration::from_millis(200)).await;
2465
2466 let loaded = state.get_flow(first_id).await;
2467 assert!(loaded.is_some());
2468 }
2469
2470 #[tokio::test]
2471 async fn search_flows_redacts_summary_url_when_enabled() {
2472 let state = CoreState::new(None).await;
2473 state.update_policy_from(
2474 AuditActor::Runtime,
2475 "policy.redaction".to_string(),
2476 ProxyPolicy {
2477 redaction: RedactionPolicy {
2478 enabled: true,
2479 sensitive_query_keys: vec!["token".to_string()],
2480 redact_bodies: false,
2481 ..Default::default()
2482 },
2483 ..Default::default()
2484 },
2485 );
2486 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2487
2488 let mut items = Vec::new();
2489 for _ in 0..20 {
2490 items = state
2491 .search_flows(FlowQuery {
2492 host: Some("api.example.com".to_string()),
2493 path_contains: Some("/private".to_string()),
2494 ..Default::default()
2495 })
2496 .await;
2497 if !items.is_empty() {
2498 break;
2499 }
2500 sleep(Duration::from_millis(20)).await;
2501 }
2502
2503 assert!(!items.is_empty());
2504 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2505 let token = redacted
2506 .query_pairs()
2507 .find(|(k, _)| k == "token")
2508 .map(|(_, v)| v.to_string());
2509 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2510 }
2511
2512 #[tokio::test]
2513 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2514 let state = CoreState::new(Some(sqlite_url())).await;
2515 state.update_policy_from(
2516 AuditActor::Runtime,
2517 "policy.redaction".to_string(),
2518 ProxyPolicy {
2519 redaction: RedactionPolicy {
2520 enabled: true,
2521 sensitive_header_names: vec![
2522 "authorization".to_string(),
2523 "set-cookie".to_string(),
2524 ],
2525 sensitive_query_keys: vec!["token".to_string()],
2526 redact_bodies: true,
2527 },
2528 ..Default::default()
2529 },
2530 );
2531
2532 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2533 let flow_id = flow.id.to_string();
2534 state.upsert_flow(Box::new(flow));
2535 sleep(Duration::from_millis(80)).await;
2536
2537 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2538 let Layer::Http(http) = loaded.layer else {
2539 panic!("expected http layer");
2540 };
2541
2542 let auth = http
2543 .request
2544 .headers
2545 .iter()
2546 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2547 .map(|(_, v)| v.as_str());
2548 assert_eq!(auth, Some("[REDACTED]"));
2549
2550 let req_query_token = http
2551 .request
2552 .query
2553 .iter()
2554 .find(|(k, _)| k == "token")
2555 .map(|(_, v)| v.as_str());
2556 assert_eq!(req_query_token, Some("[REDACTED]"));
2557
2558 let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2559 assert_eq!(req_body, Some("[REDACTED]"));
2560
2561 let response = http.response.expect("response should exist");
2562 let set_cookie = response
2563 .headers
2564 .iter()
2565 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2566 .map(|(_, v)| v.as_str());
2567 assert_eq!(set_cookie, Some("[REDACTED]"));
2568 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2569 assert_eq!(res_body, Some("[REDACTED]"));
2570 }
2571
/// Enables body redaction and checks that `redact_flow_update_for_output`
/// replaces the content of an `HttpBody` update with `[REDACTED]`.
#[test]
fn redact_flow_update_masks_http_body_when_enabled() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = rt.block_on(CoreState::new(None));

    let redaction = RedactionPolicy {
        enabled: true,
        redact_bodies: true,
        ..Default::default()
    };
    state.update_policy_from(
        AuditActor::Runtime,
        String::from("policy.redaction"),
        ProxyPolicy {
            redaction,
            ..Default::default()
        },
    );

    let secret_body = BodyData {
        encoding: "utf-8".to_string(),
        content: "super-secret".to_string(),
        size: 12,
    };
    let update = FlowUpdate::HttpBody {
        flow_id: "f-1".to_string(),
        direction: relay_core_api::flow::Direction::ClientToServer,
        body: secret_body,
    };

    // The redacted update must still be an `HttpBody` variant.
    let FlowUpdate::HttpBody { body, .. } = state.redact_flow_update_for_output(update) else {
        panic!("expected http body update");
    };
    assert_eq!(body.content, "[REDACTED]");
}
2604
/// Loads a minimal script as the Tauri actor and checks that the most recent
/// audit event records a successful `ScriptReloaded` for that target.
#[cfg(feature = "script")]
#[tokio::test]
async fn load_script_from_records_audit_event() {
    let state = CoreState::new(None).await;

    let script_source = "globalThis.onRequestHeaders = (_flow) => {};";
    let load_result = state
        .load_script_from(
            AuditActor::Tauri,
            String::from("tauri.load_script"),
            script_source,
        )
        .await;
    load_result.expect("script should load");

    let recorded = state.recent_audit_events();
    let newest = recorded.last().expect("audit event should exist");
    assert_eq!(newest.actor, AuditActor::Tauri);
    assert_eq!(newest.kind, AuditEventKind::ScriptReloaded);
    assert_eq!(newest.outcome, AuditOutcome::Success);
    assert_eq!(newest.target, "tauri.load_script");
}
2626}