1use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51 build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
/// Re-exports everything from `relay_core_lib::rule_engine` under this crate's
/// `rule_engine` path.
pub mod rule_engine {
    pub use relay_core_lib::rule_engine::*;
}
79
/// Point-in-time counters and gauges describing the core runtime.
///
/// Produced by `CoreState::get_metrics` and rendered as text by
/// `CoreMetrics::to_prometheus_text`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreMetrics {
    /// Total flow count as reported by the flow store actor.
    pub flows_total: usize,
    /// In-memory flow count as reported by the flow store actor.
    pub flows_in_memory: usize,
    /// Number of flow-store messages that could not be enqueued (`try_send` failed).
    pub flows_dropped: usize,
    /// Pending intercept count as reported by the intercept broker actor.
    pub intercepts_pending: usize,
    /// Pending WebSocket message count as reported by the intercept broker actor.
    pub ws_pending_messages: usize,
    /// Age in milliseconds of the oldest pending intercept, if the broker reported one.
    pub oldest_intercept_age_ms: Option<u64>,
    /// Age in milliseconds of the oldest pending WebSocket message, if the broker reported one.
    pub oldest_ws_message_age_ms: Option<u64>,
    /// Rule execution error count as reported by the rule store actor.
    pub rule_exec_errors: usize,
    /// Number of audit events recorded by this `CoreState`.
    pub audit_events_total: usize,
    /// Number of recorded audit events whose outcome was `Failed`.
    pub audit_events_failed: usize,
    /// Accumulated count passed to `CoreState::record_flow_events_lagged`.
    pub flow_events_lagged_total: usize,
    /// Accumulated count passed to `CoreState::record_audit_events_lagged`.
    pub audit_events_lagged_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_body_degraded()`.
    pub proxy_body_degraded_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_http_request()`.
    pub proxy_http_request_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_sandbox_reject()`.
    pub proxy_sandbox_reject_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_invalid_method()`.
    pub proxy_invalid_method_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_invalid_status()`.
    pub proxy_invalid_status_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_retry()`.
    pub proxy_retry_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_stream_mode_tap()`.
    pub proxy_stream_mode_tap_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_stream_mode_degrade()`.
    pub proxy_stream_mode_degrade_total: usize,
}
111
112impl CoreMetrics {
113 pub fn to_prometheus_text(&self) -> String {
114 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
115 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
116 format!(
117 "relay_core_flows_total {}\n\
118relay_core_flows_in_memory {}\n\
119relay_core_flows_dropped_total {}\n\
120relay_core_intercepts_pending {}\n\
121relay_core_ws_pending_messages {}\n\
122relay_core_oldest_intercept_age_ms {}\n\
123relay_core_oldest_ws_message_age_ms {}\n\
124relay_core_rule_exec_errors_total {}\n\
125relay_core_audit_events_total {}\n\
126relay_core_audit_events_failed_total {}\n\
127relay_core_flow_events_lagged_total {}\n\
128relay_core_audit_events_lagged_total {}\n\
129relay_core_proxy_body_degraded_total {}\n\
130relay_core_proxy_http_request_total {}\n\
131relay_core_proxy_sandbox_reject_total {}\n\
132relay_core_proxy_invalid_method_total {}\n\
133relay_core_proxy_invalid_status_total {}\n\
134relay_core_proxy_retry_total {}\n\
135relay_core_proxy_stream_mode_tap_total {}\n\
136relay_core_proxy_stream_mode_degrade_total {}\n",
137 self.flows_total,
138 self.flows_in_memory,
139 self.flows_dropped,
140 self.intercepts_pending,
141 self.ws_pending_messages,
142 oldest_intercept_age_ms,
143 oldest_ws_message_age_ms,
144 self.rule_exec_errors,
145 self.audit_events_total,
146 self.audit_events_failed,
147 self.flow_events_lagged_total,
148 self.audit_events_lagged_total,
149 self.proxy_body_degraded_total,
150 self.proxy_http_request_total,
151 self.proxy_sandbox_reject_total,
152 self.proxy_invalid_method_total,
153 self.proxy_invalid_status_total,
154 self.proxy_retry_total,
155 self.proxy_stream_mode_tap_total,
156 self.proxy_stream_mode_degrade_total,
157 )
158 }
159}
160
/// Externally visible view of the runtime lifecycle, derived from a [`RuntimeLifecycle`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusSnapshot {
    /// Lifecycle phase at the time of the snapshot.
    pub phase: RuntimeLifecyclePhase,
    /// `true` while the phase is starting, running or stopping.
    pub running: bool,
    /// Port recorded in the lifecycle, if any.
    pub port: Option<u16>,
    /// Whole seconds elapsed since the recorded start time, if one is recorded.
    pub uptime: Option<u64>,
    /// Last error message recorded in the lifecycle, if any.
    pub last_error: Option<String>,
}
169
/// Combined status and metrics report, as assembled by `CoreState::status_report`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusReport {
    /// Lifecycle-derived status.
    pub status: CoreStatusSnapshot,
    /// Metrics gathered for the same report.
    pub metrics: CoreMetrics,
}
175
/// Summary of pending interception work, as assembled by `CoreState::intercept_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreInterceptSnapshot {
    /// Copy of `CoreMetrics::intercepts_pending`.
    pub pending_count: usize,
    /// Copy of `CoreMetrics::ws_pending_messages`.
    pub ws_pending_count: usize,
}
181
/// A list of audit events returned by the audit snapshot/query methods of `CoreState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CoreAuditSnapshot {
    /// The selected audit events.
    pub events: Vec<AuditEvent>,
}
186
/// Filter criteria for `CoreState::query_audit_snapshot`. A `None` field applies no filter.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreAuditQuery {
    /// Inclusive lower bound on the event's `timestamp_ms`.
    pub since_ms: Option<u64>,
    /// Inclusive upper bound on the event's `timestamp_ms`.
    pub until_ms: Option<u64>,
    /// Only events by this actor.
    pub actor: Option<AuditActor>,
    /// Only events of this kind.
    pub kind: Option<AuditEventKind>,
    /// Only events with this outcome.
    pub outcome: Option<AuditOutcome>,
    /// Maximum number of events to return; `query_audit_snapshot` clamps it to `1..=500`.
    pub limit: usize,
}
196
197impl Default for CoreAuditQuery {
198 fn default() -> Self {
199 Self {
200 since_ms: None,
201 until_ms: None,
202 actor: None,
203 kind: None,
204 outcome: None,
205 limit: 50,
206 }
207 }
208}
209
/// Phase of the proxy runtime lifecycle. Serialized in `snake_case`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeLifecyclePhase {
    /// Initial phase of a freshly constructed lifecycle (`RuntimeLifecycle::created`).
    Created,
    /// Entered by `CoreState::prepare_start`.
    Starting,
    // NOTE(review): the transition into `Running` is not visible in this part of the file.
    Running,
    /// Entered by `CoreState::stop_proxy` once a shutdown signal is being sent.
    Stopping,
    // NOTE(review): the transitions into `Stopped` and `Failed` are not visible in this
    // part of the file.
    Stopped,
    Failed,
}
220
221impl RuntimeLifecyclePhase {
222 pub fn as_str(&self) -> &'static str {
223 match self {
224 Self::Created => "created",
225 Self::Starting => "starting",
226 Self::Running => "running",
227 Self::Stopping => "stopping",
228 Self::Stopped => "stopped",
229 Self::Failed => "failed",
230 }
231 }
232
233 pub fn is_active(&self) -> bool {
234 matches!(self, Self::Starting | Self::Running | Self::Stopping)
235 }
236}
237
/// Lifecycle record for the proxy runtime, published through a watch channel by `CoreState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RuntimeLifecycle {
    /// Current phase.
    pub phase: RuntimeLifecyclePhase,
    /// Port associated with the runtime, if any.
    pub port: Option<u16>,
    /// Start time in milliseconds; compared against `now_unix_ms()` to compute uptime.
    pub started_at_ms: Option<u64>,
    /// Last recorded error message, if any.
    pub last_error: Option<String>,
}
245
246impl RuntimeLifecycle {
247 pub fn created() -> Self {
248 Self {
249 phase: RuntimeLifecyclePhase::Created,
250 port: None,
251 started_at_ms: None,
252 last_error: None,
253 }
254 }
255
256 pub fn is_active(&self) -> bool {
257 self.phase.is_active()
258 }
259
260 pub fn uptime_seconds(&self) -> Option<u64> {
261 let started_at_ms = self.started_at_ms?;
262 let now_ms = now_unix_ms();
263 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
264 }
265}
266
/// Outcome of an attempt to spawn the proxy.
// NOTE(review): the function producing this value is not visible in this part of the file;
// variant meanings below are read from their names and payloads only.
pub enum ProxySpawnResult {
    /// Carries the join handle of a spawned tokio task.
    Started(tokio::task::JoinHandle<()>),
    /// No task handle is provided.
    AlreadyRunning,
}
271
/// Outcome of `CoreState::stop_proxy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A stored shutdown sender was found and signalled; the lifecycle moved to `Stopping`.
    Stopping,
    /// No shutdown sender was stored, so nothing was signalled.
    NotRunning,
}
277
278impl From<RuntimeLifecycle> for CoreStatusSnapshot {
279 fn from(lifecycle: RuntimeLifecycle) -> Self {
280 Self {
281 running: lifecycle.is_active(),
282 uptime: lifecycle.uptime_seconds(),
283 port: lifecycle.port,
284 last_error: lifecycle.last_error,
285 phase: lifecycle.phase,
286 }
287 }
288}
289
/// Shared state of the proxy core: channels to the backing actors, the optional persistent
/// store, policy and lifecycle watch channels, broadcast channels, and local counters.
pub struct CoreState {
    /// Mailbox of the flow store actor.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    /// Mailbox of the intercept broker actor.
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    /// Mailbox of the rule store actor.
    rule_store: mpsc::Sender<RuleStoreMessage>,
    /// Persistent store; `None` when no database URL was given or the connection failed.
    store: Option<Store>,
    /// Script interceptor (only with the `script` feature).
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    /// Watch channel holding the current proxy policy.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    /// Count of flow-store messages that could not be enqueued.
    pub flows_dropped: Arc<AtomicUsize>,
    /// Count of recorded audit events.
    audit_events_total: Arc<AtomicUsize>,
    /// Count of recorded audit events with a `Failed` outcome.
    audit_events_failed: Arc<AtomicUsize>,
    /// Accumulated value passed to `record_flow_events_lagged`.
    flow_events_lagged_total: Arc<AtomicUsize>,
    /// Accumulated value passed to `record_audit_events_lagged`.
    audit_events_lagged_total: Arc<AtomicUsize>,
    /// Broadcast sender handed out via `subscribe_flow_updates`.
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    /// Broadcast sender handed out via `subscribe_audit_events`.
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    /// Bounded in-memory history of recent audit events (oldest at the front).
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    /// Watch channel holding the current runtime lifecycle.
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    /// Shutdown signal sender stored by `prepare_start` and consumed by `stop_proxy`.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
310
311impl CoreState {
312 pub async fn new(db_url: Option<String>) -> Self {
313 const AUDIT_HISTORY_LIMIT: usize = 200;
314
315 let store = if let Some(url) = db_url {
316 match Store::connect(&url).await {
317 Ok(s) => {
318 if let Err(e) = s.init().await {
319 tracing::error!("Failed to init store: {}", e);
320 }
321 Some(s)
322 }
323 Err(e) => {
324 tracing::error!("Failed to connect to store: {}", e);
325 None
326 }
327 }
328 } else {
329 None
330 };
331
332 let (flow_tx, flow_rx) = mpsc::channel(10000);
333 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
334 tokio::spawn(flow_actor.run());
335
336 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
337 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
338 tokio::spawn(intercept_actor.run());
339
340 let (rule_tx, rule_rx) = mpsc::channel(100);
341 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
342 tokio::spawn(rule_actor.run());
343
344 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
345 let (flow_broadcast_tx, _) = broadcast::channel(1000);
346 let (audit_broadcast_tx, _) = broadcast::channel(256);
347 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
348
349 #[cfg(feature = "script")]
350 let script_interceptor = ScriptInterceptor::new()
351 .await
352 .expect("Failed to initialize ScriptInterceptor");
353 Self {
354 flow_store: flow_tx,
355 intercept_broker: intercept_tx,
356 rule_store: rule_tx,
357 store,
358 #[cfg(feature = "script")]
359 script_interceptor: Arc::new(script_interceptor),
360 policy_tx,
361 flows_dropped: Arc::new(AtomicUsize::new(0)),
362 audit_events_total: Arc::new(AtomicUsize::new(0)),
363 audit_events_failed: Arc::new(AtomicUsize::new(0)),
364 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
365 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
366 flow_broadcast_tx,
367 audit_broadcast_tx,
368 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
369 lifecycle_tx,
370 shutdown_tx: Mutex::new(None),
371 }
372 }
373
374 pub async fn get_metrics(&self) -> CoreMetrics {
375 let (flow_tx, flow_rx) = oneshot::channel();
376 let _ = self
377 .flow_store
378 .send(FlowStoreMessage::GetMetrics(flow_tx))
379 .await;
380 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
381
382 let (int_tx, int_rx) = oneshot::channel();
383 let _ = self
384 .intercept_broker
385 .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
386 .await;
387 let (
388 intercepts_pending,
389 ws_pending_messages,
390 oldest_intercept_age_ms,
391 oldest_ws_message_age_ms,
392 ) = int_rx.await.unwrap_or((0, 0, None, None));
393
394 let (rule_tx, rule_rx) = oneshot::channel();
395 let _ = self
396 .rule_store
397 .send(RuleStoreMessage::GetMetrics(rule_tx))
398 .await;
399 let rule_exec_errors = rule_rx.await.unwrap_or(0);
400
401 CoreMetrics {
402 flows_total,
403 flows_in_memory,
404 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
405 intercepts_pending,
406 ws_pending_messages,
407 oldest_intercept_age_ms,
408 oldest_ws_message_age_ms,
409 rule_exec_errors,
410 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
411 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
412 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
413 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
414 proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
415 proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
416 proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
417 proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
418 proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
419 proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
420 proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
421 proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
422 ),
423 }
424 }
425
426 pub async fn get_metrics_prometheus_text(&self) -> String {
427 let mut text = self.get_metrics().await.to_prometheus_text();
428 #[cfg(feature = "script")]
429 {
430 text.push_str(&self.script_interceptor.metrics.prometheus_lines());
431 }
432 text
433 }
434
435 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
436 self.lifecycle().into()
437 }
438
439 pub async fn status_report(&self) -> CoreStatusReport {
440 CoreStatusReport {
441 status: self.status_snapshot(),
442 metrics: self.get_metrics().await,
443 }
444 }
445
446 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
447 let metrics = self.get_metrics().await;
448 CoreInterceptSnapshot {
449 pending_count: metrics.intercepts_pending,
450 ws_pending_count: metrics.ws_pending_messages,
451 }
452 }
453
454 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
455 let events = self.recent_audit_events();
456 let start = events.len().saturating_sub(limit);
457 CoreAuditSnapshot {
458 events: events.into_iter().skip(start).collect(),
459 }
460 }
461
462 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
463 let limit = query.limit.clamp(1, 500);
464 if let Some(store) = &self.store {
465 let rows = store
466 .query_audit_events(
467 query.since_ms,
468 query.until_ms,
469 query.actor.as_ref().map(AuditActor::as_str),
470 query.kind.as_ref().map(AuditEventKind::as_str),
471 query.outcome.as_ref().map(AuditOutcome::as_str),
472 limit,
473 )
474 .await
475 .unwrap_or_default();
476
477 let mut events = Vec::with_capacity(rows.len());
478 for row in rows {
479 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
480 events.push(event);
481 }
482 }
483 return CoreAuditSnapshot { events };
484 }
485
486 let mut events = self.recent_audit_events();
487 events.reverse();
488 let filtered = events
489 .into_iter()
490 .filter(|event| {
491 query
492 .since_ms
493 .map(|v| event.timestamp_ms >= v)
494 .unwrap_or(true)
495 })
496 .filter(|event| {
497 query
498 .until_ms
499 .map(|v| event.timestamp_ms <= v)
500 .unwrap_or(true)
501 })
502 .filter(|event| {
503 query
504 .actor
505 .as_ref()
506 .map(|v| &event.actor == v)
507 .unwrap_or(true)
508 })
509 .filter(|event| {
510 query
511 .kind
512 .as_ref()
513 .map(|v| &event.kind == v)
514 .unwrap_or(true)
515 })
516 .filter(|event| {
517 query
518 .outcome
519 .as_ref()
520 .map(|v| &event.outcome == v)
521 .unwrap_or(true)
522 })
523 .take(limit)
524 .collect();
525 CoreAuditSnapshot { events: filtered }
526 }
527
528 pub async fn get_flow(&self, id: String) -> Option<Flow> {
529 let (tx, rx) = oneshot::channel();
530 if let Err(e) = self
531 .flow_store
532 .send(FlowStoreMessage::GetFlow {
533 id: id.clone(),
534 respond_to: tx,
535 })
536 .await
537 {
538 error!("Failed to send GetFlow request: {}", e);
539 if let Some(store) = &self.store {
540 return store
541 .load_flow(&id)
542 .await
543 .ok()
544 .flatten()
545 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
546 }
547 return None;
548 }
549 let flow = rx
550 .await
551 .map_err(|e| {
552 error!("Failed to receive Flow response: {}", e);
553 e
554 })
555 .unwrap_or(None);
556 if let Some(flow) = flow {
557 return Some(redact_flow(flow, &self.current_redaction_policy()));
558 }
559 if let Some(store) = &self.store {
560 return store
561 .load_flow(&id)
562 .await
563 .ok()
564 .flatten()
565 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
566 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
567 }
568 None
569 }
570
571 pub async fn get_rules(&self) -> Vec<Rule> {
572 let (tx, rx) = oneshot::channel();
573 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
574 error!("Failed to send GetRules request: {}", e);
575 return Vec::new();
576 }
577 rx.await
578 .map_err(|e| {
579 error!("Failed to receive Rules response: {}", e);
580 e
581 })
582 .unwrap_or_default()
583 }
584
585 pub async fn set_rules(&self, rules: Vec<Rule>) {
586 let _ = self
587 .set_rules_from(
588 AuditActor::Runtime,
589 "rules.replace",
590 "rule_set".to_string(),
591 json!({}),
592 rules,
593 )
594 .await;
595 }
596
597 pub async fn upsert_rule_from(
598 &self,
599 actor: AuditActor,
600 operation: &str,
601 target: String,
602 details: serde_json::Value,
603 rule: Rule,
604 ) -> Result<(), String> {
605 let rule_id = rule.id.clone();
606 let mut rules = self.get_rules().await;
607 rules.retain(|existing| existing.id != rule_id);
608 rules.push(rule);
609 self.set_rules_from(actor, operation, target, details, rules)
610 .await
611 }
612
613 pub async fn delete_rule_from(
614 &self,
615 actor: AuditActor,
616 operation: &str,
617 target: String,
618 details: serde_json::Value,
619 rule_id: &str,
620 ) -> Result<bool, String> {
621 let mut rules = self.get_rules().await;
622 let before = rules.len();
623 rules.retain(|rule| rule.id != rule_id);
624 if rules.len() == before {
625 return Ok(false);
626 }
627 self.set_rules_from(actor, operation, target, details, rules)
628 .await
629 .map(|_| true)
630 }
631
632 pub async fn create_mock_response_rule_from(
633 &self,
634 actor: AuditActor,
635 target: String,
636 details: serde_json::Value,
637 config: MockResponseRuleConfig,
638 ) -> Result<String, String> {
639 let rule_id = config.rule_id.clone();
640 let rule = build_mock_response_rule(config);
641 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
642 .await
643 .map(|_| rule_id)
644 }
645
646 pub async fn create_intercept_rule_from(
647 &self,
648 actor: AuditActor,
649 target: String,
650 details: serde_json::Value,
651 config: InterceptRuleConfig,
652 ) -> Result<String, String> {
653 let rule_id = config.rule_id.clone();
654 let mut rules = self.get_rules().await;
655 rules.extend(build_intercept_rules(config));
656 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
657 .await
658 .map(|_| rule_id)
659 }
660
661 pub async fn upsert_legacy_intercept_rule_from(
662 &self,
663 actor: AuditActor,
664 target: String,
665 details: serde_json::Value,
666 rule: InterceptRule,
667 ) -> Result<(), String> {
668 let rule_id = rule.id.clone();
669 let mut rules = self.get_rules().await;
670 rules.retain(|existing| {
671 existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
672 });
673 rules.extend(rule.to_rules());
674 self.set_rules_from(
675 actor,
676 "rule.intercept_legacy_upsert",
677 target,
678 details,
679 rules,
680 )
681 .await
682 }
683
684 pub async fn set_rules_from(
685 &self,
686 actor: AuditActor,
687 operation: &str,
688 target: String,
689 details: serde_json::Value,
690 rules: Vec<Rule>,
691 ) -> Result<(), String> {
692 let rule_count = rules.len();
693 if let Err(e) = self
694 .rule_store
695 .send(RuleStoreMessage::SetRules(rules))
696 .await
697 {
698 error!("Failed to send SetRules request: {}", e);
699 self.record_audit_event(AuditEvent::new(
700 actor,
701 AuditEventKind::RuleChanged,
702 target,
703 AuditOutcome::Failed,
704 json!({
705 "operation": operation,
706 "rule_count": rule_count,
707 "details": details,
708 "error": e.to_string()
709 }),
710 ));
711 return Err(e.to_string());
712 }
713
714 self.record_audit_event(AuditEvent::new(
715 actor,
716 AuditEventKind::RuleChanged,
717 target,
718 AuditOutcome::Success,
719 json!({
720 "operation": operation,
721 "rule_count": rule_count,
722 "details": details
723 }),
724 ));
725 Ok(())
726 }
727
728 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
729 let mut new_rules = Vec::new();
730 for rule in rules {
731 new_rules.extend(rule.to_rules());
732 }
733 self.set_rules(new_rules).await;
734 }
735
736 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
737 let (tx, rx) = oneshot::channel();
738 if let Err(e) = self
739 .rule_store
740 .send(RuleStoreMessage::GetRuleEngine(tx))
741 .await
742 {
743 error!("Failed to send GetRuleEngine request: {}", e);
744 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
745 }
746 rx.await
747 .map_err(|e| {
748 error!("Failed to receive RuleEngine response: {}", e);
749 e
750 })
751 .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
752 }
753
754 pub fn update_policy(&self, policy: ProxyPolicy) {
755 self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
756 }
757
758 pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
759 let mut policy = self.policy_snapshot();
760 policy.apply_patch(patch);
761 self.update_policy_from(actor, target, policy);
762 }
763
764 pub fn policy_snapshot(&self) -> ProxyPolicy {
765 self.policy_tx.borrow().clone()
766 }
767
768 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
769 let details = json!({
770 "strict_http_semantics": policy.strict_http_semantics,
771 "request_timeout_ms": policy.request_timeout_ms,
772 "max_body_size": policy.max_body_size,
773 "transparent_enabled": policy.transparent_enabled,
774 "redaction_enabled": policy.redaction.enabled,
775 "redact_bodies": policy.redaction.redact_bodies
776 });
777
778 self.policy_tx.send_replace(policy);
779 self.record_audit_event(AuditEvent::new(
780 actor,
781 AuditEventKind::PolicyUpdated,
782 target,
783 AuditOutcome::Success,
784 details,
785 ));
786 }
787
788 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
789 if let Err(e) = self
790 .intercept_broker
791 .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
792 .await
793 {
794 error!("Failed to send RegisterIntercept request: {}", e);
795 }
796 }
797
798 pub async fn resolve_intercept(
799 &self,
800 key: String,
801 result: InterceptionResult,
802 ) -> Result<(), String> {
803 let (tx, rx) = oneshot::channel();
804 if let Err(e) = self
805 .intercept_broker
806 .send(InterceptBrokerMessage::ResolveIntercept {
807 key,
808 result,
809 respond_to: tx,
810 })
811 .await
812 {
813 error!("Failed to send ResolveIntercept request: {}", e);
814 return Err(e.to_string());
815 }
816 rx.await.map_err(|_| "Actor dropped".to_string())?
817 }
818
819 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
820 let (tx, rx) = oneshot::channel();
821 if let Err(e) = self
822 .intercept_broker
823 .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
824 key,
825 respond_to: tx,
826 })
827 .await
828 {
829 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
830 return None;
831 }
832 rx.await
833 .map_err(|e| {
834 error!("Failed to receive WebSocketMessage response: {}", e);
835 e
836 })
837 .unwrap_or(None)
838 }
839
840 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
841 if let Err(e) = self
842 .intercept_broker
843 .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
844 .await
845 {
846 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
847 }
848 }
849
850 pub async fn is_intercept_pending(&self, key: String) -> bool {
851 let (tx, rx) = oneshot::channel();
852 if let Err(e) = self
853 .intercept_broker
854 .send(InterceptBrokerMessage::GetPendingIntercept {
855 key,
856 respond_to: tx,
857 })
858 .await
859 {
860 error!("Failed to send GetPendingIntercept request: {}", e);
861 return false;
862 }
863 rx.await
864 .map_err(|e| {
865 error!("Failed to receive PendingIntercept response: {}", e);
866 e
867 })
868 .unwrap_or(false)
869 }
870
871 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
872 let (tx, rx) = oneshot::channel();
873 if let Err(e) = self
874 .intercept_broker
875 .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
876 flow_id,
877 respond_to: tx,
878 })
879 .await
880 {
881 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
882 return false;
883 }
884 rx.await
885 .map_err(|e| {
886 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
887 e
888 })
889 .unwrap_or(false)
890 }
891
892 pub fn upsert_flow(&self, flow: Box<Flow>) {
893 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
894 error!("FlowStore dropped flow: {}", e);
896 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
897 }
898 }
899
900 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
901 if let Err(e) = self
902 .flow_store
903 .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
904 {
905 error!("FlowStore dropped WS message: {}", e);
906 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
907 }
908 }
909
910 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
911 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
912 flow_id,
913 body,
914 direction,
915 }) {
916 error!("FlowStore dropped HTTP body: {}", e);
917 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
918 }
919 }
920
921 pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
923 if let Err(e) = self
924 .flow_store
925 .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
926 {
927 error!("FlowStore dropped budget-exceeded tag: {}", e);
928 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
929 }
930 }
931
932 pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
934 self.flow_broadcast_tx.subscribe()
935 }
936
937 pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
938 self.audit_broadcast_tx.subscribe()
939 }
940
941 fn current_redaction_policy(&self) -> RedactionPolicy {
942 self.policy_tx.borrow().redaction.clone()
943 }
944
945 pub fn record_flow_events_lagged(&self, skipped: u64) {
946 self.flow_events_lagged_total
947 .fetch_add(skipped as usize, Ordering::Relaxed);
948 }
949
950 pub fn record_audit_events_lagged(&self, skipped: u64) {
951 self.audit_events_lagged_total
952 .fetch_add(skipped as usize, Ordering::Relaxed);
953 }
954
955 pub fn lifecycle(&self) -> RuntimeLifecycle {
956 self.lifecycle_tx.borrow().clone()
957 }
958
959 pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
960 self.lifecycle_tx.subscribe()
961 }
962
963 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
964 let current = self.lifecycle();
965 if current.is_active() {
966 return Err(format!(
967 "Proxy is already {} on port {}",
968 current.phase.as_str(),
969 current.port.unwrap_or(port)
970 ));
971 }
972
973 let mut guard = self
974 .shutdown_tx
975 .lock()
976 .map_err(|_| "shutdown state poisoned".to_string())?;
977 *guard = Some(shutdown_tx);
978 drop(guard);
979
980 self.update_lifecycle(RuntimeLifecycle {
981 phase: RuntimeLifecyclePhase::Starting,
982 port: Some(port),
983 started_at_ms: None,
984 last_error: None,
985 });
986 Ok(())
987 }
988
989 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
990 let mut guard = self
991 .shutdown_tx
992 .lock()
993 .map_err(|_| "shutdown state poisoned".to_string())?;
994 let Some(tx) = guard.take() else {
995 return Ok(ProxyStopResult::NotRunning);
996 };
997 drop(guard);
998
999 let current = self.lifecycle();
1000 self.update_lifecycle(RuntimeLifecycle {
1001 phase: RuntimeLifecyclePhase::Stopping,
1002 port: current.port,
1003 started_at_ms: current.started_at_ms,
1004 last_error: current.last_error,
1005 });
1006 let _ = tx.send(());
1007 Ok(ProxyStopResult::Stopping)
1008 }
1009
1010 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1011 self.audit_history
1012 .lock()
1013 .map(|events| events.iter().cloned().collect())
1014 .unwrap_or_default()
1015 }
1016
1017 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1019 let redaction = self.current_redaction_policy();
1020 if let Some(store) = &self.store {
1021 return store
1022 .query_flow_summaries(&query)
1023 .await
1024 .unwrap_or_default()
1025 .into_iter()
1026 .map(|summary| redact_flow_summary(summary, &redaction))
1027 .collect();
1028 }
1029 let (tx, rx) = oneshot::channel();
1030 if let Err(e) = self
1031 .flow_store
1032 .send(FlowStoreMessage::SearchFlows {
1033 query,
1034 respond_to: tx,
1035 })
1036 .await
1037 {
1038 error!("Failed to send SearchFlows request: {}", e);
1039 return Vec::new();
1040 }
1041 rx.await
1042 .unwrap_or_default()
1043 .into_iter()
1044 .map(|summary| redact_flow_summary(summary, &redaction))
1045 .collect()
1046 }
1047
1048 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1049 let redaction = self.current_redaction_policy();
1050 redact_flow_update(update, &redaction)
1051 }
1052
1053 pub async fn resolve_intercept_with_modifications(
1063 &self,
1064 key: String,
1065 action: &str,
1066 mods: Option<relay_core_api::modification::FlowModification>,
1067 ) -> Result<(), String> {
1068 self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1069 .await
1070 }
1071
1072 pub async fn resolve_intercept_with_modifications_from(
1073 &self,
1074 actor: AuditActor,
1075 key: String,
1076 action: &str,
1077 mods: Option<relay_core_api::modification::FlowModification>,
1078 ) -> Result<(), String> {
1079 let modified_fields = modification_field_names(mods.as_ref());
1080 let result = match action {
1081 "drop" => InterceptionResult::Drop,
1082 _ => match mods {
1083 None => InterceptionResult::Continue,
1084 Some(m) => {
1085 let parts: Vec<&str> = key.splitn(4, ':').collect();
1086 match parts.as_slice() {
1087 [flow_id, phase] => {
1088 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1089 modification::apply_flow_modification(&flow, phase, m)
1090 } else {
1091 InterceptionResult::Continue
1092 }
1093 }
1094 [_, "ws_msg", _] => {
1097 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1098 modification::apply_ws_modification(&msg, m)
1099 } else {
1100 InterceptionResult::Continue
1101 }
1102 }
1103 _ => InterceptionResult::Continue,
1104 }
1105 }
1106 },
1107 };
1108 let outcome = self.resolve_intercept(key.clone(), result).await;
1109 let audit_outcome = if outcome.is_ok() {
1110 AuditOutcome::Success
1111 } else {
1112 AuditOutcome::Failed
1113 };
1114 let error_message = outcome.as_ref().err().cloned();
1115
1116 self.record_audit_event(AuditEvent::new(
1117 actor,
1118 AuditEventKind::InterceptResolved,
1119 key,
1120 audit_outcome,
1121 json!({
1122 "action": action,
1123 "has_modifications": !modified_fields.is_empty(),
1124 "modified_fields": modified_fields,
1125 "error": error_message
1126 }),
1127 ));
1128 outcome
1129 }
1130
/// Loads `script` into the script interceptor and records a `ScriptReloaded`
/// audit event describing the attempt.
///
/// The audit event is attributed to `actor`, uses `target` as its target, and
/// carries the script length in bytes plus the error text (or `null`).
///
/// # Errors
/// Returns the interceptor's load error rendered as a string.
#[cfg(feature = "script")]
pub async fn load_script_from(
    &self,
    actor: AuditActor,
    target: String,
    script: &str,
) -> Result<(), String> {
    let load_result: Result<(), String> =
        match self.script_interceptor.load_script(script).await {
            Ok(()) => Ok(()),
            Err(e) => Err(e.to_string()),
        };

    // Derive both audit fields from the result in one pass.
    let (outcome, error_message) = match &load_result {
        Ok(()) => (AuditOutcome::Success, None),
        Err(message) => (AuditOutcome::Failed, Some(message.clone())),
    };

    let details = json!({
        "script_bytes": script.len(),
        "error": error_message
    });
    self.record_audit_event(AuditEvent::new(
        actor,
        AuditEventKind::ScriptReloaded,
        target,
        outcome,
        details,
    ));

    load_result
}
1164
/// Forwards `env_allow` to the script interceptor's `set_env_allow`.
///
/// NOTE(review): presumably this is the allow-list of environment variable
/// names scripts may read — confirm against `ScriptInterceptor`.
#[cfg(feature = "script")]
pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
    let interceptor = &self.script_interceptor;
    interceptor.set_env_allow(env_allow).await;
}
1170
1171 fn record_audit_event(&self, event: AuditEvent) {
1172 const AUDIT_HISTORY_LIMIT: usize = 200;
1173 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1174 if event.outcome == AuditOutcome::Failed {
1175 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1176 }
1177
1178 let details = event.details.to_string();
1179 tracing::info!(
1180 target: "relay_core_audit",
1181 event_id = %event.id,
1182 actor = %event.actor.as_str(),
1183 kind = %event.kind.as_str(),
1184 target = %event.target,
1185 outcome = %event.outcome.as_str(),
1186 details = %details
1187 );
1188
1189 if let Ok(mut history) = self.audit_history.lock() {
1190 if history.len() >= AUDIT_HISTORY_LIMIT {
1191 history.pop_front();
1192 }
1193 history.push_back(event.clone());
1194 }
1195
1196 if let Some(store) = self.store.clone() {
1197 let event_json = serde_json::to_value(&event).unwrap_or_default();
1198 let event_id = event.id.clone();
1199 let timestamp_ms = event.timestamp_ms;
1200 let actor = event.actor.as_str().to_string();
1201 let kind = event.kind.as_str().to_string();
1202 let target = event.target.clone();
1203 let outcome = event.outcome.as_str().to_string();
1204 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1205 handle.spawn(async move {
1206 if let Err(e) = store
1207 .save_audit_event(AuditEventRecord {
1208 id: &event_id,
1209 timestamp_ms,
1210 actor: &actor,
1211 kind: &kind,
1212 target: &target,
1213 outcome: &outcome,
1214 content: &event_json,
1215 })
1216 .await
1217 {
1218 tracing::error!("Failed to persist audit event: {}", e);
1219 }
1220 });
1221 }
1222 }
1223
1224 let _ = self.audit_broadcast_tx.send(event);
1225 }
1226
1227 fn transition_to_running(&self, port: u16) {
1228 self.update_lifecycle(RuntimeLifecycle {
1229 phase: RuntimeLifecyclePhase::Running,
1230 port: Some(port),
1231 started_at_ms: Some(now_unix_ms()),
1232 last_error: None,
1233 });
1234 }
1235
1236 fn transition_to_stopped(&self) {
1237 if let Ok(mut guard) = self.shutdown_tx.lock() {
1238 *guard = None;
1239 }
1240
1241 self.update_lifecycle(RuntimeLifecycle {
1242 phase: RuntimeLifecyclePhase::Stopped,
1243 port: None,
1244 started_at_ms: None,
1245 last_error: None,
1246 });
1247 }
1248
1249 fn transition_to_failed(&self, port: u16, error: String) {
1250 if let Ok(mut guard) = self.shutdown_tx.lock() {
1251 *guard = None;
1252 }
1253
1254 self.update_lifecycle(RuntimeLifecycle {
1255 phase: RuntimeLifecyclePhase::Failed,
1256 port: Some(port),
1257 started_at_ms: None,
1258 last_error: Some(error),
1259 });
1260 }
1261
1262 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1263 tracing::info!(
1264 target: "relay_core_lifecycle",
1265 phase = %lifecycle.phase.as_str(),
1266 port = ?lifecycle.port,
1267 started_at_ms = ?lifecycle.started_at_ms,
1268 last_error = ?lifecycle.last_error
1269 );
1270 self.lifecycle_tx.send_replace(lifecycle);
1271 }
1272
1273 pub fn spawn_proxy(
1274 self: &Arc<Self>,
1275 config: ProxyConfig,
1276 sink: mpsc::Sender<FlowUpdate>,
1277 extra_interceptor: Option<Arc<dyn Interceptor>>,
1278 ) -> Result<ProxySpawnResult, String> {
1279 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1280 match self.prepare_start(config.port, shutdown_tx) {
1281 Ok(()) => {}
1282 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1283 Err(error) => return Err(error),
1284 }
1285 let state = self.clone();
1286 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1287 if let Err(error) = state
1288 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1289 .await
1290 {
1291 error!("Proxy failed: {}", error);
1292 }
1293 })))
1294 }
1295
1296 pub async fn start_proxy(
1297 self: &Arc<Self>,
1298 config: ProxyConfig,
1299 sink: mpsc::Sender<FlowUpdate>,
1300 extra_interceptor: Option<Arc<dyn Interceptor>>,
1301 ) -> Result<(), String> {
1302 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1303 self.prepare_start(config.port, shutdown_tx)?;
1304 self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1305 .await
1306 }
1307
1308 async fn run_proxy(
1309 self: &Arc<Self>,
1310 config: ProxyConfig,
1311 sink: mpsc::Sender<FlowUpdate>,
1312 extra_interceptor: Option<Arc<dyn Interceptor>>,
1313 shutdown_rx: oneshot::Receiver<()>,
1314 ) -> Result<(), String> {
1315 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1316 let state = self.clone();
1317
1318 if let Some(parent) = config.ca_cert_path.parent()
1319 && !parent.exists()
1320 {
1321 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1322 }
1323
1324 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1325 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1326 let ca = Arc::new(ca);
1327
1328 #[cfg(feature = "script")]
1329 let script_interceptor = self.script_interceptor.clone();
1330
1331 #[cfg(feature = "script")]
1332 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1333
1334 #[cfg(not(feature = "script"))]
1335 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1336
1337 interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1338 self.clone(),
1339 )));
1340
1341 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1342 self.clone(),
1343 )));
1344
1345 if let Some(interceptor) = extra_interceptor {
1346 interceptors.push(interceptor);
1347 }
1348
1349 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1350 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1351
1352 tokio::spawn(async move {
1353 while let Some(update) = proxy_rx.recv().await {
1354 match update.clone() {
1355 FlowUpdate::Full(flow) => {
1356 state.upsert_flow(flow);
1357 }
1358 FlowUpdate::WebSocketMessage { flow_id, message } => {
1359 state.append_ws_message(flow_id, message);
1360 }
1361 FlowUpdate::HttpBody {
1362 flow_id,
1363 direction,
1364 body,
1365 } => {
1366 state.update_http_body(flow_id, body, direction);
1367 }
1368 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1369 state.tag_flow_budget_exceeded(flow_id, direction);
1371 }
1372 }
1373
1374 let _ = state.flow_broadcast_tx.send(update.clone());
1375
1376 if sink.try_send(update).is_err() {
1377 relay_core_lib::metrics::inc_flows_dropped();
1378 }
1379 }
1380 });
1381
1382 if let Some(udp_port) = config.udp_tproxy_port {
1383 let udp_proxy_tx = proxy_tx.clone();
1384 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1385 tokio::spawn(async move {
1386 match tokio::net::UdpSocket::bind(udp_addr).await {
1387 Ok(socket) => {
1388 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1389 if let Err(e) = proxy.run(udp_proxy_tx).await {
1390 error!("UDP TPROXY failed: {}", e);
1391 }
1392 }
1393 Err(e) => {
1394 error!("Failed to bind UDP TPROXY socket: {}", e);
1395 }
1396 }
1397 });
1398 }
1399
1400 let listener = match TcpListener::bind(addr).await {
1401 Ok(listener) => listener,
1402 Err(e) => {
1403 if let Ok(mut guard) = self.shutdown_tx.lock() {
1404 *guard = None;
1405 }
1406 let message = format!("Failed to bind to address {}: {}", addr, e);
1407 self.transition_to_failed(config.port, message.clone());
1408 return Err(message);
1409 }
1410 };
1411
1412 self.transition_to_running(config.port);
1413 let shutdown_rx = Some(shutdown_rx);
1414
1415 if config.transparent {
1416 let policy = ProxyPolicy {
1417 transparent_enabled: true,
1418 ..Default::default()
1419 };
1420 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1421 let policy_rx = self.policy_tx.subscribe();
1422
1423 let provider: Arc<dyn OriginalDstProvider> = {
1424 let mut addrs = BTreeSet::new();
1425 if let Ok(local) = listener.local_addr() {
1426 addrs.insert(local);
1427 }
1428
1429 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1430 {
1431 Arc::new(LinuxOriginalDstProvider::new(addrs))
1432 }
1433 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1434 {
1435 match MacOsOriginalDstProvider::new(addrs.clone()) {
1436 Ok(provider) => Arc::new(provider),
1437 Err(e) => {
1438 error!("Failed to initialize macOS PF provider: {}", e);
1439 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1440 }
1441 }
1442 }
1443 #[cfg(target_os = "windows")]
1444 {
1445 let filter =
1446 "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1447 .to_string();
1448 let port = config.port;
1449 tokio::spawn(async move {
1450 relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1451 .await;
1452 });
1453
1454 Arc::new(WindowsOriginalDstProvider::new(addrs))
1455 }
1456 #[cfg(not(any(
1457 all(target_os = "linux", feature = "transparent-linux"),
1458 all(target_os = "macos", feature = "transparent-macos"),
1459 target_os = "windows"
1460 )))]
1461 {
1462 Arc::new(NoOpOriginalDstProvider::new(addrs))
1463 }
1464 };
1465
1466 let source = TransparentTcpCaptureSource::new(listener, provider);
1467 let result = relay_core_lib::start_proxy(
1468 source,
1469 proxy_tx,
1470 interceptor,
1471 ca,
1472 policy_rx,
1473 None,
1474 shutdown_rx,
1475 Some(self.get_rule_engine().await),
1476 )
1477 .await
1478 .map_err(|e| e.to_string());
1479 if let Err(error) = &result {
1480 self.transition_to_failed(config.port, error.clone());
1481 } else {
1482 self.transition_to_stopped();
1483 }
1484 result
1485 } else {
1486 let source = TcpCaptureSource::new(listener);
1487 let policy = ProxyPolicy::default();
1488 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1489 let policy_rx = self.policy_tx.subscribe();
1490
1491 let result = relay_core_lib::start_proxy(
1492 source,
1493 proxy_tx,
1494 interceptor,
1495 ca,
1496 policy_rx,
1497 None,
1498 shutdown_rx,
1499 Some(self.get_rule_engine().await),
1500 )
1501 .await
1502 .map_err(|e| e.to_string());
1503 if let Err(error) = &result {
1504 self.transition_to_failed(config.port, error.clone());
1505 } else {
1506 self.transition_to_stopped();
1507 }
1508 result
1509 }
1510 }
1511}
1512
1513fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1514 match update {
1515 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1516 FlowUpdate::WebSocketMessage {
1517 flow_id,
1518 mut message,
1519 } => {
1520 message.content = redact_body(message.content, redaction);
1521 FlowUpdate::WebSocketMessage { flow_id, message }
1522 }
1523 FlowUpdate::HttpBody {
1524 flow_id,
1525 direction,
1526 body,
1527 } => FlowUpdate::HttpBody {
1528 flow_id,
1529 direction,
1530 body: redact_body(body, redaction),
1531 },
1532 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1533 FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1534 }
1535 }
1536}
1537
1538fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1539 if !redaction.enabled {
1540 return flow;
1541 }
1542 match &mut flow.layer {
1543 Layer::Http(http) => {
1544 redact_http_request(&mut http.request, redaction);
1545 if let Some(response) = &mut http.response {
1546 redact_headers(&mut response.headers, redaction);
1547 response.body = response
1548 .body
1549 .take()
1550 .map(|body| redact_body(body, redaction));
1551 }
1552 }
1553 Layer::WebSocket(ws) => {
1554 redact_http_request(&mut ws.handshake_request, redaction);
1555 redact_headers(&mut ws.handshake_response.headers, redaction);
1556 ws.handshake_response.body = ws
1557 .handshake_response
1558 .body
1559 .take()
1560 .map(|body| redact_body(body, redaction));
1561 for message in &mut ws.messages {
1562 message.content = redact_body(message.content.clone(), redaction);
1563 }
1564 }
1565 _ => {}
1566 }
1567 flow
1568}
1569
1570fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1571 if !redaction.enabled {
1572 return summary;
1573 }
1574 summary.url = redact_url_string(&summary.url, redaction);
1575 summary
1576}
1577
1578fn redact_http_request(
1579 request: &mut relay_core_api::flow::HttpRequest,
1580 redaction: &RedactionPolicy,
1581) {
1582 redact_headers(&mut request.headers, redaction);
1583 redact_query_pairs(&mut request.query, redaction);
1584 request.url = redact_url(&request.url, redaction);
1585 request.body = request.body.take().map(|body| redact_body(body, redaction));
1586}
1587
1588fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1589 if !redaction.enabled {
1590 return;
1591 }
1592 let sensitive = redaction_set(&redaction.sensitive_header_names);
1593 for (name, value) in headers.iter_mut() {
1594 if sensitive.contains(&name.to_ascii_lowercase()) {
1595 *value = "[REDACTED]".to_string();
1596 }
1597 }
1598}
1599
1600fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1601 if !redaction.enabled {
1602 return;
1603 }
1604 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1605 for (name, value) in query.iter_mut() {
1606 if sensitive.contains(&name.to_ascii_lowercase()) {
1607 *value = "[REDACTED]".to_string();
1608 }
1609 }
1610}
1611
1612fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1613 if !redaction.enabled {
1614 return url.clone();
1615 }
1616 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1617 let pairs: Vec<(String, String)> = url
1618 .query_pairs()
1619 .map(|(k, v)| {
1620 let key = k.to_string();
1621 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1622 "[REDACTED]".to_string()
1623 } else {
1624 v.to_string()
1625 };
1626 (key, value)
1627 })
1628 .collect();
1629 let mut next = url.clone();
1630 if pairs.is_empty() {
1631 return next;
1632 }
1633 next.query_pairs_mut().clear();
1634 for (k, v) in pairs {
1635 next.query_pairs_mut().append_pair(&k, &v);
1636 }
1637 next
1638}
1639
1640fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1641 match url::Url::parse(input) {
1642 Ok(url) => redact_url(&url, redaction).to_string(),
1643 Err(_) => input.to_string(),
1644 }
1645}
1646
1647fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1648 if redaction.enabled && redaction.redact_bodies {
1649 body.content = "[REDACTED]".to_string();
1650 }
1651 body
1652}
1653
/// Builds a lookup set containing the ASCII-lowercased form of each value.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::with_capacity(values.len());
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1660
/// Settings needed to start the proxy.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Path of the CA certificate file.
    pub ca_cert_path: std::path::PathBuf,
    /// Path of the CA private key file.
    pub ca_key_path: std::path::PathBuf,
    /// Whether the proxy runs in transparent mode.
    pub transparent: bool,
    /// Port for the UDP TPROXY listener; `None` leaves it off.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a configuration with transparent mode off and no UDP TPROXY
    /// port.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent: false,
            udp_tproxy_port: None,
        }
    }

    /// Creates a configuration whose CA files are `ca_cert.pem` and
    /// `ca_key.pem` inside `app_data_dir`, creating the directory (and any
    /// missing parents) first.
    ///
    /// # Errors
    /// Returns a message naming the directory when it cannot be created,
    /// including when the path exists but is not a directory.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let app_data_dir = app_data_dir.into();
        // `create_dir_all` succeeds for an existing directory, so no separate
        // existence check is needed. It also rejects a non-directory at this
        // path, which an `exists()` check would have let through.
        std::fs::create_dir_all(&app_data_dir).map_err(|e| {
            format!(
                "Failed to create app data dir {}: {}",
                app_data_dir.display(),
                e
            )
        })?;

        Ok(Self::new(
            port,
            app_data_dir.join("ca_cert.pem"),
            app_data_dir.join("ca_key.pem"),
        ))
    }

    /// Returns the configuration with transparent mode set to `transparent`.
    pub fn with_transparent(mut self, transparent: bool) -> Self {
        self.transparent = transparent;
        self
    }

    /// Returns the configuration with the UDP TPROXY port set to
    /// `udp_tproxy_port`.
    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
        self.udp_tproxy_port = udp_tproxy_port;
        self
    }
}
1711
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. A millisecond count that does not
/// fit in `u64` saturates at `u64::MAX` instead of being truncated.
fn now_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
1718
1719fn modification_field_names(
1720 mods: Option<&relay_core_api::modification::FlowModification>,
1721) -> Vec<&'static str> {
1722 let Some(mods) = mods else {
1723 return Vec::new();
1724 };
1725
1726 let mut fields = Vec::new();
1727 if mods.method.is_some() {
1728 fields.push("method");
1729 }
1730 if mods.url.is_some() {
1731 fields.push("url");
1732 }
1733 if mods.request_headers.is_some() {
1734 fields.push("request_headers");
1735 }
1736 if mods.request_body.is_some() {
1737 fields.push("request_body");
1738 }
1739 if mods.status_code.is_some() {
1740 fields.push("status_code");
1741 }
1742 if mods.response_headers.is_some() {
1743 fields.push("response_headers");
1744 }
1745 if mods.response_body.is_some() {
1746 fields.push("response_body");
1747 }
1748 if mods.message_content.is_some() {
1749 fields.push("message_content");
1750 }
1751 fields
1752}
1753
1754#[cfg(test)]
1755mod tests {
1756 use super::*;
1757 use chrono::Utc;
1758 use relay_core_api::flow::{
1759 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1760 ResponseTiming, TransportProtocol,
1761 };
1762 use std::sync::atomic::{AtomicU64, Ordering};
1763 use tokio::time::{Duration, sleep};
1764 use url::Url;
1765 use uuid::Uuid;
1766
static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Returns a SQLite URL for a database file under `target/test-dbs` in the
/// current working directory, creating that directory if needed. The file
/// name combines the process id, the current time in nanoseconds, and a
/// per-process counter.
fn sqlite_url() -> String {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock drift")
        .as_nanos();
    let pid = std::process::id();
    let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);

    let mut db_path = std::env::current_dir().expect("cwd");
    db_path.push("target");
    db_path.push("test-dbs");
    std::fs::create_dir_all(&db_path).expect("create test db dir");

    let file_name = format!("relay-core-runtime-test-{}-{}-{}.db", pid, nanos, seq);
    db_path.push(file_name);
    format!("sqlite://{}?mode=rwc", db_path.display())
}
1787
1788 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1789 let start_time =
1790 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1791 let request_url =
1792 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1793 Flow {
1794 id: Uuid::new_v4(),
1795 start_time,
1796 end_time: Some(start_time),
1797 network: NetworkInfo {
1798 client_ip: "127.0.0.1".to_string(),
1799 client_port: 12000,
1800 server_ip: "127.0.0.1".to_string(),
1801 server_port: 8080,
1802 protocol: TransportProtocol::TCP,
1803 tls: false,
1804 tls_version: None,
1805 sni: None,
1806 },
1807 layer: Layer::Http(HttpLayer {
1808 request: HttpRequest {
1809 method: method.to_string(),
1810 url: request_url,
1811 version: "HTTP/1.1".to_string(),
1812 headers: vec![],
1813 cookies: vec![],
1814 query: vec![],
1815 body: None,
1816 },
1817 response: Some(HttpResponse {
1818 status,
1819 status_text: "OK".to_string(),
1820 version: "HTTP/1.1".to_string(),
1821 headers: vec![],
1822 cookies: vec![],
1823 body: None,
1824 timing: ResponseTiming {
1825 time_to_first_byte: None,
1826 time_to_last_byte: None,
1827 connect_time_ms: None,
1828 ssl_time_ms: None,
1829 },
1830 }),
1831 error: None,
1832 }),
1833 tags: vec![],
1834 meta: std::collections::HashMap::new(),
1835 resilience_trace: None,
1836 rule_variables: std::collections::HashMap::new(),
1837 matched_rules: vec![],
1838 }
1839 }
1840
1841 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1842 let start_time =
1843 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1844 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1845 .expect("url should parse");
1846 Flow {
1847 id: Uuid::new_v4(),
1848 start_time,
1849 end_time: Some(start_time),
1850 network: NetworkInfo {
1851 client_ip: "127.0.0.1".to_string(),
1852 client_port: 12000,
1853 server_ip: "127.0.0.1".to_string(),
1854 server_port: 8080,
1855 protocol: TransportProtocol::TCP,
1856 tls: false,
1857 tls_version: None,
1858 sni: None,
1859 },
1860 layer: Layer::Http(HttpLayer {
1861 request: HttpRequest {
1862 method: "GET".to_string(),
1863 url: request_url,
1864 version: "HTTP/1.1".to_string(),
1865 headers: vec![
1866 (
1867 "Authorization".to_string(),
1868 "Bearer secret-token".to_string(),
1869 ),
1870 ("X-Normal".to_string(), "visible".to_string()),
1871 ],
1872 cookies: vec![],
1873 query: vec![
1874 ("token".to_string(), "abc123".to_string()),
1875 ("ok".to_string(), "1".to_string()),
1876 ],
1877 body: Some(BodyData {
1878 encoding: "utf-8".to_string(),
1879 content: "secret request body".to_string(),
1880 size: 19,
1881 }),
1882 },
1883 response: Some(HttpResponse {
1884 status: 200,
1885 status_text: "OK".to_string(),
1886 version: "HTTP/1.1".to_string(),
1887 headers: vec![
1888 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1889 ("X-Response".to_string(), "visible".to_string()),
1890 ],
1891 cookies: vec![],
1892 body: Some(BodyData {
1893 encoding: "utf-8".to_string(),
1894 content: "secret response body".to_string(),
1895 size: 20,
1896 }),
1897 timing: ResponseTiming {
1898 time_to_first_byte: None,
1899 time_to_last_byte: None,
1900 connect_time_ms: None,
1901 ssl_time_ms: None,
1902 },
1903 }),
1904 error: None,
1905 }),
1906 tags: vec![],
1907 meta: std::collections::HashMap::new(),
1908 resilience_trace: None,
1909 rule_variables: std::collections::HashMap::new(),
1910 matched_rules: vec![],
1911 }
1912 }
1913
1914 #[tokio::test]
1915 async fn set_rules_from_records_audit_event() {
1916 let state = CoreState::new(None).await;
1917
1918 state
1919 .set_rules_from(
1920 AuditActor::Http,
1921 "rule.upsert",
1922 "rule-1".to_string(),
1923 json!({ "route": "/api/v1/rules" }),
1924 Vec::new(),
1925 )
1926 .await
1927 .expect("set rules should succeed");
1928
1929 let events = state.recent_audit_events();
1930 let event = events.last().expect("audit event should exist");
1931 assert_eq!(event.actor, AuditActor::Http);
1932 assert_eq!(event.kind, AuditEventKind::RuleChanged);
1933 assert_eq!(event.outcome, AuditOutcome::Success);
1934 assert_eq!(event.target, "rule-1");
1935 assert_eq!(event.details["operation"], "rule.upsert");
1936 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1937 }
1938
1939 #[tokio::test]
1940 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1941 let state = CoreState::new(None).await;
1942
1943 state
1944 .upsert_rule_from(
1945 AuditActor::Probe,
1946 "rule.upsert",
1947 "rule-1".to_string(),
1948 json!({ "tool": "set_rule" }),
1949 Rule {
1950 id: "rule-1".to_string(),
1951 name: "first".to_string(),
1952 active: true,
1953 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1954 priority: 1,
1955 termination: relay_core_lib::rule::RuleTermination::Continue,
1956 filter: relay_core_lib::rule::Filter::Url(
1957 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1958 ),
1959 actions: vec![],
1960 constraints: None,
1961 },
1962 )
1963 .await
1964 .expect("initial upsert should succeed");
1965
1966 state
1967 .upsert_rule_from(
1968 AuditActor::Probe,
1969 "rule.upsert",
1970 "rule-1".to_string(),
1971 json!({ "tool": "set_rule" }),
1972 Rule {
1973 id: "rule-1".to_string(),
1974 name: "second".to_string(),
1975 active: true,
1976 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1977 priority: 2,
1978 termination: relay_core_lib::rule::RuleTermination::Continue,
1979 filter: relay_core_lib::rule::Filter::Url(
1980 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1981 ),
1982 actions: vec![],
1983 constraints: None,
1984 },
1985 )
1986 .await
1987 .expect("replacement upsert should succeed");
1988
1989 let rules = state.get_rules().await;
1990 assert_eq!(rules.len(), 1);
1991 assert_eq!(rules[0].name, "second");
1992 let event = state
1993 .recent_audit_events()
1994 .last()
1995 .cloned()
1996 .expect("audit event");
1997 assert_eq!(event.details["operation"], "rule.upsert");
1998 assert_eq!(event.details["details"]["tool"], "set_rule");
1999 }
2000
2001 #[tokio::test]
2002 async fn delete_rule_from_returns_false_when_rule_missing() {
2003 let state = CoreState::new(None).await;
2004
2005 let deleted = state
2006 .delete_rule_from(
2007 AuditActor::Http,
2008 "rule.delete",
2009 "missing".to_string(),
2010 json!({ "route": "/api/v1/rules/{id}" }),
2011 "missing",
2012 )
2013 .await
2014 .expect("delete should not fail");
2015
2016 assert!(!deleted);
2017 assert!(state.recent_audit_events().is_empty());
2018 }
2019
2020 #[tokio::test]
2021 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2022 let state = CoreState::new(None).await;
2023
2024 let rule_id = state
2025 .create_mock_response_rule_from(
2026 AuditActor::Http,
2027 "api-mock-1".to_string(),
2028 json!({ "route": "/api/v1/mock", "status": 201 }),
2029 MockResponseRuleConfig {
2030 rule_id: "api-mock-1".to_string(),
2031 url_pattern: "example.com".to_string(),
2032 name: "api-mock:example.com".to_string(),
2033 status: 201,
2034 content_type: "application/json".to_string(),
2035 body: "{\"ok\":true}".to_string(),
2036 },
2037 )
2038 .await
2039 .expect("mock rule should be created");
2040
2041 assert_eq!(rule_id, "api-mock-1");
2042 let rules = state.get_rules().await;
2043 assert_eq!(rules.len(), 1);
2044 assert_eq!(rules[0].id, "api-mock-1");
2045 let event = state
2046 .recent_audit_events()
2047 .last()
2048 .cloned()
2049 .expect("audit event");
2050 assert_eq!(event.details["operation"], "rule.mock_create");
2051 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2052 }
2053
2054 #[tokio::test]
2055 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2056 let state = CoreState::new(None).await;
2057
2058 let rule_id = state
2059 .create_intercept_rule_from(
2060 AuditActor::Http,
2061 "intercept-1".to_string(),
2062 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2063 InterceptRuleConfig {
2064 rule_id: "intercept-1".to_string(),
2065 active: true,
2066 url_pattern: "example.com".to_string(),
2067 method: None,
2068 phase: "request".to_string(),
2069 name: "api-intercept:example.com".to_string(),
2070 priority: 100,
2071 termination: relay_core_lib::rule::RuleTermination::Stop,
2072 },
2073 )
2074 .await
2075 .expect("intercept rule should be created");
2076
2077 assert_eq!(rule_id, "intercept-1");
2078 let rules = state.get_rules().await;
2079 assert_eq!(rules.len(), 1);
2080 assert_eq!(rules[0].id, "intercept-1");
2081 assert_eq!(rules[0].name, "api-intercept:example.com");
2082 let event = state
2083 .recent_audit_events()
2084 .last()
2085 .cloned()
2086 .expect("audit event");
2087 assert_eq!(event.details["operation"], "rule.intercept_create");
2088 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2089 }
2090
2091 #[tokio::test]
2092 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2093 let state = CoreState::new(None).await;
2094
2095 state
2096 .upsert_legacy_intercept_rule_from(
2097 AuditActor::Tauri,
2098 "legacy-1".to_string(),
2099 json!({ "command": "set_intercept_rule" }),
2100 InterceptRule {
2101 id: "legacy-1".to_string(),
2102 active: true,
2103 url_pattern: "example.com".to_string(),
2104 method: None,
2105 phase: "both".to_string(),
2106 },
2107 )
2108 .await
2109 .expect("initial family upsert should succeed");
2110 assert_eq!(state.get_rules().await.len(), 2);
2111
2112 state
2113 .upsert_legacy_intercept_rule_from(
2114 AuditActor::Tauri,
2115 "legacy-1".to_string(),
2116 json!({ "command": "set_intercept_rule" }),
2117 InterceptRule {
2118 id: "legacy-1".to_string(),
2119 active: true,
2120 url_pattern: "example.org".to_string(),
2121 method: Some("POST".to_string()),
2122 phase: "request".to_string(),
2123 },
2124 )
2125 .await
2126 .expect("replacement family upsert should succeed");
2127
2128 let rules = state.get_rules().await;
2129 assert_eq!(rules.len(), 1);
2130 assert_eq!(rules[0].id, "legacy-1");
2131 let event = state
2132 .recent_audit_events()
2133 .last()
2134 .cloned()
2135 .expect("audit event");
2136 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2137 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2138 }
2139
2140 #[tokio::test]
2141 async fn resolve_intercept_failure_records_failed_audit_event() {
2142 let state = CoreState::new(None).await;
2143
2144 let result = state
2145 .resolve_intercept_with_modifications_from(
2146 AuditActor::Probe,
2147 "missing-flow:request".to_string(),
2148 "drop",
2149 None,
2150 )
2151 .await;
2152
2153 assert!(result.is_err());
2154 let events = state.recent_audit_events();
2155 let event = events.last().expect("audit event should exist");
2156 assert_eq!(event.actor, AuditActor::Probe);
2157 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2158 assert_eq!(event.outcome, AuditOutcome::Failed);
2159 assert_eq!(event.details["action"], "drop");
2160 assert!(
2161 event.details["error"]
2162 .as_str()
2163 .unwrap_or_default()
2164 .contains("Interception not found")
2165 );
2166 }
2167
2168 #[tokio::test]
2169 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2170 let state = CoreState::new(None).await;
2171 let (shutdown_tx, shutdown_rx) = oneshot::channel();
2172
2173 state
2174 .prepare_start(8080, shutdown_tx)
2175 .expect("prepare start should succeed");
2176 let lifecycle = state.lifecycle();
2177 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2178 assert_eq!(lifecycle.port, Some(8080));
2179 assert!(lifecycle.started_at_ms.is_none());
2180 assert!(lifecycle.last_error.is_none());
2181
2182 assert_eq!(
2183 state.stop_proxy().expect("stop should succeed"),
2184 ProxyStopResult::Stopping
2185 );
2186 let lifecycle = state.lifecycle();
2187 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2188 assert_eq!(lifecycle.port, Some(8080));
2189 assert!(shutdown_rx.await.is_ok());
2190 }
2191
2192 #[tokio::test]
2193 async fn status_snapshot_derives_runtime_facing_fields() {
2194 let state = CoreState::new(None).await;
2195 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2196 state
2197 .prepare_start(8080, shutdown_tx)
2198 .expect("prepare start should succeed");
2199
2200 let status = state.status_snapshot();
2201 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2202 assert!(status.running);
2203 assert_eq!(status.port, Some(8080));
2204 assert!(status.uptime.is_none());
2205 assert!(status.last_error.is_none());
2206 }
2207
2208 #[tokio::test]
2209 async fn status_report_combines_status_and_metrics() {
2210 let state = CoreState::new(None).await;
2211 let report = state.status_report().await;
2212
2213 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2214 assert!(!report.status.running);
2215 assert_eq!(report.metrics.intercepts_pending, 0);
2216 assert_eq!(report.metrics.ws_pending_messages, 0);
2217 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2218 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2219 assert_eq!(report.metrics.audit_events_total, 0);
2220 assert_eq!(report.metrics.audit_events_failed, 0);
2221 assert_eq!(report.metrics.flow_events_lagged_total, 0);
2222 assert_eq!(report.metrics.audit_events_lagged_total, 0);
2223 }
2224
/// `ProxyConfig::new` plus the two builder setters must keep every value that
/// was passed in.
#[test]
fn proxy_config_new_and_transport_setters_preserve_values() {
    let cert_path = std::path::PathBuf::from("/tmp/ca_cert.pem");
    let key_path = std::path::PathBuf::from("/tmp/ca_key.pem");

    let config = ProxyConfig::new(8080, cert_path.clone(), key_path.clone())
        .with_transparent(true)
        .with_udp_tproxy_port(Some(15000));

    assert_eq!(config.port, 8080);
    assert_eq!(config.ca_cert_path, cert_path);
    assert_eq!(config.ca_key_path, key_path);
    assert!(config.transparent);
    assert_eq!(config.udp_tproxy_port, Some(15000));
}
2247
2248 #[test]
2249 fn proxy_config_from_app_data_dir_creates_default_paths() {
2250 let unique = SystemTime::now()
2251 .duration_since(UNIX_EPOCH)
2252 .expect("clock drift")
2253 .as_nanos();
2254 let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2255
2256 let config =
2257 ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2258
2259 assert!(dir.exists());
2260 assert_eq!(config.port, 8899);
2261 assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2262 assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2263 assert!(!config.transparent);
2264 assert!(config.udp_tproxy_port.is_none());
2265 }
2266
2267 #[tokio::test]
2268 async fn intercept_snapshot_maps_pending_counts() {
2269 let state = CoreState::new(None).await;
2270 let snapshot = state.intercept_snapshot().await;
2271
2272 assert_eq!(snapshot.pending_count, 0);
2273 assert_eq!(snapshot.ws_pending_count, 0);
2274 }
2275
2276 #[tokio::test]
2277 async fn audit_snapshot_returns_latest_events_in_order() {
2278 let state = CoreState::new(None).await;
2279 state.record_audit_event(AuditEvent::new(
2280 AuditActor::Runtime,
2281 AuditEventKind::RuleChanged,
2282 "first",
2283 AuditOutcome::Success,
2284 json!({ "index": 1 }),
2285 ));
2286 state.record_audit_event(AuditEvent::new(
2287 AuditActor::Http,
2288 AuditEventKind::PolicyUpdated,
2289 "second",
2290 AuditOutcome::Success,
2291 json!({ "index": 2 }),
2292 ));
2293
2294 let snapshot = state.audit_snapshot(1);
2295
2296 assert_eq!(snapshot.events.len(), 1);
2297 assert_eq!(snapshot.events[0].target, "second");
2298 assert_eq!(snapshot.events[0].details["index"], 2);
2299 }
2300
2301 #[tokio::test]
2302 async fn query_audit_snapshot_filters_in_memory_events() {
2303 let state = CoreState::new(None).await;
2304 state.record_audit_event(AuditEvent::new(
2305 AuditActor::Http,
2306 AuditEventKind::RuleChanged,
2307 "rule-1",
2308 AuditOutcome::Success,
2309 json!({ "idx": 1 }),
2310 ));
2311 state.record_audit_event(AuditEvent::new(
2312 AuditActor::Probe,
2313 AuditEventKind::PolicyUpdated,
2314 "policy",
2315 AuditOutcome::Failed,
2316 json!({ "idx": 2 }),
2317 ));
2318
2319 let snapshot = state
2320 .query_audit_snapshot(CoreAuditQuery {
2321 actor: Some(AuditActor::Probe),
2322 kind: Some(AuditEventKind::PolicyUpdated),
2323 outcome: Some(AuditOutcome::Failed),
2324 limit: 10,
2325 ..Default::default()
2326 })
2327 .await;
2328
2329 assert_eq!(snapshot.events.len(), 1);
2330 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2331 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2332 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2333 }
2334
2335 #[tokio::test]
2336 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2337 let state = CoreState::new(Some(sqlite_url())).await;
2338 state.update_policy_from(
2339 AuditActor::Http,
2340 "policy".to_string(),
2341 ProxyPolicy {
2342 transparent_enabled: true,
2343 ..Default::default()
2344 },
2345 );
2346
2347 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2348 for _ in 0..10 {
2349 snapshot = state
2350 .query_audit_snapshot(CoreAuditQuery {
2351 actor: Some(AuditActor::Http),
2352 kind: Some(AuditEventKind::PolicyUpdated),
2353 limit: 10,
2354 ..Default::default()
2355 })
2356 .await;
2357 if !snapshot.events.is_empty() {
2358 break;
2359 }
2360 sleep(Duration::from_millis(20)).await;
2361 }
2362
2363 assert!(!snapshot.events.is_empty());
2364 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2365 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2366 }
2367
2368 #[tokio::test]
2369 async fn prepare_start_rejects_second_active_start() {
2370 let state = CoreState::new(None).await;
2371 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2372 state
2373 .prepare_start(8080, shutdown_tx)
2374 .expect("first start should succeed");
2375
2376 let (second_tx, _second_rx) = oneshot::channel();
2377 let error = state
2378 .prepare_start(8081, second_tx)
2379 .expect_err("second active start should be rejected");
2380 assert!(error.contains("already"));
2381 }
2382
2383 #[test]
2384 fn update_policy_records_audit_event() {
2385 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2386 let state = runtime.block_on(CoreState::new(None));
2387
2388 state.update_policy_from(
2389 AuditActor::Runtime,
2390 "policy".to_string(),
2391 ProxyPolicy {
2392 transparent_enabled: true,
2393 ..Default::default()
2394 },
2395 );
2396
2397 let events = state.recent_audit_events();
2398 let event = events.last().expect("audit event should exist");
2399 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2400 assert_eq!(event.outcome, AuditOutcome::Success);
2401 assert_eq!(event.details["transparent_enabled"], true);
2402 }
2403
2404 #[test]
2405 fn patch_policy_updates_redaction_without_replacing_other_fields() {
2406 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2407 let state = runtime.block_on(CoreState::new(None));
2408 let original_timeout = state.policy_snapshot().request_timeout_ms;
2409
2410 state.patch_policy_from(
2411 AuditActor::Runtime,
2412 "policy.patch".to_string(),
2413 relay_core_api::policy::ProxyPolicyPatch {
2414 redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2415 enabled: Some(true),
2416 redact_bodies: Some(true),
2417 ..Default::default()
2418 }),
2419 },
2420 );
2421
2422 let policy = state.policy_snapshot();
2423 assert_eq!(policy.request_timeout_ms, original_timeout);
2424 assert!(policy.redaction.enabled);
2425 assert!(policy.redaction.redact_bodies);
2426
2427 let events = state.recent_audit_events();
2428 let event = events.last().expect("audit event should exist");
2429 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2430 assert_eq!(event.details["redaction_enabled"], true);
2431 }
2432
2433 #[tokio::test]
2434 async fn metrics_include_audit_and_lagged_event_counters() {
2435 let state = CoreState::new(None).await;
2436
2437 state.update_policy_from(
2438 AuditActor::Runtime,
2439 "policy".to_string(),
2440 ProxyPolicy::default(),
2441 );
2442 let _ = state
2443 .resolve_intercept_with_modifications_from(
2444 AuditActor::Probe,
2445 "missing-flow:request".to_string(),
2446 "drop",
2447 None,
2448 )
2449 .await;
2450
2451 state.record_flow_events_lagged(3);
2452 state.record_audit_events_lagged(5);
2453
2454 let metrics = state.get_metrics().await;
2455 assert_eq!(metrics.audit_events_total, 2);
2456 assert_eq!(metrics.audit_events_failed, 1);
2457 assert_eq!(metrics.flow_events_lagged_total, 3);
2458 assert_eq!(metrics.audit_events_lagged_total, 5);
2459 }
2460
2461 #[tokio::test]
2462 async fn prometheus_metrics_text_contains_observability_fields() {
2463 let state = CoreState::new(None).await;
2464 state.record_flow_events_lagged(2);
2465 state.record_audit_events_lagged(4);
2466
2467 let text = state.get_metrics_prometheus_text().await;
2468 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2469 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2470 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2471 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2472 }
2473
2474 #[tokio::test]
2475 async fn search_flows_uses_store_with_offset_pagination() {
2476 let state = CoreState::new(Some(sqlite_url())).await;
2477 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2478 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2479 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2480
2481 state.upsert_flow(Box::new(flow_a));
2482 state.upsert_flow(Box::new(flow_b));
2483 state.upsert_flow(Box::new(flow_c));
2484
2485 let mut baseline = Vec::new();
2486 for _ in 0..20 {
2487 baseline = state
2488 .search_flows(FlowQuery {
2489 host: Some("api.example.com".to_string()),
2490 path_contains: None,
2491 method: None,
2492 status_min: None,
2493 status_max: None,
2494 has_error: None,
2495 is_websocket: None,
2496 limit: Some(3),
2497 offset: Some(0),
2498 })
2499 .await;
2500 if baseline.len() == 3 {
2501 break;
2502 }
2503 sleep(Duration::from_millis(20)).await;
2504 }
2505
2506 assert_eq!(baseline.len(), 3);
2507 let page = state
2508 .search_flows(FlowQuery {
2509 host: Some("api.example.com".to_string()),
2510 path_contains: None,
2511 method: None,
2512 status_min: None,
2513 status_max: None,
2514 has_error: None,
2515 is_websocket: None,
2516 limit: Some(1),
2517 offset: Some(1),
2518 })
2519 .await;
2520 assert_eq!(page.len(), 1);
2521 assert_eq!(page[0].id, baseline[1].id);
2522 }
2523
2524 #[tokio::test]
2525 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2526 let state = CoreState::new(Some(sqlite_url())).await;
2527 let first_flow = sample_http_flow(
2528 "persist.example.com",
2529 "/first",
2530 "GET",
2531 200,
2532 1_700_000_010_000,
2533 );
2534 let first_id = first_flow.id.to_string();
2535 state.upsert_flow(Box::new(first_flow));
2536 for i in 0..240 {
2537 state.upsert_flow(Box::new(sample_http_flow(
2538 "persist.example.com",
2539 &format!("/{}", i),
2540 "GET",
2541 200,
2542 1_700_000_020_000 + i,
2543 )));
2544 }
2545
2546 sleep(Duration::from_millis(200)).await;
2547
2548 let loaded = state.get_flow(first_id).await;
2549 assert!(loaded.is_some());
2550 }
2551
2552 #[tokio::test]
2553 async fn search_flows_redacts_summary_url_when_enabled() {
2554 let state = CoreState::new(None).await;
2555 state.update_policy_from(
2556 AuditActor::Runtime,
2557 "policy.redaction".to_string(),
2558 ProxyPolicy {
2559 redaction: RedactionPolicy {
2560 enabled: true,
2561 sensitive_query_keys: vec!["token".to_string()],
2562 redact_bodies: false,
2563 ..Default::default()
2564 },
2565 ..Default::default()
2566 },
2567 );
2568 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2569
2570 let mut items = Vec::new();
2571 for _ in 0..20 {
2572 items = state
2573 .search_flows(FlowQuery {
2574 host: Some("api.example.com".to_string()),
2575 path_contains: Some("/private".to_string()),
2576 ..Default::default()
2577 })
2578 .await;
2579 if !items.is_empty() {
2580 break;
2581 }
2582 sleep(Duration::from_millis(20)).await;
2583 }
2584
2585 assert!(!items.is_empty());
2586 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2587 let token = redacted
2588 .query_pairs()
2589 .find(|(k, _)| k == "token")
2590 .map(|(_, v)| v.to_string());
2591 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2592 }
2593
2594 #[tokio::test]
2595 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2596 let state = CoreState::new(Some(sqlite_url())).await;
2597 state.update_policy_from(
2598 AuditActor::Runtime,
2599 "policy.redaction".to_string(),
2600 ProxyPolicy {
2601 redaction: RedactionPolicy {
2602 enabled: true,
2603 sensitive_header_names: vec![
2604 "authorization".to_string(),
2605 "set-cookie".to_string(),
2606 ],
2607 sensitive_query_keys: vec!["token".to_string()],
2608 redact_bodies: true,
2609 },
2610 ..Default::default()
2611 },
2612 );
2613
2614 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2615 let flow_id = flow.id.to_string();
2616 state.upsert_flow(Box::new(flow));
2617 sleep(Duration::from_millis(80)).await;
2618
2619 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2620 let Layer::Http(http) = loaded.layer else {
2621 panic!("expected http layer");
2622 };
2623
2624 let auth = http
2625 .request
2626 .headers
2627 .iter()
2628 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2629 .map(|(_, v)| v.as_str());
2630 assert_eq!(auth, Some("[REDACTED]"));
2631
2632 let req_query_token = http
2633 .request
2634 .query
2635 .iter()
2636 .find(|(k, _)| k == "token")
2637 .map(|(_, v)| v.as_str());
2638 assert_eq!(req_query_token, Some("[REDACTED]"));
2639
2640 let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2641 assert_eq!(req_body, Some("[REDACTED]"));
2642
2643 let response = http.response.expect("response should exist");
2644 let set_cookie = response
2645 .headers
2646 .iter()
2647 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2648 .map(|(_, v)| v.as_str());
2649 assert_eq!(set_cookie, Some("[REDACTED]"));
2650 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2651 assert_eq!(res_body, Some("[REDACTED]"));
2652 }
2653
2654 #[test]
2655 fn redact_flow_update_masks_http_body_when_enabled() {
2656 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2657 let state = runtime.block_on(CoreState::new(None));
2658 state.update_policy_from(
2659 AuditActor::Runtime,
2660 "policy.redaction".to_string(),
2661 ProxyPolicy {
2662 redaction: RedactionPolicy {
2663 enabled: true,
2664 redact_bodies: true,
2665 ..Default::default()
2666 },
2667 ..Default::default()
2668 },
2669 );
2670
2671 let update = FlowUpdate::HttpBody {
2672 flow_id: "f-1".to_string(),
2673 direction: relay_core_api::flow::Direction::ClientToServer,
2674 body: BodyData {
2675 encoding: "utf-8".to_string(),
2676 content: "super-secret".to_string(),
2677 size: 12,
2678 },
2679 };
2680 let redacted = state.redact_flow_update_for_output(update);
2681 match redacted {
2682 FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2683 _ => panic!("expected http body update"),
2684 }
2685 }
2686
2687 #[cfg(feature = "script")]
2688 #[tokio::test]
2689 async fn load_script_from_records_audit_event() {
2690 let state = CoreState::new(None).await;
2691
2692 state
2693 .load_script_from(
2694 AuditActor::Tauri,
2695 "tauri.load_script".to_string(),
2696 "globalThis.onRequestHeaders = (_flow) => {};",
2697 )
2698 .await
2699 .expect("script should load");
2700
2701 let events = state.recent_audit_events();
2702 let event = events.last().expect("audit event should exist");
2703 assert_eq!(event.actor, AuditActor::Tauri);
2704 assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2705 assert_eq!(event.outcome, AuditOutcome::Success);
2706 assert_eq!(event.target, "tauri.load_script");
2707 }
2708}