1use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51 build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62mod log_format;
63pub mod modification;
64pub mod paths;
65pub mod rule;
66pub mod services;
67
68pub use paths::CaPaths;
71pub use relay_core_api::{flow, policy};
72pub use relay_core_lib::InterceptionResult;
73pub use relay_core_lib::rule as lib_rule;
74
75use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
76use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
77use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
78
79pub mod rule_engine {
80 pub use relay_core_lib::rule_engine::*;
81}
82
83#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
84pub struct CoreMetrics {
85 pub flows_total: usize,
86 pub flows_in_memory: usize,
87 pub flows_dropped: usize,
88 pub intercepts_pending: usize,
89 pub ws_pending_messages: usize,
90 pub oldest_intercept_age_ms: Option<u64>,
91 pub oldest_ws_message_age_ms: Option<u64>,
92 pub rule_exec_errors: usize,
93 pub audit_events_total: usize,
94 pub audit_events_failed: usize,
95 pub flow_events_lagged_total: usize,
96 pub audit_events_lagged_total: usize,
97 pub proxy_body_degraded_total: usize,
99 pub proxy_http_request_total: usize,
101 pub proxy_sandbox_reject_total: usize,
103 pub proxy_invalid_method_total: usize,
105 pub proxy_invalid_status_total: usize,
107 pub proxy_retry_total: usize,
109 pub proxy_stream_mode_tap_total: usize,
111 pub proxy_stream_mode_degrade_total: usize,
113}
114
115impl CoreMetrics {
116 pub fn to_prometheus_text(&self) -> String {
117 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
118 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
119 format!(
120 "relay_core_flows_total {}\n\
121relay_core_flows_in_memory {}\n\
122relay_core_flows_dropped_total {}\n\
123relay_core_intercepts_pending {}\n\
124relay_core_ws_pending_messages {}\n\
125relay_core_oldest_intercept_age_ms {}\n\
126relay_core_oldest_ws_message_age_ms {}\n\
127relay_core_rule_exec_errors_total {}\n\
128relay_core_audit_events_total {}\n\
129relay_core_audit_events_failed_total {}\n\
130relay_core_flow_events_lagged_total {}\n\
131relay_core_audit_events_lagged_total {}\n\
132relay_core_proxy_body_degraded_total {}\n\
133relay_core_proxy_http_request_total {}\n\
134relay_core_proxy_sandbox_reject_total {}\n\
135relay_core_proxy_invalid_method_total {}\n\
136relay_core_proxy_invalid_status_total {}\n\
137relay_core_proxy_retry_total {}\n\
138relay_core_proxy_stream_mode_tap_total {}\n\
139relay_core_proxy_stream_mode_degrade_total {}\n",
140 self.flows_total,
141 self.flows_in_memory,
142 self.flows_dropped,
143 self.intercepts_pending,
144 self.ws_pending_messages,
145 oldest_intercept_age_ms,
146 oldest_ws_message_age_ms,
147 self.rule_exec_errors,
148 self.audit_events_total,
149 self.audit_events_failed,
150 self.flow_events_lagged_total,
151 self.audit_events_lagged_total,
152 self.proxy_body_degraded_total,
153 self.proxy_http_request_total,
154 self.proxy_sandbox_reject_total,
155 self.proxy_invalid_method_total,
156 self.proxy_invalid_status_total,
157 self.proxy_retry_total,
158 self.proxy_stream_mode_tap_total,
159 self.proxy_stream_mode_degrade_total,
160 )
161 }
162}
163
164#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
165pub struct CoreStatusSnapshot {
166 pub phase: RuntimeLifecyclePhase,
167 pub running: bool,
168 pub port: Option<u16>,
169 pub uptime: Option<u64>,
170 pub last_error: Option<String>,
171}
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
174pub struct CoreStatusReport {
175 pub status: CoreStatusSnapshot,
176 pub metrics: CoreMetrics,
177}
178
179#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
180pub struct CoreInterceptSnapshot {
181 pub pending_count: usize,
182 pub ws_pending_count: usize,
183}
184
185#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
186pub struct CoreAuditSnapshot {
187 pub events: Vec<AuditEvent>,
188}
189
190#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
191pub struct CoreAuditQuery {
192 pub since_ms: Option<u64>,
193 pub until_ms: Option<u64>,
194 pub actor: Option<AuditActor>,
195 pub kind: Option<AuditEventKind>,
196 pub outcome: Option<AuditOutcome>,
197 pub limit: usize,
198}
199
200impl Default for CoreAuditQuery {
201 fn default() -> Self {
202 Self {
203 since_ms: None,
204 until_ms: None,
205 actor: None,
206 kind: None,
207 outcome: None,
208 limit: 50,
209 }
210 }
211}
212
213#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
214#[serde(rename_all = "snake_case")]
215pub enum RuntimeLifecyclePhase {
216 Created,
217 Starting,
218 Running,
219 Stopping,
220 Stopped,
221 Failed,
222}
223
224impl RuntimeLifecyclePhase {
225 pub fn as_str(&self) -> &'static str {
226 match self {
227 Self::Created => "created",
228 Self::Starting => "starting",
229 Self::Running => "running",
230 Self::Stopping => "stopping",
231 Self::Stopped => "stopped",
232 Self::Failed => "failed",
233 }
234 }
235
236 pub fn is_active(&self) -> bool {
237 matches!(self, Self::Starting | Self::Running | Self::Stopping)
238 }
239}
240
241#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
242pub struct RuntimeLifecycle {
243 pub phase: RuntimeLifecyclePhase,
244 pub port: Option<u16>,
245 pub started_at_ms: Option<u64>,
246 pub last_error: Option<String>,
247}
248
249impl RuntimeLifecycle {
250 pub fn created() -> Self {
251 Self {
252 phase: RuntimeLifecyclePhase::Created,
253 port: None,
254 started_at_ms: None,
255 last_error: None,
256 }
257 }
258
259 pub fn is_active(&self) -> bool {
260 self.phase.is_active()
261 }
262
263 pub fn uptime_seconds(&self) -> Option<u64> {
264 let started_at_ms = self.started_at_ms?;
265 let now_ms = now_unix_ms();
266 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
267 }
268}
269
270pub enum ProxySpawnResult {
271 Started(tokio::task::JoinHandle<()>),
272 AlreadyRunning,
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq)]
276pub enum ProxyStopResult {
277 Stopping,
278 NotRunning,
279}
280
281impl From<RuntimeLifecycle> for CoreStatusSnapshot {
282 fn from(lifecycle: RuntimeLifecycle) -> Self {
283 Self {
284 running: lifecycle.is_active(),
285 uptime: lifecycle.uptime_seconds(),
286 port: lifecycle.port,
287 last_error: lifecycle.last_error,
288 phase: lifecycle.phase,
289 }
290 }
291}
292
293pub struct CoreState {
294 flow_store: mpsc::Sender<FlowStoreMessage>,
295 intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
296 rule_store: mpsc::Sender<RuleStoreMessage>,
297 store: Option<Store>,
298 #[cfg(feature = "script")]
299 pub script_interceptor: Arc<ScriptInterceptor>,
300 pub policy_tx: watch::Sender<ProxyPolicy>,
301 pub flows_dropped: Arc<AtomicUsize>,
302 audit_events_total: Arc<AtomicUsize>,
303 audit_events_failed: Arc<AtomicUsize>,
304 flow_events_lagged_total: Arc<AtomicUsize>,
305 audit_events_lagged_total: Arc<AtomicUsize>,
306 flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
308 audit_broadcast_tx: broadcast::Sender<AuditEvent>,
309 audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
310 lifecycle_tx: watch::Sender<RuntimeLifecycle>,
311 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
312}
313
314impl CoreState {
315 pub async fn new(db_url: Option<String>) -> Self {
316 const AUDIT_HISTORY_LIMIT: usize = 200;
317
318 let store = if let Some(url) = db_url {
319 match Store::connect(&url).await {
320 Ok(s) => {
321 if let Err(e) = s.init().await {
322 tracing::error!("Failed to init store: {}", e);
323 }
324 Some(s)
325 }
326 Err(e) => {
327 tracing::error!("Failed to connect to store: {}", e);
328 None
329 }
330 }
331 } else {
332 None
333 };
334
335 let (flow_tx, flow_rx) = mpsc::channel(10000);
336 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
337 tokio::spawn(flow_actor.run());
338
339 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
340 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
341 tokio::spawn(intercept_actor.run());
342
343 let (rule_tx, rule_rx) = mpsc::channel(100);
344 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
345 tokio::spawn(rule_actor.run());
346
347 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
348 let (flow_broadcast_tx, _) = broadcast::channel(1000);
349 let (audit_broadcast_tx, _) = broadcast::channel(256);
350 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
351
352 #[cfg(feature = "script")]
353 let script_interceptor = ScriptInterceptor::new()
354 .await
355 .expect("Failed to initialize ScriptInterceptor");
356 Self {
357 flow_store: flow_tx,
358 intercept_broker: intercept_tx,
359 rule_store: rule_tx,
360 store,
361 #[cfg(feature = "script")]
362 script_interceptor: Arc::new(script_interceptor),
363 policy_tx,
364 flows_dropped: Arc::new(AtomicUsize::new(0)),
365 audit_events_total: Arc::new(AtomicUsize::new(0)),
366 audit_events_failed: Arc::new(AtomicUsize::new(0)),
367 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
368 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
369 flow_broadcast_tx,
370 audit_broadcast_tx,
371 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
372 lifecycle_tx,
373 shutdown_tx: Mutex::new(None),
374 }
375 }
376
377 pub async fn get_metrics(&self) -> CoreMetrics {
378 let (flow_tx, flow_rx) = oneshot::channel();
379 let _ = self
380 .flow_store
381 .send(FlowStoreMessage::GetMetrics(flow_tx))
382 .await;
383 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
384
385 let (int_tx, int_rx) = oneshot::channel();
386 let _ = self
387 .intercept_broker
388 .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
389 .await;
390 let (
391 intercepts_pending,
392 ws_pending_messages,
393 oldest_intercept_age_ms,
394 oldest_ws_message_age_ms,
395 ) = int_rx.await.unwrap_or((0, 0, None, None));
396
397 let (rule_tx, rule_rx) = oneshot::channel();
398 let _ = self
399 .rule_store
400 .send(RuleStoreMessage::GetMetrics(rule_tx))
401 .await;
402 let rule_exec_errors = rule_rx.await.unwrap_or(0);
403
404 CoreMetrics {
405 flows_total,
406 flows_in_memory,
407 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
408 intercepts_pending,
409 ws_pending_messages,
410 oldest_intercept_age_ms,
411 oldest_ws_message_age_ms,
412 rule_exec_errors,
413 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
414 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
415 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
416 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
417 proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
418 proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
419 proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
420 proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
421 proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
422 proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
423 proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
424 proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
425 ),
426 }
427 }
428
429 pub async fn get_metrics_prometheus_text(&self) -> String {
430 let mut text = self.get_metrics().await.to_prometheus_text();
431 #[cfg(feature = "script")]
432 {
433 text.push_str(&self.script_interceptor.metrics.prometheus_lines());
434 }
435 text
436 }
437
438 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
439 self.lifecycle().into()
440 }
441
442 pub async fn status_report(&self) -> CoreStatusReport {
443 CoreStatusReport {
444 status: self.status_snapshot(),
445 metrics: self.get_metrics().await,
446 }
447 }
448
449 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
450 let metrics = self.get_metrics().await;
451 CoreInterceptSnapshot {
452 pending_count: metrics.intercepts_pending,
453 ws_pending_count: metrics.ws_pending_messages,
454 }
455 }
456
457 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
458 let events = self.recent_audit_events();
459 let start = events.len().saturating_sub(limit);
460 CoreAuditSnapshot {
461 events: events.into_iter().skip(start).collect(),
462 }
463 }
464
465 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
466 let limit = query.limit.clamp(1, 500);
467 if let Some(store) = &self.store {
468 let rows = store
469 .query_audit_events(
470 query.since_ms,
471 query.until_ms,
472 query.actor.as_ref().map(AuditActor::as_str),
473 query.kind.as_ref().map(AuditEventKind::as_str),
474 query.outcome.as_ref().map(AuditOutcome::as_str),
475 limit,
476 )
477 .await
478 .unwrap_or_default();
479
480 let mut events = Vec::with_capacity(rows.len());
481 for row in rows {
482 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
483 events.push(event);
484 }
485 }
486 return CoreAuditSnapshot { events };
487 }
488
489 let mut events = self.recent_audit_events();
490 events.reverse();
491 let filtered = events
492 .into_iter()
493 .filter(|event| {
494 query
495 .since_ms
496 .map(|v| event.timestamp_ms >= v)
497 .unwrap_or(true)
498 })
499 .filter(|event| {
500 query
501 .until_ms
502 .map(|v| event.timestamp_ms <= v)
503 .unwrap_or(true)
504 })
505 .filter(|event| {
506 query
507 .actor
508 .as_ref()
509 .map(|v| &event.actor == v)
510 .unwrap_or(true)
511 })
512 .filter(|event| {
513 query
514 .kind
515 .as_ref()
516 .map(|v| &event.kind == v)
517 .unwrap_or(true)
518 })
519 .filter(|event| {
520 query
521 .outcome
522 .as_ref()
523 .map(|v| &event.outcome == v)
524 .unwrap_or(true)
525 })
526 .take(limit)
527 .collect();
528 CoreAuditSnapshot { events: filtered }
529 }
530
531 pub async fn get_flow(&self, id: String) -> Option<Flow> {
532 let (tx, rx) = oneshot::channel();
533 if let Err(e) = self
534 .flow_store
535 .send(FlowStoreMessage::GetFlow {
536 id: id.clone(),
537 respond_to: tx,
538 })
539 .await
540 {
541 error!("Failed to send GetFlow request: {}", e);
542 if let Some(store) = &self.store {
543 return store
544 .load_flow(&id)
545 .await
546 .ok()
547 .flatten()
548 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
549 }
550 return None;
551 }
552 let flow = rx
553 .await
554 .map_err(|e| {
555 error!("Failed to receive Flow response: {}", e);
556 e
557 })
558 .unwrap_or(None);
559 if let Some(flow) = flow {
560 return Some(redact_flow(flow, &self.current_redaction_policy()));
561 }
562 if let Some(store) = &self.store {
563 return store
564 .load_flow(&id)
565 .await
566 .ok()
567 .flatten()
568 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
569 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
570 }
571 None
572 }
573
574 pub async fn get_rules(&self) -> Vec<Rule> {
575 let (tx, rx) = oneshot::channel();
576 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
577 error!("Failed to send GetRules request: {}", e);
578 return Vec::new();
579 }
580 rx.await
581 .map_err(|e| {
582 error!("Failed to receive Rules response: {}", e);
583 e
584 })
585 .unwrap_or_default()
586 }
587
588 pub async fn set_rules(&self, rules: Vec<Rule>) {
589 let _ = self
590 .set_rules_from(
591 AuditActor::Runtime,
592 "rules.replace",
593 "rule_set".to_string(),
594 json!({}),
595 rules,
596 )
597 .await;
598 }
599
600 pub async fn upsert_rule_from(
601 &self,
602 actor: AuditActor,
603 operation: &str,
604 target: String,
605 details: serde_json::Value,
606 rule: Rule,
607 ) -> Result<(), String> {
608 let rule_id = rule.id.clone();
609 let mut rules = self.get_rules().await;
610 rules.retain(|existing| existing.id != rule_id);
611 rules.push(rule);
612 self.set_rules_from(actor, operation, target, details, rules)
613 .await
614 }
615
616 pub async fn delete_rule_from(
617 &self,
618 actor: AuditActor,
619 operation: &str,
620 target: String,
621 details: serde_json::Value,
622 rule_id: &str,
623 ) -> Result<bool, String> {
624 let mut rules = self.get_rules().await;
625 let before = rules.len();
626 rules.retain(|rule| rule.id != rule_id);
627 if rules.len() == before {
628 return Ok(false);
629 }
630 self.set_rules_from(actor, operation, target, details, rules)
631 .await
632 .map(|_| true)
633 }
634
635 pub async fn create_mock_response_rule_from(
636 &self,
637 actor: AuditActor,
638 target: String,
639 details: serde_json::Value,
640 config: MockResponseRuleConfig,
641 ) -> Result<String, String> {
642 let rule_id = config.rule_id.clone();
643 let rule = build_mock_response_rule(config);
644 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
645 .await
646 .map(|_| rule_id)
647 }
648
649 pub async fn create_intercept_rule_from(
650 &self,
651 actor: AuditActor,
652 target: String,
653 details: serde_json::Value,
654 config: InterceptRuleConfig,
655 ) -> Result<String, String> {
656 let rule_id = config.rule_id.clone();
657 let mut rules = self.get_rules().await;
658 rules.extend(build_intercept_rules(config));
659 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
660 .await
661 .map(|_| rule_id)
662 }
663
664 pub async fn upsert_legacy_intercept_rule_from(
665 &self,
666 actor: AuditActor,
667 target: String,
668 details: serde_json::Value,
669 rule: InterceptRule,
670 ) -> Result<(), String> {
671 let rule_id = rule.id.clone();
672 let mut rules = self.get_rules().await;
673 rules.retain(|existing| {
674 existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
675 });
676 rules.extend(rule.to_rules());
677 self.set_rules_from(
678 actor,
679 "rule.intercept_legacy_upsert",
680 target,
681 details,
682 rules,
683 )
684 .await
685 }
686
687 pub async fn set_rules_from(
688 &self,
689 actor: AuditActor,
690 operation: &str,
691 target: String,
692 details: serde_json::Value,
693 rules: Vec<Rule>,
694 ) -> Result<(), String> {
695 let rule_count = rules.len();
696 if let Err(e) = self
697 .rule_store
698 .send(RuleStoreMessage::SetRules(rules))
699 .await
700 {
701 error!("Failed to send SetRules request: {}", e);
702 self.record_audit_event(AuditEvent::new(
703 actor,
704 AuditEventKind::RuleChanged,
705 target,
706 AuditOutcome::Failed,
707 json!({
708 "operation": operation,
709 "rule_count": rule_count,
710 "details": details,
711 "error": e.to_string()
712 }),
713 ));
714 return Err(e.to_string());
715 }
716
717 self.record_audit_event(AuditEvent::new(
718 actor,
719 AuditEventKind::RuleChanged,
720 target,
721 AuditOutcome::Success,
722 json!({
723 "operation": operation,
724 "rule_count": rule_count,
725 "details": details
726 }),
727 ));
728 Ok(())
729 }
730
731 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
732 let mut new_rules = Vec::new();
733 for rule in rules {
734 new_rules.extend(rule.to_rules());
735 }
736 self.set_rules(new_rules).await;
737 }
738
739 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
740 let (tx, rx) = oneshot::channel();
741 if let Err(e) = self
742 .rule_store
743 .send(RuleStoreMessage::GetRuleEngine(tx))
744 .await
745 {
746 error!("Failed to send GetRuleEngine request: {}", e);
747 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
748 }
749 rx.await
750 .map_err(|e| {
751 error!("Failed to receive RuleEngine response: {}", e);
752 e
753 })
754 .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
755 }
756
757 pub fn update_policy(&self, policy: ProxyPolicy) {
758 self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
759 }
760
761 pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
762 let mut policy = self.policy_snapshot();
763 policy.apply_patch(patch);
764 self.update_policy_from(actor, target, policy);
765 }
766
767 pub fn policy_snapshot(&self) -> ProxyPolicy {
768 self.policy_tx.borrow().clone()
769 }
770
771 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
772 let details = json!({
773 "strict_http_semantics": policy.strict_http_semantics,
774 "request_timeout_ms": policy.request_timeout_ms,
775 "max_body_size": policy.max_body_size,
776 "transparent_enabled": policy.transparent_enabled,
777 "redaction_enabled": policy.redaction.enabled,
778 "redact_bodies": policy.redaction.redact_bodies
779 });
780
781 self.policy_tx.send_replace(policy);
782 self.record_audit_event(AuditEvent::new(
783 actor,
784 AuditEventKind::PolicyUpdated,
785 target,
786 AuditOutcome::Success,
787 details,
788 ));
789 }
790
791 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
792 if let Err(e) = self
793 .intercept_broker
794 .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
795 .await
796 {
797 error!("Failed to send RegisterIntercept request: {}", e);
798 }
799 }
800
801 pub async fn resolve_intercept(
802 &self,
803 key: String,
804 result: InterceptionResult,
805 ) -> Result<(), String> {
806 let (tx, rx) = oneshot::channel();
807 if let Err(e) = self
808 .intercept_broker
809 .send(InterceptBrokerMessage::ResolveIntercept {
810 key,
811 result,
812 respond_to: tx,
813 })
814 .await
815 {
816 error!("Failed to send ResolveIntercept request: {}", e);
817 return Err(e.to_string());
818 }
819 rx.await.map_err(|_| "Actor dropped".to_string())?
820 }
821
822 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
823 let (tx, rx) = oneshot::channel();
824 if let Err(e) = self
825 .intercept_broker
826 .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
827 key,
828 respond_to: tx,
829 })
830 .await
831 {
832 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
833 return None;
834 }
835 rx.await
836 .map_err(|e| {
837 error!("Failed to receive WebSocketMessage response: {}", e);
838 e
839 })
840 .unwrap_or(None)
841 }
842
843 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
844 if let Err(e) = self
845 .intercept_broker
846 .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
847 .await
848 {
849 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
850 }
851 }
852
853 pub async fn is_intercept_pending(&self, key: String) -> bool {
854 let (tx, rx) = oneshot::channel();
855 if let Err(e) = self
856 .intercept_broker
857 .send(InterceptBrokerMessage::GetPendingIntercept {
858 key,
859 respond_to: tx,
860 })
861 .await
862 {
863 error!("Failed to send GetPendingIntercept request: {}", e);
864 return false;
865 }
866 rx.await
867 .map_err(|e| {
868 error!("Failed to receive PendingIntercept response: {}", e);
869 e
870 })
871 .unwrap_or(false)
872 }
873
874 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
875 let (tx, rx) = oneshot::channel();
876 if let Err(e) = self
877 .intercept_broker
878 .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
879 flow_id,
880 respond_to: tx,
881 })
882 .await
883 {
884 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
885 return false;
886 }
887 rx.await
888 .map_err(|e| {
889 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
890 e
891 })
892 .unwrap_or(false)
893 }
894
895 pub fn upsert_flow(&self, flow: Box<Flow>) {
896 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
897 error!("FlowStore dropped flow: {}", e);
899 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
900 }
901 }
902
903 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
904 if let Err(e) = self
905 .flow_store
906 .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
907 {
908 error!("FlowStore dropped WS message: {}", e);
909 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
910 }
911 }
912
913 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
914 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
915 flow_id,
916 body,
917 direction,
918 }) {
919 error!("FlowStore dropped HTTP body: {}", e);
920 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
921 }
922 }
923
924 pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
926 if let Err(e) = self
927 .flow_store
928 .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
929 {
930 error!("FlowStore dropped budget-exceeded tag: {}", e);
931 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
932 }
933 }
934
935 pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
937 self.flow_broadcast_tx.subscribe()
938 }
939
940 pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
941 self.audit_broadcast_tx.subscribe()
942 }
943
944 fn current_redaction_policy(&self) -> RedactionPolicy {
945 self.policy_tx.borrow().redaction.clone()
946 }
947
948 pub fn record_flow_events_lagged(&self, skipped: u64) {
949 self.flow_events_lagged_total
950 .fetch_add(skipped as usize, Ordering::Relaxed);
951 }
952
953 pub fn record_audit_events_lagged(&self, skipped: u64) {
954 self.audit_events_lagged_total
955 .fetch_add(skipped as usize, Ordering::Relaxed);
956 }
957
958 pub fn lifecycle(&self) -> RuntimeLifecycle {
959 self.lifecycle_tx.borrow().clone()
960 }
961
962 pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
963 self.lifecycle_tx.subscribe()
964 }
965
966 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
967 let current = self.lifecycle();
968 if current.is_active() {
969 return Err(format!(
970 "Proxy is already {} on port {}",
971 current.phase.as_str(),
972 current.port.unwrap_or(port)
973 ));
974 }
975
976 let mut guard = self
977 .shutdown_tx
978 .lock()
979 .map_err(|_| "shutdown state poisoned".to_string())?;
980 *guard = Some(shutdown_tx);
981 drop(guard);
982
983 self.update_lifecycle(RuntimeLifecycle {
984 phase: RuntimeLifecyclePhase::Starting,
985 port: Some(port),
986 started_at_ms: None,
987 last_error: None,
988 });
989 Ok(())
990 }
991
992 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
993 let mut guard = self
994 .shutdown_tx
995 .lock()
996 .map_err(|_| "shutdown state poisoned".to_string())?;
997 let Some(tx) = guard.take() else {
998 return Ok(ProxyStopResult::NotRunning);
999 };
1000 drop(guard);
1001
1002 let current = self.lifecycle();
1003 self.update_lifecycle(RuntimeLifecycle {
1004 phase: RuntimeLifecyclePhase::Stopping,
1005 port: current.port,
1006 started_at_ms: current.started_at_ms,
1007 last_error: current.last_error,
1008 });
1009 let _ = tx.send(());
1010 Ok(ProxyStopResult::Stopping)
1011 }
1012
1013 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1014 self.audit_history
1015 .lock()
1016 .map(|events| events.iter().cloned().collect())
1017 .unwrap_or_default()
1018 }
1019
1020 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1022 let redaction = self.current_redaction_policy();
1023 if let Some(store) = &self.store {
1024 return store
1025 .query_flow_summaries(&query)
1026 .await
1027 .unwrap_or_default()
1028 .into_iter()
1029 .map(|summary| redact_flow_summary(summary, &redaction))
1030 .collect();
1031 }
1032 let (tx, rx) = oneshot::channel();
1033 if let Err(e) = self
1034 .flow_store
1035 .send(FlowStoreMessage::SearchFlows {
1036 query,
1037 respond_to: tx,
1038 })
1039 .await
1040 {
1041 error!("Failed to send SearchFlows request: {}", e);
1042 return Vec::new();
1043 }
1044 rx.await
1045 .unwrap_or_default()
1046 .into_iter()
1047 .map(|summary| redact_flow_summary(summary, &redaction))
1048 .collect()
1049 }
1050
1051 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1052 let redaction = self.current_redaction_policy();
1053 redact_flow_update(update, &redaction)
1054 }
1055
1056 pub async fn resolve_intercept_with_modifications(
1066 &self,
1067 key: String,
1068 action: &str,
1069 mods: Option<relay_core_api::modification::FlowModification>,
1070 ) -> Result<(), String> {
1071 self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1072 .await
1073 }
1074
1075 pub async fn resolve_intercept_with_modifications_from(
1076 &self,
1077 actor: AuditActor,
1078 key: String,
1079 action: &str,
1080 mods: Option<relay_core_api::modification::FlowModification>,
1081 ) -> Result<(), String> {
1082 let modified_fields = modification_field_names(mods.as_ref());
1083 let result = match action {
1084 "drop" => InterceptionResult::Drop,
1085 _ => match mods {
1086 None => InterceptionResult::Continue,
1087 Some(m) => {
1088 let parts: Vec<&str> = key.splitn(4, ':').collect();
1089 match parts.as_slice() {
1090 [flow_id, phase] => {
1091 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1092 modification::apply_flow_modification(&flow, phase, m)
1093 } else {
1094 InterceptionResult::Continue
1095 }
1096 }
1097 [_, "ws_msg", _] => {
1100 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1101 modification::apply_ws_modification(&msg, m)
1102 } else {
1103 InterceptionResult::Continue
1104 }
1105 }
1106 _ => InterceptionResult::Continue,
1107 }
1108 }
1109 },
1110 };
1111 let outcome = self.resolve_intercept(key.clone(), result).await;
1112 let audit_outcome = if outcome.is_ok() {
1113 AuditOutcome::Success
1114 } else {
1115 AuditOutcome::Failed
1116 };
1117 let error_message = outcome.as_ref().err().cloned();
1118
1119 self.record_audit_event(AuditEvent::new(
1120 actor,
1121 AuditEventKind::InterceptResolved,
1122 key,
1123 audit_outcome,
1124 json!({
1125 "action": action,
1126 "has_modifications": !modified_fields.is_empty(),
1127 "modified_fields": modified_fields,
1128 "error": error_message
1129 }),
1130 ));
1131 outcome
1132 }
1133
1134 #[cfg(feature = "script")]
1135 pub async fn load_script_from(
1136 &self,
1137 actor: AuditActor,
1138 target: String,
1139 script: &str,
1140 ) -> Result<(), String> {
1141 let result = self
1142 .script_interceptor
1143 .load_script(script)
1144 .await
1145 .map_err(|e| e.to_string());
1146
1147 let outcome = if result.is_ok() {
1148 AuditOutcome::Success
1149 } else {
1150 AuditOutcome::Failed
1151 };
1152 let error_message = result.as_ref().err().cloned();
1153
1154 self.record_audit_event(AuditEvent::new(
1155 actor,
1156 AuditEventKind::ScriptReloaded,
1157 target,
1158 outcome,
1159 json!({
1160 "script_bytes": script.len(),
1161 "error": error_message
1162 }),
1163 ));
1164
1165 result
1166 }
1167
1168 #[cfg(feature = "script")]
1170 pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1171 self.script_interceptor.set_env_allow(env_allow).await;
1172 }
1173
1174 fn record_audit_event(&self, event: AuditEvent) {
1175 const AUDIT_HISTORY_LIMIT: usize = 200;
1176 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1177 if event.outcome == AuditOutcome::Failed {
1178 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1179 }
1180
1181 let details = log_format::audit_details_log(&event.kind, &event.details);
1182 tracing::info!(
1183 target: "relay_core_audit",
1184 event_id = %event.id,
1185 actor = %event.actor.as_str(),
1186 kind = %event.kind.as_str(),
1187 target = %event.target,
1188 outcome = %event.outcome.as_str(),
1189 details = %details
1190 );
1191
1192 if let Ok(mut history) = self.audit_history.lock() {
1193 if history.len() >= AUDIT_HISTORY_LIMIT {
1194 history.pop_front();
1195 }
1196 history.push_back(event.clone());
1197 }
1198
1199 if let Some(store) = self.store.clone() {
1200 let event_json = serde_json::to_value(&event).unwrap_or_default();
1201 let event_id = event.id.clone();
1202 let timestamp_ms = event.timestamp_ms;
1203 let actor = event.actor.as_str().to_string();
1204 let kind = event.kind.as_str().to_string();
1205 let target = event.target.clone();
1206 let outcome = event.outcome.as_str().to_string();
1207 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1208 handle.spawn(async move {
1209 if let Err(e) = store
1210 .save_audit_event(AuditEventRecord {
1211 id: &event_id,
1212 timestamp_ms,
1213 actor: &actor,
1214 kind: &kind,
1215 target: &target,
1216 outcome: &outcome,
1217 content: &event_json,
1218 })
1219 .await
1220 {
1221 tracing::error!("Failed to persist audit event: {}", e);
1222 }
1223 });
1224 }
1225 }
1226
1227 let _ = self.audit_broadcast_tx.send(event);
1228 }
1229
1230 fn transition_to_running(&self, port: u16) {
1231 self.update_lifecycle(RuntimeLifecycle {
1232 phase: RuntimeLifecyclePhase::Running,
1233 port: Some(port),
1234 started_at_ms: Some(now_unix_ms()),
1235 last_error: None,
1236 });
1237 }
1238
1239 fn transition_to_stopped(&self) {
1240 if let Ok(mut guard) = self.shutdown_tx.lock() {
1241 *guard = None;
1242 }
1243
1244 self.update_lifecycle(RuntimeLifecycle {
1245 phase: RuntimeLifecyclePhase::Stopped,
1246 port: None,
1247 started_at_ms: None,
1248 last_error: None,
1249 });
1250 }
1251
1252 fn transition_to_failed(&self, port: u16, error: String) {
1253 if let Ok(mut guard) = self.shutdown_tx.lock() {
1254 *guard = None;
1255 }
1256
1257 self.update_lifecycle(RuntimeLifecycle {
1258 phase: RuntimeLifecyclePhase::Failed,
1259 port: Some(port),
1260 started_at_ms: None,
1261 last_error: Some(error),
1262 });
1263 }
1264
1265 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1266 let err = lifecycle
1267 .last_error
1268 .as_deref()
1269 .filter(|s| !s.is_empty())
1270 .unwrap_or("-");
1271 tracing::info!(
1272 target: "relay_core_lifecycle",
1273 phase = %lifecycle.phase.as_str(),
1274 port = %log_format::opt_u16(lifecycle.port),
1275 started_at_ms = %log_format::opt_u64(lifecycle.started_at_ms),
1276 last_error = %err,
1277 );
1278 self.lifecycle_tx.send_replace(lifecycle);
1279 }
1280
1281 pub fn spawn_proxy(
1282 self: &Arc<Self>,
1283 config: ProxyConfig,
1284 sink: mpsc::Sender<FlowUpdate>,
1285 extra_interceptor: Option<Arc<dyn Interceptor>>,
1286 ) -> Result<ProxySpawnResult, String> {
1287 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1288 match self.prepare_start(config.port, shutdown_tx) {
1289 Ok(()) => {}
1290 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1291 Err(error) => return Err(error),
1292 }
1293 let state = self.clone();
1294 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1295 if let Err(error) = state
1296 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1297 .await
1298 {
1299 error!("Proxy failed: {}", error);
1300 }
1301 })))
1302 }
1303
1304 pub async fn start_proxy(
1305 self: &Arc<Self>,
1306 config: ProxyConfig,
1307 sink: mpsc::Sender<FlowUpdate>,
1308 extra_interceptor: Option<Arc<dyn Interceptor>>,
1309 ) -> Result<(), String> {
1310 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1311 self.prepare_start(config.port, shutdown_tx)?;
1312 self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1313 .await
1314 }
1315
1316 async fn run_proxy(
1317 self: &Arc<Self>,
1318 config: ProxyConfig,
1319 sink: mpsc::Sender<FlowUpdate>,
1320 extra_interceptor: Option<Arc<dyn Interceptor>>,
1321 shutdown_rx: oneshot::Receiver<()>,
1322 ) -> Result<(), String> {
1323 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1324 let state = self.clone();
1325
1326 if let Some(parent) = config.ca_cert_path.parent()
1327 && !parent.exists()
1328 {
1329 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1330 }
1331
1332 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1333 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1334 let ca = Arc::new(ca);
1335
1336 #[cfg(feature = "script")]
1337 let script_interceptor = self.script_interceptor.clone();
1338
1339 #[cfg(feature = "script")]
1340 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1341
1342 #[cfg(not(feature = "script"))]
1343 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1344
1345 interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1346 self.clone(),
1347 )));
1348
1349 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1350 self.clone(),
1351 )));
1352
1353 if let Some(interceptor) = extra_interceptor {
1354 interceptors.push(interceptor);
1355 }
1356
1357 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1358 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1359
1360 tokio::spawn(async move {
1361 while let Some(update) = proxy_rx.recv().await {
1362 match update.clone() {
1363 FlowUpdate::Full(flow) => {
1364 state.upsert_flow(flow);
1365 }
1366 FlowUpdate::WebSocketMessage { flow_id, message } => {
1367 state.append_ws_message(flow_id, message);
1368 }
1369 FlowUpdate::HttpBody {
1370 flow_id,
1371 direction,
1372 body,
1373 } => {
1374 state.update_http_body(flow_id, body, direction);
1375 }
1376 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1377 state.tag_flow_budget_exceeded(flow_id, direction);
1379 }
1380 }
1381
1382 let _ = state.flow_broadcast_tx.send(update.clone());
1383
1384 if sink.try_send(update).is_err() {
1385 relay_core_lib::metrics::inc_flows_dropped();
1386 }
1387 }
1388 });
1389
1390 if let Some(udp_port) = config.udp_tproxy_port {
1391 let udp_proxy_tx = proxy_tx.clone();
1392 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1393 tokio::spawn(async move {
1394 match tokio::net::UdpSocket::bind(udp_addr).await {
1395 Ok(socket) => {
1396 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1397 if let Err(e) = proxy.run(udp_proxy_tx).await {
1398 error!("UDP TPROXY failed: {}", e);
1399 }
1400 }
1401 Err(e) => {
1402 error!("Failed to bind UDP TPROXY socket: {}", e);
1403 }
1404 }
1405 });
1406 }
1407
1408 let listener = match TcpListener::bind(addr).await {
1409 Ok(listener) => listener,
1410 Err(e) => {
1411 if let Ok(mut guard) = self.shutdown_tx.lock() {
1412 *guard = None;
1413 }
1414 let message = format!("Failed to bind to address {}: {}", addr, e);
1415 self.transition_to_failed(config.port, message.clone());
1416 return Err(message);
1417 }
1418 };
1419
1420 self.transition_to_running(config.port);
1421 let shutdown_rx = Some(shutdown_rx);
1422
1423 if config.transparent {
1424 let policy = ProxyPolicy {
1425 transparent_enabled: true,
1426 ..Default::default()
1427 };
1428 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1429 let policy_rx = self.policy_tx.subscribe();
1430
1431 let provider: Arc<dyn OriginalDstProvider> = {
1432 let mut addrs = BTreeSet::new();
1433 if let Ok(local) = listener.local_addr() {
1434 addrs.insert(local);
1435 }
1436
1437 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1438 {
1439 Arc::new(LinuxOriginalDstProvider::new(addrs))
1440 }
1441 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1442 {
1443 match MacOsOriginalDstProvider::new(addrs.clone()) {
1444 Ok(provider) => Arc::new(provider),
1445 Err(e) => {
1446 error!("Failed to initialize macOS PF provider: {}", e);
1447 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1448 }
1449 }
1450 }
1451 #[cfg(target_os = "windows")]
1452 {
1453 let filter =
1454 "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1455 .to_string();
1456 let port = config.port;
1457 tokio::spawn(async move {
1458 relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1459 .await;
1460 });
1461
1462 Arc::new(WindowsOriginalDstProvider::new(addrs))
1463 }
1464 #[cfg(not(any(
1465 all(target_os = "linux", feature = "transparent-linux"),
1466 all(target_os = "macos", feature = "transparent-macos"),
1467 target_os = "windows"
1468 )))]
1469 {
1470 Arc::new(NoOpOriginalDstProvider::new(addrs))
1471 }
1472 };
1473
1474 let source = TransparentTcpCaptureSource::new(listener, provider);
1475 let result = relay_core_lib::start_proxy(
1476 source,
1477 proxy_tx,
1478 interceptor,
1479 ca,
1480 policy_rx,
1481 None,
1482 shutdown_rx,
1483 Some(self.get_rule_engine().await),
1484 )
1485 .await
1486 .map_err(|e| e.to_string());
1487 if let Err(error) = &result {
1488 self.transition_to_failed(config.port, error.clone());
1489 } else {
1490 self.transition_to_stopped();
1491 }
1492 result
1493 } else {
1494 let source = TcpCaptureSource::new(listener);
1495 let policy = ProxyPolicy::default();
1496 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1497 let policy_rx = self.policy_tx.subscribe();
1498
1499 let result = relay_core_lib::start_proxy(
1500 source,
1501 proxy_tx,
1502 interceptor,
1503 ca,
1504 policy_rx,
1505 None,
1506 shutdown_rx,
1507 Some(self.get_rule_engine().await),
1508 )
1509 .await
1510 .map_err(|e| e.to_string());
1511 if let Err(error) = &result {
1512 self.transition_to_failed(config.port, error.clone());
1513 } else {
1514 self.transition_to_stopped();
1515 }
1516 result
1517 }
1518 }
1519}
1520
1521fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1522 match update {
1523 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1524 FlowUpdate::WebSocketMessage {
1525 flow_id,
1526 mut message,
1527 } => {
1528 message.content = redact_body(message.content, redaction);
1529 FlowUpdate::WebSocketMessage { flow_id, message }
1530 }
1531 FlowUpdate::HttpBody {
1532 flow_id,
1533 direction,
1534 body,
1535 } => FlowUpdate::HttpBody {
1536 flow_id,
1537 direction,
1538 body: redact_body(body, redaction),
1539 },
1540 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1541 FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1542 }
1543 }
1544}
1545
1546fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1547 if !redaction.enabled {
1548 return flow;
1549 }
1550 match &mut flow.layer {
1551 Layer::Http(http) => {
1552 redact_http_request(&mut http.request, redaction);
1553 if let Some(response) = &mut http.response {
1554 redact_headers(&mut response.headers, redaction);
1555 response.body = response
1556 .body
1557 .take()
1558 .map(|body| redact_body(body, redaction));
1559 }
1560 }
1561 Layer::WebSocket(ws) => {
1562 redact_http_request(&mut ws.handshake_request, redaction);
1563 redact_headers(&mut ws.handshake_response.headers, redaction);
1564 ws.handshake_response.body = ws
1565 .handshake_response
1566 .body
1567 .take()
1568 .map(|body| redact_body(body, redaction));
1569 for message in &mut ws.messages {
1570 message.content = redact_body(message.content.clone(), redaction);
1571 }
1572 }
1573 _ => {}
1574 }
1575 flow
1576}
1577
1578fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1579 if !redaction.enabled {
1580 return summary;
1581 }
1582 summary.url = redact_url_string(&summary.url, redaction);
1583 summary
1584}
1585
1586fn redact_http_request(
1587 request: &mut relay_core_api::flow::HttpRequest,
1588 redaction: &RedactionPolicy,
1589) {
1590 redact_headers(&mut request.headers, redaction);
1591 redact_query_pairs(&mut request.query, redaction);
1592 request.url = redact_url(&request.url, redaction);
1593 request.body = request.body.take().map(|body| redact_body(body, redaction));
1594}
1595
1596fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1597 if !redaction.enabled {
1598 return;
1599 }
1600 let sensitive = redaction_set(&redaction.sensitive_header_names);
1601 for (name, value) in headers.iter_mut() {
1602 if sensitive.contains(&name.to_ascii_lowercase()) {
1603 *value = "[REDACTED]".to_string();
1604 }
1605 }
1606}
1607
1608fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1609 if !redaction.enabled {
1610 return;
1611 }
1612 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1613 for (name, value) in query.iter_mut() {
1614 if sensitive.contains(&name.to_ascii_lowercase()) {
1615 *value = "[REDACTED]".to_string();
1616 }
1617 }
1618}
1619
1620fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1621 if !redaction.enabled {
1622 return url.clone();
1623 }
1624 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1625 let pairs: Vec<(String, String)> = url
1626 .query_pairs()
1627 .map(|(k, v)| {
1628 let key = k.to_string();
1629 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1630 "[REDACTED]".to_string()
1631 } else {
1632 v.to_string()
1633 };
1634 (key, value)
1635 })
1636 .collect();
1637 let mut next = url.clone();
1638 if pairs.is_empty() {
1639 return next;
1640 }
1641 next.query_pairs_mut().clear();
1642 for (k, v) in pairs {
1643 next.query_pairs_mut().append_pair(&k, &v);
1644 }
1645 next
1646}
1647
1648fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1649 match url::Url::parse(input) {
1650 Ok(url) => redact_url(&url, redaction).to_string(),
1651 Err(_) => input.to_string(),
1652 }
1653}
1654
1655fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1656 if redaction.enabled && redaction.redact_bodies {
1657 body.content = "[REDACTED]".to_string();
1658 }
1659 body
1660}
1661
1662fn redaction_set(values: &[String]) -> HashSet<String> {
1663 values
1664 .iter()
1665 .map(|value| value.to_ascii_lowercase())
1666 .collect()
1667}
1668
1669#[derive(Clone)]
1670pub struct ProxyConfig {
1671 pub port: u16,
1672 pub ca_cert_path: std::path::PathBuf,
1673 pub ca_key_path: std::path::PathBuf,
1674 pub transparent: bool,
1675 pub udp_tproxy_port: Option<u16>,
1676}
1677
1678impl ProxyConfig {
1679 pub fn new(
1680 port: u16,
1681 ca_cert_path: std::path::PathBuf,
1682 ca_key_path: std::path::PathBuf,
1683 ) -> Self {
1684 Self {
1685 port,
1686 ca_cert_path,
1687 ca_key_path,
1688 transparent: false,
1689 udp_tproxy_port: None,
1690 }
1691 }
1692
1693 pub fn from_app_data_dir(
1694 app_data_dir: impl Into<std::path::PathBuf>,
1695 port: u16,
1696 ) -> Result<Self, String> {
1697 let app_data_dir = app_data_dir.into();
1698 if !app_data_dir.exists() {
1699 std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1700 }
1701
1702 Ok(Self::new(
1703 port,
1704 app_data_dir.join("ca_cert.pem"),
1705 app_data_dir.join("ca_key.pem"),
1706 ))
1707 }
1708
1709 pub fn with_transparent(mut self, transparent: bool) -> Self {
1710 self.transparent = transparent;
1711 self
1712 }
1713
1714 pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1715 self.udp_tproxy_port = udp_tproxy_port;
1716 self
1717 }
1718}
1719
1720fn now_unix_ms() -> u64 {
1721 SystemTime::now()
1722 .duration_since(UNIX_EPOCH)
1723 .unwrap_or_default()
1724 .as_millis() as u64
1725}
1726
1727fn modification_field_names(
1728 mods: Option<&relay_core_api::modification::FlowModification>,
1729) -> Vec<&'static str> {
1730 let Some(mods) = mods else {
1731 return Vec::new();
1732 };
1733
1734 let mut fields = Vec::new();
1735 if mods.method.is_some() {
1736 fields.push("method");
1737 }
1738 if mods.url.is_some() {
1739 fields.push("url");
1740 }
1741 if mods.request_headers.is_some() {
1742 fields.push("request_headers");
1743 }
1744 if mods.request_body.is_some() {
1745 fields.push("request_body");
1746 }
1747 if mods.status_code.is_some() {
1748 fields.push("status_code");
1749 }
1750 if mods.response_headers.is_some() {
1751 fields.push("response_headers");
1752 }
1753 if mods.response_body.is_some() {
1754 fields.push("response_body");
1755 }
1756 if mods.message_content.is_some() {
1757 fields.push("message_content");
1758 }
1759 fields
1760}
1761
1762#[cfg(test)]
1763mod tests {
1764 use super::*;
1765 use chrono::Utc;
1766 use relay_core_api::flow::{
1767 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1768 ResponseTiming, TransportProtocol,
1769 };
1770 use std::sync::atomic::{AtomicU64, Ordering};
1771 use tokio::time::{Duration, sleep};
1772 use url::Url;
1773 use uuid::Uuid;
1774
1775 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1776
1777 fn sqlite_url() -> String {
1778 let nanos = SystemTime::now()
1779 .duration_since(UNIX_EPOCH)
1780 .expect("clock drift")
1781 .as_nanos();
1782 let pid = std::process::id();
1783 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1784 let db_dir = std::env::current_dir()
1785 .expect("cwd")
1786 .join("target")
1787 .join("test-dbs");
1788 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1789 let db_path = db_dir.join(format!(
1790 "relay-core-runtime-test-{}-{}-{}.db",
1791 pid, nanos, seq
1792 ));
1793 format!("sqlite://{}?mode=rwc", db_path.display())
1794 }
1795
1796 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1797 let start_time =
1798 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1799 let request_url =
1800 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1801 Flow {
1802 id: Uuid::new_v4(),
1803 start_time,
1804 end_time: Some(start_time),
1805 network: NetworkInfo {
1806 client_ip: "127.0.0.1".to_string(),
1807 client_port: 12000,
1808 server_ip: "127.0.0.1".to_string(),
1809 server_port: 8080,
1810 protocol: TransportProtocol::TCP,
1811 tls: false,
1812 tls_version: None,
1813 sni: None,
1814 },
1815 layer: Layer::Http(HttpLayer {
1816 request: HttpRequest {
1817 method: method.to_string(),
1818 url: request_url,
1819 version: "HTTP/1.1".to_string(),
1820 headers: vec![],
1821 cookies: vec![],
1822 query: vec![],
1823 body: None,
1824 },
1825 response: Some(HttpResponse {
1826 status,
1827 status_text: "OK".to_string(),
1828 version: "HTTP/1.1".to_string(),
1829 headers: vec![],
1830 cookies: vec![],
1831 body: None,
1832 timing: ResponseTiming {
1833 time_to_first_byte: None,
1834 time_to_last_byte: None,
1835 connect_time_ms: None,
1836 ssl_time_ms: None,
1837 },
1838 }),
1839 error: None,
1840 }),
1841 tags: vec![],
1842 meta: std::collections::HashMap::new(),
1843 resilience_trace: None,
1844 rule_variables: std::collections::HashMap::new(),
1845 matched_rules: vec![],
1846 }
1847 }
1848
1849 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1850 let start_time =
1851 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1852 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1853 .expect("url should parse");
1854 Flow {
1855 id: Uuid::new_v4(),
1856 start_time,
1857 end_time: Some(start_time),
1858 network: NetworkInfo {
1859 client_ip: "127.0.0.1".to_string(),
1860 client_port: 12000,
1861 server_ip: "127.0.0.1".to_string(),
1862 server_port: 8080,
1863 protocol: TransportProtocol::TCP,
1864 tls: false,
1865 tls_version: None,
1866 sni: None,
1867 },
1868 layer: Layer::Http(HttpLayer {
1869 request: HttpRequest {
1870 method: "GET".to_string(),
1871 url: request_url,
1872 version: "HTTP/1.1".to_string(),
1873 headers: vec![
1874 (
1875 "Authorization".to_string(),
1876 "Bearer secret-token".to_string(),
1877 ),
1878 ("X-Normal".to_string(), "visible".to_string()),
1879 ],
1880 cookies: vec![],
1881 query: vec![
1882 ("token".to_string(), "abc123".to_string()),
1883 ("ok".to_string(), "1".to_string()),
1884 ],
1885 body: Some(BodyData {
1886 encoding: "utf-8".to_string(),
1887 content: "secret request body".to_string(),
1888 size: 19,
1889 }),
1890 },
1891 response: Some(HttpResponse {
1892 status: 200,
1893 status_text: "OK".to_string(),
1894 version: "HTTP/1.1".to_string(),
1895 headers: vec![
1896 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1897 ("X-Response".to_string(), "visible".to_string()),
1898 ],
1899 cookies: vec![],
1900 body: Some(BodyData {
1901 encoding: "utf-8".to_string(),
1902 content: "secret response body".to_string(),
1903 size: 20,
1904 }),
1905 timing: ResponseTiming {
1906 time_to_first_byte: None,
1907 time_to_last_byte: None,
1908 connect_time_ms: None,
1909 ssl_time_ms: None,
1910 },
1911 }),
1912 error: None,
1913 }),
1914 tags: vec![],
1915 meta: std::collections::HashMap::new(),
1916 resilience_trace: None,
1917 rule_variables: std::collections::HashMap::new(),
1918 matched_rules: vec![],
1919 }
1920 }
1921
1922 #[tokio::test]
1923 async fn set_rules_from_records_audit_event() {
1924 let state = CoreState::new(None).await;
1925
1926 state
1927 .set_rules_from(
1928 AuditActor::Http,
1929 "rule.upsert",
1930 "rule-1".to_string(),
1931 json!({ "route": "/api/v1/rules" }),
1932 Vec::new(),
1933 )
1934 .await
1935 .expect("set rules should succeed");
1936
1937 let events = state.recent_audit_events();
1938 let event = events.last().expect("audit event should exist");
1939 assert_eq!(event.actor, AuditActor::Http);
1940 assert_eq!(event.kind, AuditEventKind::RuleChanged);
1941 assert_eq!(event.outcome, AuditOutcome::Success);
1942 assert_eq!(event.target, "rule-1");
1943 assert_eq!(event.details["operation"], "rule.upsert");
1944 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1945 }
1946
1947 #[tokio::test]
1948 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1949 let state = CoreState::new(None).await;
1950
1951 state
1952 .upsert_rule_from(
1953 AuditActor::Probe,
1954 "rule.upsert",
1955 "rule-1".to_string(),
1956 json!({ "tool": "set_rule" }),
1957 Rule {
1958 id: "rule-1".to_string(),
1959 name: "first".to_string(),
1960 active: true,
1961 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1962 priority: 1,
1963 termination: relay_core_lib::rule::RuleTermination::Continue,
1964 filter: relay_core_lib::rule::Filter::Url(
1965 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1966 ),
1967 actions: vec![],
1968 constraints: None,
1969 },
1970 )
1971 .await
1972 .expect("initial upsert should succeed");
1973
1974 state
1975 .upsert_rule_from(
1976 AuditActor::Probe,
1977 "rule.upsert",
1978 "rule-1".to_string(),
1979 json!({ "tool": "set_rule" }),
1980 Rule {
1981 id: "rule-1".to_string(),
1982 name: "second".to_string(),
1983 active: true,
1984 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1985 priority: 2,
1986 termination: relay_core_lib::rule::RuleTermination::Continue,
1987 filter: relay_core_lib::rule::Filter::Url(
1988 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1989 ),
1990 actions: vec![],
1991 constraints: None,
1992 },
1993 )
1994 .await
1995 .expect("replacement upsert should succeed");
1996
1997 let rules = state.get_rules().await;
1998 assert_eq!(rules.len(), 1);
1999 assert_eq!(rules[0].name, "second");
2000 let event = state
2001 .recent_audit_events()
2002 .last()
2003 .cloned()
2004 .expect("audit event");
2005 assert_eq!(event.details["operation"], "rule.upsert");
2006 assert_eq!(event.details["details"]["tool"], "set_rule");
2007 }
2008
2009 #[tokio::test]
2010 async fn delete_rule_from_returns_false_when_rule_missing() {
2011 let state = CoreState::new(None).await;
2012
2013 let deleted = state
2014 .delete_rule_from(
2015 AuditActor::Http,
2016 "rule.delete",
2017 "missing".to_string(),
2018 json!({ "route": "/api/v1/rules/{id}" }),
2019 "missing",
2020 )
2021 .await
2022 .expect("delete should not fail");
2023
2024 assert!(!deleted);
2025 assert!(state.recent_audit_events().is_empty());
2026 }
2027
2028 #[tokio::test]
2029 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2030 let state = CoreState::new(None).await;
2031
2032 let rule_id = state
2033 .create_mock_response_rule_from(
2034 AuditActor::Http,
2035 "api-mock-1".to_string(),
2036 json!({ "route": "/api/v1/mock", "status": 201 }),
2037 MockResponseRuleConfig {
2038 rule_id: "api-mock-1".to_string(),
2039 url_pattern: "example.com".to_string(),
2040 name: "api-mock:example.com".to_string(),
2041 status: 201,
2042 content_type: "application/json".to_string(),
2043 body: "{\"ok\":true}".to_string(),
2044 },
2045 )
2046 .await
2047 .expect("mock rule should be created");
2048
2049 assert_eq!(rule_id, "api-mock-1");
2050 let rules = state.get_rules().await;
2051 assert_eq!(rules.len(), 1);
2052 assert_eq!(rules[0].id, "api-mock-1");
2053 let event = state
2054 .recent_audit_events()
2055 .last()
2056 .cloned()
2057 .expect("audit event");
2058 assert_eq!(event.details["operation"], "rule.mock_create");
2059 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2060 }
2061
2062 #[tokio::test]
2063 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2064 let state = CoreState::new(None).await;
2065
2066 let rule_id = state
2067 .create_intercept_rule_from(
2068 AuditActor::Http,
2069 "intercept-1".to_string(),
2070 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2071 InterceptRuleConfig {
2072 rule_id: "intercept-1".to_string(),
2073 active: true,
2074 url_pattern: "example.com".to_string(),
2075 method: None,
2076 phase: "request".to_string(),
2077 name: "api-intercept:example.com".to_string(),
2078 priority: 100,
2079 termination: relay_core_lib::rule::RuleTermination::Stop,
2080 },
2081 )
2082 .await
2083 .expect("intercept rule should be created");
2084
2085 assert_eq!(rule_id, "intercept-1");
2086 let rules = state.get_rules().await;
2087 assert_eq!(rules.len(), 1);
2088 assert_eq!(rules[0].id, "intercept-1");
2089 assert_eq!(rules[0].name, "api-intercept:example.com");
2090 let event = state
2091 .recent_audit_events()
2092 .last()
2093 .cloned()
2094 .expect("audit event");
2095 assert_eq!(event.details["operation"], "rule.intercept_create");
2096 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2097 }
2098
2099 #[tokio::test]
2100 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2101 let state = CoreState::new(None).await;
2102
2103 state
2104 .upsert_legacy_intercept_rule_from(
2105 AuditActor::Tauri,
2106 "legacy-1".to_string(),
2107 json!({ "command": "set_intercept_rule" }),
2108 InterceptRule {
2109 id: "legacy-1".to_string(),
2110 active: true,
2111 url_pattern: "example.com".to_string(),
2112 method: None,
2113 phase: "both".to_string(),
2114 },
2115 )
2116 .await
2117 .expect("initial family upsert should succeed");
2118 assert_eq!(state.get_rules().await.len(), 2);
2119
2120 state
2121 .upsert_legacy_intercept_rule_from(
2122 AuditActor::Tauri,
2123 "legacy-1".to_string(),
2124 json!({ "command": "set_intercept_rule" }),
2125 InterceptRule {
2126 id: "legacy-1".to_string(),
2127 active: true,
2128 url_pattern: "example.org".to_string(),
2129 method: Some("POST".to_string()),
2130 phase: "request".to_string(),
2131 },
2132 )
2133 .await
2134 .expect("replacement family upsert should succeed");
2135
2136 let rules = state.get_rules().await;
2137 assert_eq!(rules.len(), 1);
2138 assert_eq!(rules[0].id, "legacy-1");
2139 let event = state
2140 .recent_audit_events()
2141 .last()
2142 .cloned()
2143 .expect("audit event");
2144 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2145 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2146 }
2147
2148 #[tokio::test]
2149 async fn resolve_intercept_failure_records_failed_audit_event() {
2150 let state = CoreState::new(None).await;
2151
2152 let result = state
2153 .resolve_intercept_with_modifications_from(
2154 AuditActor::Probe,
2155 "missing-flow:request".to_string(),
2156 "drop",
2157 None,
2158 )
2159 .await;
2160
2161 assert!(result.is_err());
2162 let events = state.recent_audit_events();
2163 let event = events.last().expect("audit event should exist");
2164 assert_eq!(event.actor, AuditActor::Probe);
2165 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2166 assert_eq!(event.outcome, AuditOutcome::Failed);
2167 assert_eq!(event.details["action"], "drop");
2168 assert!(
2169 event.details["error"]
2170 .as_str()
2171 .unwrap_or_default()
2172 .contains("Interception not found")
2173 );
2174 }
2175
2176 #[tokio::test]
2177 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2178 let state = CoreState::new(None).await;
2179 let (shutdown_tx, shutdown_rx) = oneshot::channel();
2180
2181 state
2182 .prepare_start(8080, shutdown_tx)
2183 .expect("prepare start should succeed");
2184 let lifecycle = state.lifecycle();
2185 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2186 assert_eq!(lifecycle.port, Some(8080));
2187 assert!(lifecycle.started_at_ms.is_none());
2188 assert!(lifecycle.last_error.is_none());
2189
2190 assert_eq!(
2191 state.stop_proxy().expect("stop should succeed"),
2192 ProxyStopResult::Stopping
2193 );
2194 let lifecycle = state.lifecycle();
2195 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2196 assert_eq!(lifecycle.port, Some(8080));
2197 assert!(shutdown_rx.await.is_ok());
2198 }
2199
2200 #[tokio::test]
2201 async fn status_snapshot_derives_runtime_facing_fields() {
2202 let state = CoreState::new(None).await;
2203 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2204 state
2205 .prepare_start(8080, shutdown_tx)
2206 .expect("prepare start should succeed");
2207
2208 let status = state.status_snapshot();
2209 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2210 assert!(status.running);
2211 assert_eq!(status.port, Some(8080));
2212 assert!(status.uptime.is_none());
2213 assert!(status.last_error.is_none());
2214 }
2215
2216 #[tokio::test]
2217 async fn status_report_combines_status_and_metrics() {
2218 let state = CoreState::new(None).await;
2219 let report = state.status_report().await;
2220
2221 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2222 assert!(!report.status.running);
2223 assert_eq!(report.metrics.intercepts_pending, 0);
2224 assert_eq!(report.metrics.ws_pending_messages, 0);
2225 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2226 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2227 assert_eq!(report.metrics.audit_events_total, 0);
2228 assert_eq!(report.metrics.audit_events_failed, 0);
2229 assert_eq!(report.metrics.flow_events_lagged_total, 0);
2230 assert_eq!(report.metrics.audit_events_lagged_total, 0);
2231 }
2232
2233 #[test]
2234 fn proxy_config_new_and_transport_setters_preserve_values() {
2235 let config = ProxyConfig::new(
2236 8080,
2237 std::path::PathBuf::from("/tmp/ca_cert.pem"),
2238 std::path::PathBuf::from("/tmp/ca_key.pem"),
2239 )
2240 .with_transparent(true)
2241 .with_udp_tproxy_port(Some(15000));
2242
2243 assert_eq!(config.port, 8080);
2244 assert_eq!(
2245 config.ca_cert_path,
2246 std::path::PathBuf::from("/tmp/ca_cert.pem")
2247 );
2248 assert_eq!(
2249 config.ca_key_path,
2250 std::path::PathBuf::from("/tmp/ca_key.pem")
2251 );
2252 assert!(config.transparent);
2253 assert_eq!(config.udp_tproxy_port, Some(15000));
2254 }
2255
2256 #[test]
2257 fn proxy_config_from_app_data_dir_creates_default_paths() {
2258 let unique = SystemTime::now()
2259 .duration_since(UNIX_EPOCH)
2260 .expect("clock drift")
2261 .as_nanos();
2262 let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2263
2264 let config =
2265 ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2266
2267 assert!(dir.exists());
2268 assert_eq!(config.port, 8899);
2269 assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2270 assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2271 assert!(!config.transparent);
2272 assert!(config.udp_tproxy_port.is_none());
2273 }
2274
2275 #[tokio::test]
2276 async fn intercept_snapshot_maps_pending_counts() {
2277 let state = CoreState::new(None).await;
2278 let snapshot = state.intercept_snapshot().await;
2279
2280 assert_eq!(snapshot.pending_count, 0);
2281 assert_eq!(snapshot.ws_pending_count, 0);
2282 }
2283
2284 #[tokio::test]
2285 async fn audit_snapshot_returns_latest_events_in_order() {
2286 let state = CoreState::new(None).await;
2287 state.record_audit_event(AuditEvent::new(
2288 AuditActor::Runtime,
2289 AuditEventKind::RuleChanged,
2290 "first",
2291 AuditOutcome::Success,
2292 json!({ "index": 1 }),
2293 ));
2294 state.record_audit_event(AuditEvent::new(
2295 AuditActor::Http,
2296 AuditEventKind::PolicyUpdated,
2297 "second",
2298 AuditOutcome::Success,
2299 json!({ "index": 2 }),
2300 ));
2301
2302 let snapshot = state.audit_snapshot(1);
2303
2304 assert_eq!(snapshot.events.len(), 1);
2305 assert_eq!(snapshot.events[0].target, "second");
2306 assert_eq!(snapshot.events[0].details["index"], 2);
2307 }
2308
2309 #[tokio::test]
2310 async fn query_audit_snapshot_filters_in_memory_events() {
2311 let state = CoreState::new(None).await;
2312 state.record_audit_event(AuditEvent::new(
2313 AuditActor::Http,
2314 AuditEventKind::RuleChanged,
2315 "rule-1",
2316 AuditOutcome::Success,
2317 json!({ "idx": 1 }),
2318 ));
2319 state.record_audit_event(AuditEvent::new(
2320 AuditActor::Probe,
2321 AuditEventKind::PolicyUpdated,
2322 "policy",
2323 AuditOutcome::Failed,
2324 json!({ "idx": 2 }),
2325 ));
2326
2327 let snapshot = state
2328 .query_audit_snapshot(CoreAuditQuery {
2329 actor: Some(AuditActor::Probe),
2330 kind: Some(AuditEventKind::PolicyUpdated),
2331 outcome: Some(AuditOutcome::Failed),
2332 limit: 10,
2333 ..Default::default()
2334 })
2335 .await;
2336
2337 assert_eq!(snapshot.events.len(), 1);
2338 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2339 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2340 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2341 }
2342
2343 #[tokio::test]
2344 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2345 let state = CoreState::new(Some(sqlite_url())).await;
2346 state.update_policy_from(
2347 AuditActor::Http,
2348 "policy".to_string(),
2349 ProxyPolicy {
2350 transparent_enabled: true,
2351 ..Default::default()
2352 },
2353 );
2354
2355 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2356 for _ in 0..10 {
2357 snapshot = state
2358 .query_audit_snapshot(CoreAuditQuery {
2359 actor: Some(AuditActor::Http),
2360 kind: Some(AuditEventKind::PolicyUpdated),
2361 limit: 10,
2362 ..Default::default()
2363 })
2364 .await;
2365 if !snapshot.events.is_empty() {
2366 break;
2367 }
2368 sleep(Duration::from_millis(20)).await;
2369 }
2370
2371 assert!(!snapshot.events.is_empty());
2372 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2373 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2374 }
2375
2376 #[tokio::test]
2377 async fn prepare_start_rejects_second_active_start() {
2378 let state = CoreState::new(None).await;
2379 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2380 state
2381 .prepare_start(8080, shutdown_tx)
2382 .expect("first start should succeed");
2383
2384 let (second_tx, _second_rx) = oneshot::channel();
2385 let error = state
2386 .prepare_start(8081, second_tx)
2387 .expect_err("second active start should be rejected");
2388 assert!(error.contains("already"));
2389 }
2390
2391 #[test]
2392 fn update_policy_records_audit_event() {
2393 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2394 let state = runtime.block_on(CoreState::new(None));
2395
2396 state.update_policy_from(
2397 AuditActor::Runtime,
2398 "policy".to_string(),
2399 ProxyPolicy {
2400 transparent_enabled: true,
2401 ..Default::default()
2402 },
2403 );
2404
2405 let events = state.recent_audit_events();
2406 let event = events.last().expect("audit event should exist");
2407 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2408 assert_eq!(event.outcome, AuditOutcome::Success);
2409 assert_eq!(event.details["transparent_enabled"], true);
2410 }
2411
2412 #[test]
2413 fn patch_policy_updates_redaction_without_replacing_other_fields() {
2414 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2415 let state = runtime.block_on(CoreState::new(None));
2416 let original_timeout = state.policy_snapshot().request_timeout_ms;
2417
2418 state.patch_policy_from(
2419 AuditActor::Runtime,
2420 "policy.patch".to_string(),
2421 relay_core_api::policy::ProxyPolicyPatch {
2422 redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2423 enabled: Some(true),
2424 redact_bodies: Some(true),
2425 ..Default::default()
2426 }),
2427 upstream: None,
2428 },
2429 );
2430
2431 let policy = state.policy_snapshot();
2432 assert_eq!(policy.request_timeout_ms, original_timeout);
2433 assert!(policy.redaction.enabled);
2434 assert!(policy.redaction.redact_bodies);
2435
2436 let events = state.recent_audit_events();
2437 let event = events.last().expect("audit event should exist");
2438 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2439 assert_eq!(event.details["redaction_enabled"], true);
2440 }
2441
2442 #[tokio::test]
2443 async fn metrics_include_audit_and_lagged_event_counters() {
2444 let state = CoreState::new(None).await;
2445
2446 state.update_policy_from(
2447 AuditActor::Runtime,
2448 "policy".to_string(),
2449 ProxyPolicy::default(),
2450 );
2451 let _ = state
2452 .resolve_intercept_with_modifications_from(
2453 AuditActor::Probe,
2454 "missing-flow:request".to_string(),
2455 "drop",
2456 None,
2457 )
2458 .await;
2459
2460 state.record_flow_events_lagged(3);
2461 state.record_audit_events_lagged(5);
2462
2463 let metrics = state.get_metrics().await;
2464 assert_eq!(metrics.audit_events_total, 2);
2465 assert_eq!(metrics.audit_events_failed, 1);
2466 assert_eq!(metrics.flow_events_lagged_total, 3);
2467 assert_eq!(metrics.audit_events_lagged_total, 5);
2468 }
2469
2470 #[tokio::test]
2471 async fn prometheus_metrics_text_contains_observability_fields() {
2472 let state = CoreState::new(None).await;
2473 state.record_flow_events_lagged(2);
2474 state.record_audit_events_lagged(4);
2475
2476 let text = state.get_metrics_prometheus_text().await;
2477 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2478 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2479 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2480 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2481 }
2482
2483 #[tokio::test]
2484 async fn search_flows_uses_store_with_offset_pagination() {
2485 let state = CoreState::new(Some(sqlite_url())).await;
2486 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2487 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2488 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2489
2490 state.upsert_flow(Box::new(flow_a));
2491 state.upsert_flow(Box::new(flow_b));
2492 state.upsert_flow(Box::new(flow_c));
2493
2494 let mut baseline = Vec::new();
2495 for _ in 0..20 {
2496 baseline = state
2497 .search_flows(FlowQuery {
2498 host: Some("api.example.com".to_string()),
2499 path_contains: None,
2500 method: None,
2501 status_min: None,
2502 status_max: None,
2503 has_error: None,
2504 is_websocket: None,
2505 limit: Some(3),
2506 offset: Some(0),
2507 })
2508 .await;
2509 if baseline.len() == 3 {
2510 break;
2511 }
2512 sleep(Duration::from_millis(20)).await;
2513 }
2514
2515 assert_eq!(baseline.len(), 3);
2516 let page = state
2517 .search_flows(FlowQuery {
2518 host: Some("api.example.com".to_string()),
2519 path_contains: None,
2520 method: None,
2521 status_min: None,
2522 status_max: None,
2523 has_error: None,
2524 is_websocket: None,
2525 limit: Some(1),
2526 offset: Some(1),
2527 })
2528 .await;
2529 assert_eq!(page.len(), 1);
2530 assert_eq!(page[0].id, baseline[1].id);
2531 }
2532
2533 #[tokio::test]
2534 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2535 let state = CoreState::new(Some(sqlite_url())).await;
2536 let first_flow = sample_http_flow(
2537 "persist.example.com",
2538 "/first",
2539 "GET",
2540 200,
2541 1_700_000_010_000,
2542 );
2543 let first_id = first_flow.id.to_string();
2544 state.upsert_flow(Box::new(first_flow));
2545 for i in 0..240 {
2546 state.upsert_flow(Box::new(sample_http_flow(
2547 "persist.example.com",
2548 &format!("/{}", i),
2549 "GET",
2550 200,
2551 1_700_000_020_000 + i,
2552 )));
2553 }
2554
2555 sleep(Duration::from_millis(200)).await;
2556
2557 let loaded = state.get_flow(first_id).await;
2558 assert!(loaded.is_some());
2559 }
2560
2561 #[tokio::test]
2562 async fn search_flows_redacts_summary_url_when_enabled() {
2563 let state = CoreState::new(None).await;
2564 state.update_policy_from(
2565 AuditActor::Runtime,
2566 "policy.redaction".to_string(),
2567 ProxyPolicy {
2568 redaction: RedactionPolicy {
2569 enabled: true,
2570 sensitive_query_keys: vec!["token".to_string()],
2571 redact_bodies: false,
2572 ..Default::default()
2573 },
2574 ..Default::default()
2575 },
2576 );
2577 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2578
2579 let mut items = Vec::new();
2580 for _ in 0..20 {
2581 items = state
2582 .search_flows(FlowQuery {
2583 host: Some("api.example.com".to_string()),
2584 path_contains: Some("/private".to_string()),
2585 ..Default::default()
2586 })
2587 .await;
2588 if !items.is_empty() {
2589 break;
2590 }
2591 sleep(Duration::from_millis(20)).await;
2592 }
2593
2594 assert!(!items.is_empty());
2595 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2596 let token = redacted
2597 .query_pairs()
2598 .find(|(k, _)| k == "token")
2599 .map(|(_, v)| v.to_string());
2600 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2601 }
2602
2603 #[tokio::test]
2604 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2605 let state = CoreState::new(Some(sqlite_url())).await;
2606 state.update_policy_from(
2607 AuditActor::Runtime,
2608 "policy.redaction".to_string(),
2609 ProxyPolicy {
2610 redaction: RedactionPolicy {
2611 enabled: true,
2612 sensitive_header_names: vec![
2613 "authorization".to_string(),
2614 "set-cookie".to_string(),
2615 ],
2616 sensitive_query_keys: vec!["token".to_string()],
2617 redact_bodies: true,
2618 },
2619 ..Default::default()
2620 },
2621 );
2622
2623 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2624 let flow_id = flow.id.to_string();
2625 state.upsert_flow(Box::new(flow));
2626 sleep(Duration::from_millis(80)).await;
2627
2628 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2629 let Layer::Http(http) = loaded.layer else {
2630 panic!("expected http layer");
2631 };
2632
2633 let auth = http
2634 .request
2635 .headers
2636 .iter()
2637 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2638 .map(|(_, v)| v.as_str());
2639 assert_eq!(auth, Some("[REDACTED]"));
2640
2641 let req_query_token = http
2642 .request
2643 .query
2644 .iter()
2645 .find(|(k, _)| k == "token")
2646 .map(|(_, v)| v.as_str());
2647 assert_eq!(req_query_token, Some("[REDACTED]"));
2648
2649 let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2650 assert_eq!(req_body, Some("[REDACTED]"));
2651
2652 let response = http.response.expect("response should exist");
2653 let set_cookie = response
2654 .headers
2655 .iter()
2656 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2657 .map(|(_, v)| v.as_str());
2658 assert_eq!(set_cookie, Some("[REDACTED]"));
2659 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2660 assert_eq!(res_body, Some("[REDACTED]"));
2661 }
2662
2663 #[test]
2664 fn redact_flow_update_masks_http_body_when_enabled() {
2665 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2666 let state = runtime.block_on(CoreState::new(None));
2667 state.update_policy_from(
2668 AuditActor::Runtime,
2669 "policy.redaction".to_string(),
2670 ProxyPolicy {
2671 redaction: RedactionPolicy {
2672 enabled: true,
2673 redact_bodies: true,
2674 ..Default::default()
2675 },
2676 ..Default::default()
2677 },
2678 );
2679
2680 let update = FlowUpdate::HttpBody {
2681 flow_id: "f-1".to_string(),
2682 direction: relay_core_api::flow::Direction::ClientToServer,
2683 body: BodyData {
2684 encoding: "utf-8".to_string(),
2685 content: "super-secret".to_string(),
2686 size: 12,
2687 },
2688 };
2689 let redacted = state.redact_flow_update_for_output(update);
2690 match redacted {
2691 FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2692 _ => panic!("expected http body update"),
2693 }
2694 }
2695
2696 #[cfg(feature = "script")]
2697 #[tokio::test]
2698 async fn load_script_from_records_audit_event() {
2699 let state = CoreState::new(None).await;
2700
2701 state
2702 .load_script_from(
2703 AuditActor::Tauri,
2704 "tauri.load_script".to_string(),
2705 "globalThis.onRequestHeaders = (_flow) => {};",
2706 )
2707 .await
2708 .expect("script should load");
2709
2710 let events = state.recent_audit_events();
2711 let event = events.last().expect("audit event should exist");
2712 assert_eq!(event.actor, AuditActor::Tauri);
2713 assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2714 assert_eq!(event.outcome, AuditOutcome::Success);
2715 assert_eq!(event.target, "tauri.load_script");
2716 }
2717}