1use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51 build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62mod log_format;
63pub mod modification;
64pub mod paths;
65pub mod rule;
66pub mod services;
67
68pub use paths::CaPaths;
71pub use relay_core_api::{flow, policy};
72pub use relay_core_lib::InterceptionResult;
73pub use relay_core_lib::rule as lib_rule;
74
75use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
76use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
77use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
78
/// Re-exports every public item of `relay_core_lib::rule_engine` under this
/// crate's `rule_engine` path.
pub mod rule_engine {
    pub use relay_core_lib::rule_engine::*;
}
82
/// Point-in-time counters and gauges describing the runtime.
///
/// Instances are assembled by `CoreState::get_metrics` and can be rendered as
/// text with `to_prometheus_text`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreMetrics {
    /// First value of the flow store actor's metrics reply (0 when no reply arrives).
    pub flows_total: usize,
    /// Second value of the flow store actor's metrics reply (0 when no reply arrives).
    pub flows_in_memory: usize,
    /// Number of flow-store messages that could not be enqueued with `try_send`.
    pub flows_dropped: usize,
    /// Pending intercept count reported by the intercept broker.
    pub intercepts_pending: usize,
    /// Pending WebSocket message count reported by the intercept broker.
    pub ws_pending_messages: usize,
    /// Age in milliseconds of the oldest pending intercept, if the broker reports one.
    pub oldest_intercept_age_ms: Option<u64>,
    /// Age in milliseconds of the oldest pending WebSocket message, if the broker reports one.
    pub oldest_ws_message_age_ms: Option<u64>,
    /// Rule execution error count reported by the rule store actor.
    pub rule_exec_errors: usize,
    /// Value of the runtime's `audit_events_total` counter.
    pub audit_events_total: usize,
    /// Value of the runtime's `audit_events_failed` counter.
    pub audit_events_failed: usize,
    /// Sum of the `skipped` counts passed to `record_flow_events_lagged`.
    pub flow_events_lagged_total: usize,
    /// Sum of the `skipped` counts passed to `record_audit_events_lagged`.
    pub audit_events_lagged_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_body_degraded()`.
    pub proxy_body_degraded_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_http_request()`.
    pub proxy_http_request_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_sandbox_reject()`.
    pub proxy_sandbox_reject_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_invalid_method()`.
    pub proxy_invalid_method_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_invalid_status()`.
    pub proxy_invalid_status_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_retry()`.
    pub proxy_retry_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_stream_mode_tap()`.
    pub proxy_stream_mode_tap_total: usize,
    /// Value of `relay_core_lib::metrics::get_proxy_stream_mode_degrade()`.
    pub proxy_stream_mode_degrade_total: usize,
}
114
115impl CoreMetrics {
116 pub fn to_prometheus_text(&self) -> String {
117 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
118 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
119 format!(
120 "relay_core_flows_total {}\n\
121relay_core_flows_in_memory {}\n\
122relay_core_flows_dropped_total {}\n\
123relay_core_intercepts_pending {}\n\
124relay_core_ws_pending_messages {}\n\
125relay_core_oldest_intercept_age_ms {}\n\
126relay_core_oldest_ws_message_age_ms {}\n\
127relay_core_rule_exec_errors_total {}\n\
128relay_core_audit_events_total {}\n\
129relay_core_audit_events_failed_total {}\n\
130relay_core_flow_events_lagged_total {}\n\
131relay_core_audit_events_lagged_total {}\n\
132relay_core_proxy_body_degraded_total {}\n\
133relay_core_proxy_http_request_total {}\n\
134relay_core_proxy_sandbox_reject_total {}\n\
135relay_core_proxy_invalid_method_total {}\n\
136relay_core_proxy_invalid_status_total {}\n\
137relay_core_proxy_retry_total {}\n\
138relay_core_proxy_stream_mode_tap_total {}\n\
139relay_core_proxy_stream_mode_degrade_total {}\n",
140 self.flows_total,
141 self.flows_in_memory,
142 self.flows_dropped,
143 self.intercepts_pending,
144 self.ws_pending_messages,
145 oldest_intercept_age_ms,
146 oldest_ws_message_age_ms,
147 self.rule_exec_errors,
148 self.audit_events_total,
149 self.audit_events_failed,
150 self.flow_events_lagged_total,
151 self.audit_events_lagged_total,
152 self.proxy_body_degraded_total,
153 self.proxy_http_request_total,
154 self.proxy_sandbox_reject_total,
155 self.proxy_invalid_method_total,
156 self.proxy_invalid_status_total,
157 self.proxy_retry_total,
158 self.proxy_stream_mode_tap_total,
159 self.proxy_stream_mode_degrade_total,
160 )
161 }
162}
163
/// Externally visible view of the runtime lifecycle.
///
/// Built from a `RuntimeLifecycle` via the `From` conversion in this file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusSnapshot {
    /// Lifecycle phase the snapshot was taken in.
    pub phase: RuntimeLifecyclePhase,
    /// `true` when the phase is starting, running or stopping.
    pub running: bool,
    /// Port recorded in the lifecycle, if any.
    pub port: Option<u16>,
    /// Whole seconds elapsed since the recorded start time, if one is recorded.
    pub uptime: Option<u64>,
    /// Last error message recorded in the lifecycle, if any.
    pub last_error: Option<String>,
}
172
/// Lifecycle status and metrics bundled together, as produced by
/// `CoreState::status_report`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusReport {
    /// Lifecycle status at report time.
    pub status: CoreStatusSnapshot,
    /// Metrics collected for the report.
    pub metrics: CoreMetrics,
}
178
/// Description of one pending intercept, as built by
/// `CoreState::list_pending_intercept_items`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct PendingInterceptItem {
    /// Intercept key as reported by the intercept broker.
    pub key: String,
    /// Flow id parsed out of `key`.
    pub flow_id: String,
    /// Phase parsed out of `key`.
    pub phase: String,
    /// URL of the flow; empty when the flow could not be loaded.
    pub url: String,
    /// Method of the flow; empty when the flow could not be loaded.
    pub method: String,
    /// Creation timestamp reported by the broker
    /// (presumably Unix milliseconds — confirm against the broker).
    pub created_at_ms: u64,
}
188
/// Pending intercept counts together with the individual pending items.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreInterceptSnapshot {
    /// Number of pending intercepts.
    pub pending_count: usize,
    /// Number of pending WebSocket messages.
    pub ws_pending_count: usize,
    /// Pending items; omitted from serialized output when empty and defaulted
    /// to an empty list when absent on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub items: Vec<PendingInterceptItem>,
}
196
/// A list of audit events returned by the audit snapshot and query methods.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CoreAuditSnapshot {
    /// Events included in the snapshot.
    pub events: Vec<AuditEvent>,
}
201
/// Filter parameters for `CoreState::query_audit_snapshot`.
///
/// Every `None` filter matches all events.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreAuditQuery {
    /// Lower timestamp bound; the in-memory path keeps events with `timestamp_ms >= since_ms`.
    pub since_ms: Option<u64>,
    /// Upper timestamp bound; the in-memory path keeps events with `timestamp_ms <= until_ms`.
    pub until_ms: Option<u64>,
    /// Only return events recorded for this actor.
    pub actor: Option<AuditActor>,
    /// Only return events of this kind.
    pub kind: Option<AuditEventKind>,
    /// Only return events with this outcome.
    pub outcome: Option<AuditOutcome>,
    /// Maximum number of events; `query_audit_snapshot` clamps it to `1..=500`.
    pub limit: usize,
}
211
212impl Default for CoreAuditQuery {
213 fn default() -> Self {
214 Self {
215 since_ms: None,
216 until_ms: None,
217 actor: None,
218 kind: None,
219 outcome: None,
220 limit: 50,
221 }
222 }
223}
224
/// Phases of the proxy runtime lifecycle.
///
/// Serialized in `snake_case`, which matches the strings returned by `as_str`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeLifecyclePhase {
    /// Initial phase used by `RuntimeLifecycle::created`.
    Created,
    /// Set by `prepare_start` once a start has been accepted.
    Starting,
    /// Counted as active by `is_active`.
    Running,
    /// Set by `stop_proxy` after a shutdown sender was taken.
    Stopping,
    /// Not counted as active.
    Stopped,
    /// Not counted as active.
    Failed,
}
235
236impl RuntimeLifecyclePhase {
237 pub fn as_str(&self) -> &'static str {
238 match self {
239 Self::Created => "created",
240 Self::Starting => "starting",
241 Self::Running => "running",
242 Self::Stopping => "stopping",
243 Self::Stopped => "stopped",
244 Self::Failed => "failed",
245 }
246 }
247
248 pub fn is_active(&self) -> bool {
249 matches!(self, Self::Starting | Self::Running | Self::Stopping)
250 }
251}
252
/// Lifecycle state published on the runtime's lifecycle watch channel.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RuntimeLifecycle {
    /// Current lifecycle phase.
    pub phase: RuntimeLifecyclePhase,
    /// Port associated with the current or most recent start, if any.
    pub port: Option<u16>,
    /// Start timestamp in milliseconds, compared against `now_unix_ms()` to compute uptime.
    pub started_at_ms: Option<u64>,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
}
260
261impl RuntimeLifecycle {
262 pub fn created() -> Self {
263 Self {
264 phase: RuntimeLifecyclePhase::Created,
265 port: None,
266 started_at_ms: None,
267 last_error: None,
268 }
269 }
270
271 pub fn is_active(&self) -> bool {
272 self.phase.is_active()
273 }
274
275 pub fn uptime_seconds(&self) -> Option<u64> {
276 let started_at_ms = self.started_at_ms?;
277 let now_ms = now_unix_ms();
278 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
279 }
280}
281
/// Outcome of an attempt to spawn the proxy task.
pub enum ProxySpawnResult {
    /// A new task was spawned; the handle refers to it.
    Started(tokio::task::JoinHandle<()>),
    /// No task was spawned because a proxy is already running.
    AlreadyRunning,
}
286
/// Outcome of `CoreState::stop_proxy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A shutdown sender was present and the shutdown signal was sent.
    Stopping,
    /// No shutdown sender was stored, so there was nothing to stop.
    NotRunning,
}
292
293impl From<RuntimeLifecycle> for CoreStatusSnapshot {
294 fn from(lifecycle: RuntimeLifecycle) -> Self {
295 Self {
296 running: lifecycle.is_active(),
297 uptime: lifecycle.uptime_seconds(),
298 port: lifecycle.port,
299 last_error: lifecycle.last_error,
300 phase: lifecycle.phase,
301 }
302 }
303}
304
/// Shared runtime state: channels to the background actors, the optional
/// persistent store, policy and lifecycle watch channels, and audit bookkeeping.
pub struct CoreState {
    /// Sender to the flow store actor spawned in `new`.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    /// Sender to the intercept broker actor spawned in `new`.
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    /// Sender to the rule store actor spawned in `new`.
    rule_store: mpsc::Sender<RuleStoreMessage>,
    /// Persistent store; `None` when no URL was given or connecting failed.
    store: Option<Store>,
    /// Script interceptor, present only with the `script` feature.
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    /// Watch channel holding the current proxy policy.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    /// Incremented each time a message to the flow store fails `try_send`.
    pub flows_dropped: Arc<AtomicUsize>,
    /// Counter surfaced as `CoreMetrics::audit_events_total`.
    audit_events_total: Arc<AtomicUsize>,
    /// Counter surfaced as `CoreMetrics::audit_events_failed`.
    audit_events_failed: Arc<AtomicUsize>,
    /// Accumulates the counts passed to `record_flow_events_lagged`.
    flow_events_lagged_total: Arc<AtomicUsize>,
    /// Accumulates the counts passed to `record_audit_events_lagged`.
    audit_events_lagged_total: Arc<AtomicUsize>,
    /// Broadcast channel handed out by `subscribe_flow_updates`.
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    /// Broadcast channel handed out by `subscribe_audit_events`.
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    /// In-memory audit history read by `recent_audit_events`.
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    /// Watch channel holding the current runtime lifecycle.
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    /// Shutdown sender stored by `prepare_start` and consumed by `stop_proxy`.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
325
326impl CoreState {
327 pub async fn new(db_url: Option<String>) -> Self {
328 const AUDIT_HISTORY_LIMIT: usize = 200;
329
330 let store = if let Some(url) = db_url {
331 match Store::connect(&url).await {
332 Ok(s) => {
333 if let Err(e) = s.init().await {
334 tracing::error!("Failed to init store: {}", e);
335 }
336 Some(s)
337 }
338 Err(e) => {
339 tracing::error!("Failed to connect to store: {}", e);
340 None
341 }
342 }
343 } else {
344 None
345 };
346
347 let (flow_tx, flow_rx) = mpsc::channel(10000);
348 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
349 tokio::spawn(flow_actor.run());
350
351 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
352 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
353 tokio::spawn(intercept_actor.run());
354
355 let (rule_tx, rule_rx) = mpsc::channel(100);
356 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
357 tokio::spawn(rule_actor.run());
358
359 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
360 let (flow_broadcast_tx, _) = broadcast::channel(1000);
361 let (audit_broadcast_tx, _) = broadcast::channel(256);
362 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
363
364 #[cfg(feature = "script")]
365 let script_interceptor = ScriptInterceptor::new()
366 .await
367 .expect("Failed to initialize ScriptInterceptor");
368 Self {
369 flow_store: flow_tx,
370 intercept_broker: intercept_tx,
371 rule_store: rule_tx,
372 store,
373 #[cfg(feature = "script")]
374 script_interceptor: Arc::new(script_interceptor),
375 policy_tx,
376 flows_dropped: Arc::new(AtomicUsize::new(0)),
377 audit_events_total: Arc::new(AtomicUsize::new(0)),
378 audit_events_failed: Arc::new(AtomicUsize::new(0)),
379 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
380 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
381 flow_broadcast_tx,
382 audit_broadcast_tx,
383 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
384 lifecycle_tx,
385 shutdown_tx: Mutex::new(None),
386 }
387 }
388
389 pub async fn get_metrics(&self) -> CoreMetrics {
390 let (flow_tx, flow_rx) = oneshot::channel();
391 let _ = self
392 .flow_store
393 .send(FlowStoreMessage::GetMetrics(flow_tx))
394 .await;
395 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
396
397 let (int_tx, int_rx) = oneshot::channel();
398 let _ = self
399 .intercept_broker
400 .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
401 .await;
402 let (
403 intercepts_pending,
404 ws_pending_messages,
405 oldest_intercept_age_ms,
406 oldest_ws_message_age_ms,
407 ) = int_rx.await.unwrap_or((0, 0, None, None));
408
409 let (rule_tx, rule_rx) = oneshot::channel();
410 let _ = self
411 .rule_store
412 .send(RuleStoreMessage::GetMetrics(rule_tx))
413 .await;
414 let rule_exec_errors = rule_rx.await.unwrap_or(0);
415
416 CoreMetrics {
417 flows_total,
418 flows_in_memory,
419 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
420 intercepts_pending,
421 ws_pending_messages,
422 oldest_intercept_age_ms,
423 oldest_ws_message_age_ms,
424 rule_exec_errors,
425 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
426 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
427 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
428 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
429 proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
430 proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
431 proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
432 proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
433 proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
434 proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
435 proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
436 proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
437 ),
438 }
439 }
440
441 pub async fn get_metrics_prometheus_text(&self) -> String {
442 let mut text = self.get_metrics().await.to_prometheus_text();
443 #[cfg(feature = "script")]
444 {
445 text.push_str(&self.script_interceptor.metrics.prometheus_lines());
446 }
447 text
448 }
449
450 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
451 self.lifecycle().into()
452 }
453
454 pub async fn status_report(&self) -> CoreStatusReport {
455 CoreStatusReport {
456 status: self.status_snapshot(),
457 metrics: self.get_metrics().await,
458 }
459 }
460
461 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
462 let metrics = self.get_metrics().await;
463 let items = self.list_pending_intercept_items().await;
464 CoreInterceptSnapshot {
465 pending_count: metrics.intercepts_pending,
466 ws_pending_count: metrics.ws_pending_messages,
467 items,
468 }
469 }
470
471 pub async fn list_pending_intercept_items(&self) -> Vec<PendingInterceptItem> {
472 let keys = self.list_pending_intercept_keys().await;
473 let mut items = Vec::with_capacity(keys.len());
474 for (key, created_at_ms) in keys {
475 let (flow_id, phase) = parse_intercept_key(&key);
476 let (url, method) = if let Some(flow) = self.get_flow(flow_id.clone()).await {
477 flow_url_method(&flow)
478 } else {
479 (String::new(), String::new())
480 };
481 items.push(PendingInterceptItem {
482 key,
483 flow_id,
484 phase,
485 url,
486 method,
487 created_at_ms,
488 });
489 }
490 items
491 }
492
493 async fn list_pending_intercept_keys(&self) -> Vec<(String, u64)> {
494 let (tx, rx) = oneshot::channel();
495 if self
496 .intercept_broker
497 .send(InterceptBrokerMessage::ListPendingIntercepts { respond_to: tx })
498 .await
499 .is_err()
500 {
501 return Vec::new();
502 }
503 rx.await.unwrap_or_default()
504 }
505
506 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
507 let events = self.recent_audit_events();
508 let start = events.len().saturating_sub(limit);
509 CoreAuditSnapshot {
510 events: events.into_iter().skip(start).collect(),
511 }
512 }
513
514 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
515 let limit = query.limit.clamp(1, 500);
516 if let Some(store) = &self.store {
517 let rows = store
518 .query_audit_events(
519 query.since_ms,
520 query.until_ms,
521 query.actor.as_ref().map(AuditActor::as_str),
522 query.kind.as_ref().map(AuditEventKind::as_str),
523 query.outcome.as_ref().map(AuditOutcome::as_str),
524 limit,
525 )
526 .await
527 .unwrap_or_default();
528
529 let mut events = Vec::with_capacity(rows.len());
530 for row in rows {
531 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
532 events.push(event);
533 }
534 }
535 return CoreAuditSnapshot { events };
536 }
537
538 let mut events = self.recent_audit_events();
539 events.reverse();
540 let filtered = events
541 .into_iter()
542 .filter(|event| {
543 query
544 .since_ms
545 .map(|v| event.timestamp_ms >= v)
546 .unwrap_or(true)
547 })
548 .filter(|event| {
549 query
550 .until_ms
551 .map(|v| event.timestamp_ms <= v)
552 .unwrap_or(true)
553 })
554 .filter(|event| {
555 query
556 .actor
557 .as_ref()
558 .map(|v| &event.actor == v)
559 .unwrap_or(true)
560 })
561 .filter(|event| {
562 query
563 .kind
564 .as_ref()
565 .map(|v| &event.kind == v)
566 .unwrap_or(true)
567 })
568 .filter(|event| {
569 query
570 .outcome
571 .as_ref()
572 .map(|v| &event.outcome == v)
573 .unwrap_or(true)
574 })
575 .take(limit)
576 .collect();
577 CoreAuditSnapshot { events: filtered }
578 }
579
580 pub async fn get_flow(&self, id: String) -> Option<Flow> {
581 let (tx, rx) = oneshot::channel();
582 if let Err(e) = self
583 .flow_store
584 .send(FlowStoreMessage::GetFlow {
585 id: id.clone(),
586 respond_to: tx,
587 })
588 .await
589 {
590 error!("Failed to send GetFlow request: {}", e);
591 if let Some(store) = &self.store {
592 return store
593 .load_flow(&id)
594 .await
595 .ok()
596 .flatten()
597 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
598 }
599 return None;
600 }
601 let flow = rx
602 .await
603 .map_err(|e| {
604 error!("Failed to receive Flow response: {}", e);
605 e
606 })
607 .unwrap_or(None);
608 if let Some(flow) = flow {
609 return Some(redact_flow(flow, &self.current_redaction_policy()));
610 }
611 if let Some(store) = &self.store {
612 return store
613 .load_flow(&id)
614 .await
615 .ok()
616 .flatten()
617 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
618 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
619 }
620 None
621 }
622
623 pub async fn get_rules(&self) -> Vec<Rule> {
624 let (tx, rx) = oneshot::channel();
625 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
626 error!("Failed to send GetRules request: {}", e);
627 return Vec::new();
628 }
629 rx.await
630 .map_err(|e| {
631 error!("Failed to receive Rules response: {}", e);
632 e
633 })
634 .unwrap_or_default()
635 }
636
    /// Replaces the whole rule set on behalf of the runtime actor.
    ///
    /// Delegates to `set_rules_from` with the operation name `rules.replace`,
    /// the target `rule_set` and empty details; the result is discarded.
    pub async fn set_rules(&self, rules: Vec<Rule>) {
        let _ = self
            .set_rules_from(
                AuditActor::Runtime,
                "rules.replace",
                "rule_set".to_string(),
                json!({}),
                rules,
            )
            .await;
    }
648
649 pub async fn upsert_rule_from(
650 &self,
651 actor: AuditActor,
652 operation: &str,
653 target: String,
654 details: serde_json::Value,
655 rule: Rule,
656 ) -> Result<(), String> {
657 let rule_id = rule.id.clone();
658 let mut rules = self.get_rules().await;
659 rules.retain(|existing| existing.id != rule_id);
660 rules.push(rule);
661 self.set_rules_from(actor, operation, target, details, rules)
662 .await
663 }
664
665 pub async fn delete_rule_from(
666 &self,
667 actor: AuditActor,
668 operation: &str,
669 target: String,
670 details: serde_json::Value,
671 rule_id: &str,
672 ) -> Result<bool, String> {
673 let mut rules = self.get_rules().await;
674 let before = rules.len();
675 rules.retain(|rule| rule.id != rule_id);
676 if rules.len() == before {
677 return Ok(false);
678 }
679 self.set_rules_from(actor, operation, target, details, rules)
680 .await
681 .map(|_| true)
682 }
683
684 pub async fn create_mock_response_rule_from(
685 &self,
686 actor: AuditActor,
687 target: String,
688 details: serde_json::Value,
689 config: MockResponseRuleConfig,
690 ) -> Result<String, String> {
691 let rule_id = config.rule_id.clone();
692 let rule = build_mock_response_rule(config);
693 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
694 .await
695 .map(|_| rule_id)
696 }
697
698 pub async fn create_intercept_rule_from(
699 &self,
700 actor: AuditActor,
701 target: String,
702 details: serde_json::Value,
703 config: InterceptRuleConfig,
704 ) -> Result<String, String> {
705 let rule_id = config.rule_id.clone();
706 let mut rules = self.get_rules().await;
707 rules.extend(build_intercept_rules(config));
708 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
709 .await
710 .map(|_| rule_id)
711 }
712
713 pub async fn upsert_legacy_intercept_rule_from(
714 &self,
715 actor: AuditActor,
716 target: String,
717 details: serde_json::Value,
718 rule: InterceptRule,
719 ) -> Result<(), String> {
720 let rule_id = rule.id.clone();
721 let mut rules = self.get_rules().await;
722 rules.retain(|existing| {
723 existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
724 });
725 rules.extend(rule.to_rules());
726 self.set_rules_from(
727 actor,
728 "rule.intercept_legacy_upsert",
729 target,
730 details,
731 rules,
732 )
733 .await
734 }
735
736 pub async fn set_rules_from(
737 &self,
738 actor: AuditActor,
739 operation: &str,
740 target: String,
741 details: serde_json::Value,
742 rules: Vec<Rule>,
743 ) -> Result<(), String> {
744 let rule_count = rules.len();
745 if let Err(e) = self
746 .rule_store
747 .send(RuleStoreMessage::SetRules(rules))
748 .await
749 {
750 error!("Failed to send SetRules request: {}", e);
751 self.record_audit_event(AuditEvent::new(
752 actor,
753 AuditEventKind::RuleChanged,
754 target,
755 AuditOutcome::Failed,
756 json!({
757 "operation": operation,
758 "rule_count": rule_count,
759 "details": details,
760 "error": e.to_string()
761 }),
762 ));
763 return Err(e.to_string());
764 }
765
766 self.record_audit_event(AuditEvent::new(
767 actor,
768 AuditEventKind::RuleChanged,
769 target,
770 AuditOutcome::Success,
771 json!({
772 "operation": operation,
773 "rule_count": rule_count,
774 "details": details
775 }),
776 ));
777 Ok(())
778 }
779
780 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
781 let mut new_rules = Vec::new();
782 for rule in rules {
783 new_rules.extend(rule.to_rules());
784 }
785 self.set_rules(new_rules).await;
786 }
787
788 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
789 let (tx, rx) = oneshot::channel();
790 if let Err(e) = self
791 .rule_store
792 .send(RuleStoreMessage::GetRuleEngine(tx))
793 .await
794 {
795 error!("Failed to send GetRuleEngine request: {}", e);
796 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
797 }
798 rx.await
799 .map_err(|e| {
800 error!("Failed to receive RuleEngine response: {}", e);
801 e
802 })
803 .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
804 }
805
    /// Replaces the proxy policy on behalf of the runtime actor, using the
    /// audit target `policy`.
    pub fn update_policy(&self, policy: ProxyPolicy) {
        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
    }
809
    /// Applies `patch` to a copy of the current policy and publishes the result.
    ///
    /// The snapshot is taken first and the patched copy is written back
    /// afterwards; a policy update landing in between is overwritten.
    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
        let mut policy = self.policy_snapshot();
        policy.apply_patch(patch);
        self.update_policy_from(actor, target, policy);
    }
815
    /// Returns a clone of the policy currently held by the policy watch channel.
    pub fn policy_snapshot(&self) -> ProxyPolicy {
        self.policy_tx.borrow().clone()
    }
819
820 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
821 let details = json!({
822 "strict_http_semantics": policy.strict_http_semantics,
823 "request_timeout_ms": policy.request_timeout_ms,
824 "max_body_size": policy.max_body_size,
825 "transparent_enabled": policy.transparent_enabled,
826 "redaction_enabled": policy.redaction.enabled,
827 "redact_bodies": policy.redaction.redact_bodies
828 });
829
830 self.policy_tx.send_replace(policy);
831 self.record_audit_event(AuditEvent::new(
832 actor,
833 AuditEventKind::PolicyUpdated,
834 target,
835 AuditOutcome::Success,
836 details,
837 ));
838 }
839
840 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
841 if let Err(e) = self
842 .intercept_broker
843 .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
844 .await
845 {
846 error!("Failed to send RegisterIntercept request: {}", e);
847 }
848 }
849
850 pub async fn resolve_intercept(
851 &self,
852 key: String,
853 result: InterceptionResult,
854 ) -> Result<(), String> {
855 let (tx, rx) = oneshot::channel();
856 if let Err(e) = self
857 .intercept_broker
858 .send(InterceptBrokerMessage::ResolveIntercept {
859 key,
860 result,
861 respond_to: tx,
862 })
863 .await
864 {
865 error!("Failed to send ResolveIntercept request: {}", e);
866 return Err(e.to_string());
867 }
868 rx.await.map_err(|_| "Actor dropped".to_string())?
869 }
870
871 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
872 let (tx, rx) = oneshot::channel();
873 if let Err(e) = self
874 .intercept_broker
875 .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
876 key,
877 respond_to: tx,
878 })
879 .await
880 {
881 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
882 return None;
883 }
884 rx.await
885 .map_err(|e| {
886 error!("Failed to receive WebSocketMessage response: {}", e);
887 e
888 })
889 .unwrap_or(None)
890 }
891
892 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
893 if let Err(e) = self
894 .intercept_broker
895 .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
896 .await
897 {
898 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
899 }
900 }
901
902 pub async fn is_intercept_pending(&self, key: String) -> bool {
903 let (tx, rx) = oneshot::channel();
904 if let Err(e) = self
905 .intercept_broker
906 .send(InterceptBrokerMessage::GetPendingIntercept {
907 key,
908 respond_to: tx,
909 })
910 .await
911 {
912 error!("Failed to send GetPendingIntercept request: {}", e);
913 return false;
914 }
915 rx.await
916 .map_err(|e| {
917 error!("Failed to receive PendingIntercept response: {}", e);
918 e
919 })
920 .unwrap_or(false)
921 }
922
923 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
924 let (tx, rx) = oneshot::channel();
925 if let Err(e) = self
926 .intercept_broker
927 .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
928 flow_id,
929 respond_to: tx,
930 })
931 .await
932 {
933 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
934 return false;
935 }
936 rx.await
937 .map_err(|e| {
938 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
939 e
940 })
941 .unwrap_or(false)
942 }
943
944 pub fn upsert_flow(&self, flow: Box<Flow>) {
945 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
946 error!("FlowStore dropped flow: {}", e);
948 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
949 }
950 }
951
952 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
953 if let Err(e) = self
954 .flow_store
955 .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
956 {
957 error!("FlowStore dropped WS message: {}", e);
958 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
959 }
960 }
961
962 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
963 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
964 flow_id,
965 body,
966 direction,
967 }) {
968 error!("FlowStore dropped HTTP body: {}", e);
969 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
970 }
971 }
972
973 pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
975 if let Err(e) = self
976 .flow_store
977 .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
978 {
979 error!("FlowStore dropped budget-exceeded tag: {}", e);
980 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
981 }
982 }
983
    /// Returns a new receiver subscribed to the flow update broadcast channel.
    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
        self.flow_broadcast_tx.subscribe()
    }
988
    /// Returns a new receiver subscribed to the audit event broadcast channel.
    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
        self.audit_broadcast_tx.subscribe()
    }
992
    /// Returns a clone of the redaction section of the current proxy policy.
    fn current_redaction_policy(&self) -> RedactionPolicy {
        self.policy_tx.borrow().redaction.clone()
    }
996
997 pub fn record_flow_events_lagged(&self, skipped: u64) {
998 self.flow_events_lagged_total
999 .fetch_add(skipped as usize, Ordering::Relaxed);
1000 }
1001
1002 pub fn record_audit_events_lagged(&self, skipped: u64) {
1003 self.audit_events_lagged_total
1004 .fetch_add(skipped as usize, Ordering::Relaxed);
1005 }
1006
    /// Returns a clone of the lifecycle currently held by the lifecycle watch channel.
    pub fn lifecycle(&self) -> RuntimeLifecycle {
        self.lifecycle_tx.borrow().clone()
    }
1010
    /// Returns a new receiver subscribed to the lifecycle watch channel.
    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
        self.lifecycle_tx.subscribe()
    }
1014
1015 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
1016 let current = self.lifecycle();
1017 if current.is_active() {
1018 return Err(format!(
1019 "Proxy is already {} on port {}",
1020 current.phase.as_str(),
1021 current.port.unwrap_or(port)
1022 ));
1023 }
1024
1025 let mut guard = self
1026 .shutdown_tx
1027 .lock()
1028 .map_err(|_| "shutdown state poisoned".to_string())?;
1029 *guard = Some(shutdown_tx);
1030 drop(guard);
1031
1032 self.update_lifecycle(RuntimeLifecycle {
1033 phase: RuntimeLifecyclePhase::Starting,
1034 port: Some(port),
1035 started_at_ms: None,
1036 last_error: None,
1037 });
1038 Ok(())
1039 }
1040
1041 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
1042 let mut guard = self
1043 .shutdown_tx
1044 .lock()
1045 .map_err(|_| "shutdown state poisoned".to_string())?;
1046 let Some(tx) = guard.take() else {
1047 return Ok(ProxyStopResult::NotRunning);
1048 };
1049 drop(guard);
1050
1051 let current = self.lifecycle();
1052 self.update_lifecycle(RuntimeLifecycle {
1053 phase: RuntimeLifecyclePhase::Stopping,
1054 port: current.port,
1055 started_at_ms: current.started_at_ms,
1056 last_error: current.last_error,
1057 });
1058 let _ = tx.send(());
1059 Ok(ProxyStopResult::Stopping)
1060 }
1061
1062 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1063 self.audit_history
1064 .lock()
1065 .map(|events| events.iter().cloned().collect())
1066 .unwrap_or_default()
1067 }
1068
1069 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1071 let redaction = self.current_redaction_policy();
1072 if let Some(store) = &self.store {
1073 return store
1074 .query_flow_summaries(&query)
1075 .await
1076 .unwrap_or_default()
1077 .into_iter()
1078 .map(|summary| redact_flow_summary(summary, &redaction))
1079 .collect();
1080 }
1081 let (tx, rx) = oneshot::channel();
1082 if let Err(e) = self
1083 .flow_store
1084 .send(FlowStoreMessage::SearchFlows {
1085 query,
1086 respond_to: tx,
1087 })
1088 .await
1089 {
1090 error!("Failed to send SearchFlows request: {}", e);
1091 return Vec::new();
1092 }
1093 rx.await
1094 .unwrap_or_default()
1095 .into_iter()
1096 .map(|summary| redact_flow_summary(summary, &redaction))
1097 .collect()
1098 }
1099
1100 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1101 let redaction = self.current_redaction_policy();
1102 redact_flow_update(update, &redaction)
1103 }
1104
    /// Resolves a pending intercept on behalf of the runtime actor.
    ///
    /// Delegates to `resolve_intercept_with_modifications_from` with
    /// `AuditActor::Runtime`.
    pub async fn resolve_intercept_with_modifications(
        &self,
        key: String,
        action: &str,
        mods: Option<relay_core_api::modification::FlowModification>,
    ) -> Result<(), String> {
        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
            .await
    }
1123
1124 pub async fn resolve_intercept_with_modifications_from(
1125 &self,
1126 actor: AuditActor,
1127 key: String,
1128 action: &str,
1129 mods: Option<relay_core_api::modification::FlowModification>,
1130 ) -> Result<(), String> {
1131 let modified_fields = modification_field_names(mods.as_ref());
1132 let result = match action {
1133 "drop" => InterceptionResult::Drop,
1134 _ => match mods {
1135 None => InterceptionResult::Continue,
1136 Some(m) => {
1137 let parts: Vec<&str> = key.splitn(4, ':').collect();
1138 match parts.as_slice() {
1139 [flow_id, phase] => {
1140 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1141 modification::apply_flow_modification(&flow, phase, m)
1142 } else {
1143 InterceptionResult::Continue
1144 }
1145 }
1146 [_, "ws_msg", _] => {
1149 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1150 modification::apply_ws_modification(&msg, m)
1151 } else {
1152 InterceptionResult::Continue
1153 }
1154 }
1155 _ => InterceptionResult::Continue,
1156 }
1157 }
1158 },
1159 };
1160 let outcome = self.resolve_intercept(key.clone(), result).await;
1161 let audit_outcome = if outcome.is_ok() {
1162 AuditOutcome::Success
1163 } else {
1164 AuditOutcome::Failed
1165 };
1166 let error_message = outcome.as_ref().err().cloned();
1167
1168 self.record_audit_event(AuditEvent::new(
1169 actor,
1170 AuditEventKind::InterceptResolved,
1171 key,
1172 audit_outcome,
1173 json!({
1174 "action": action,
1175 "has_modifications": !modified_fields.is_empty(),
1176 "modified_fields": modified_fields,
1177 "error": error_message
1178 }),
1179 ));
1180 outcome
1181 }
1182
1183 #[cfg(feature = "script")]
1184 pub async fn load_script_from(
1185 &self,
1186 actor: AuditActor,
1187 target: String,
1188 script: &str,
1189 ) -> Result<(), String> {
1190 let result = self
1191 .script_interceptor
1192 .load_script(script)
1193 .await
1194 .map_err(|e| e.to_string());
1195
1196 let outcome = if result.is_ok() {
1197 AuditOutcome::Success
1198 } else {
1199 AuditOutcome::Failed
1200 };
1201 let error_message = result.as_ref().err().cloned();
1202
1203 self.record_audit_event(AuditEvent::new(
1204 actor,
1205 AuditEventKind::ScriptReloaded,
1206 target,
1207 outcome,
1208 json!({
1209 "script_bytes": script.len(),
1210 "error": error_message
1211 }),
1212 ));
1213
1214 result
1215 }
1216
1217 #[cfg(feature = "script")]
1219 pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1220 self.script_interceptor.set_env_allow(env_allow).await;
1221 }
1222
1223 fn record_audit_event(&self, event: AuditEvent) {
1224 const AUDIT_HISTORY_LIMIT: usize = 200;
1225 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1226 if event.outcome == AuditOutcome::Failed {
1227 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1228 }
1229
1230 let details = log_format::audit_details_log(&event.kind, &event.details);
1231 tracing::info!(
1232 target: "relay_core_audit",
1233 event_id = %event.id,
1234 actor = %event.actor.as_str(),
1235 kind = %event.kind.as_str(),
1236 target = %event.target,
1237 outcome = %event.outcome.as_str(),
1238 details = %details
1239 );
1240
1241 if let Ok(mut history) = self.audit_history.lock() {
1242 if history.len() >= AUDIT_HISTORY_LIMIT {
1243 history.pop_front();
1244 }
1245 history.push_back(event.clone());
1246 }
1247
1248 if let Some(store) = self.store.clone() {
1249 let event_json = serde_json::to_value(&event).unwrap_or_default();
1250 let event_id = event.id.clone();
1251 let timestamp_ms = event.timestamp_ms;
1252 let actor = event.actor.as_str().to_string();
1253 let kind = event.kind.as_str().to_string();
1254 let target = event.target.clone();
1255 let outcome = event.outcome.as_str().to_string();
1256 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1257 handle.spawn(async move {
1258 if let Err(e) = store
1259 .save_audit_event(AuditEventRecord {
1260 id: &event_id,
1261 timestamp_ms,
1262 actor: &actor,
1263 kind: &kind,
1264 target: &target,
1265 outcome: &outcome,
1266 content: &event_json,
1267 })
1268 .await
1269 {
1270 tracing::error!("Failed to persist audit event: {}", e);
1271 }
1272 });
1273 }
1274 }
1275
1276 let _ = self.audit_broadcast_tx.send(event);
1277 }
1278
1279 fn transition_to_running(&self, port: u16) {
1280 self.update_lifecycle(RuntimeLifecycle {
1281 phase: RuntimeLifecyclePhase::Running,
1282 port: Some(port),
1283 started_at_ms: Some(now_unix_ms()),
1284 last_error: None,
1285 });
1286 }
1287
1288 fn transition_to_stopped(&self) {
1289 if let Ok(mut guard) = self.shutdown_tx.lock() {
1290 *guard = None;
1291 }
1292
1293 self.update_lifecycle(RuntimeLifecycle {
1294 phase: RuntimeLifecyclePhase::Stopped,
1295 port: None,
1296 started_at_ms: None,
1297 last_error: None,
1298 });
1299 }
1300
1301 fn transition_to_failed(&self, port: u16, error: String) {
1302 if let Ok(mut guard) = self.shutdown_tx.lock() {
1303 *guard = None;
1304 }
1305
1306 self.update_lifecycle(RuntimeLifecycle {
1307 phase: RuntimeLifecyclePhase::Failed,
1308 port: Some(port),
1309 started_at_ms: None,
1310 last_error: Some(error),
1311 });
1312 }
1313
1314 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1315 let err = lifecycle
1316 .last_error
1317 .as_deref()
1318 .filter(|s| !s.is_empty())
1319 .unwrap_or("-");
1320 tracing::info!(
1321 target: "relay_core_lifecycle",
1322 phase = %lifecycle.phase.as_str(),
1323 port = %log_format::opt_u16(lifecycle.port),
1324 started_at_ms = %log_format::opt_u64(lifecycle.started_at_ms),
1325 last_error = %err,
1326 );
1327 self.lifecycle_tx.send_replace(lifecycle);
1328 }
1329
1330 pub fn spawn_proxy(
1331 self: &Arc<Self>,
1332 config: ProxyConfig,
1333 sink: mpsc::Sender<FlowUpdate>,
1334 extra_interceptor: Option<Arc<dyn Interceptor>>,
1335 ) -> Result<ProxySpawnResult, String> {
1336 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1337 match self.prepare_start(config.port, shutdown_tx) {
1338 Ok(()) => {}
1339 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1340 Err(error) => return Err(error),
1341 }
1342 let state = self.clone();
1343 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1344 if let Err(error) = state
1345 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1346 .await
1347 {
1348 error!("Proxy failed: {}", error);
1349 }
1350 })))
1351 }
1352
1353 pub async fn start_proxy(
1354 self: &Arc<Self>,
1355 config: ProxyConfig,
1356 sink: mpsc::Sender<FlowUpdate>,
1357 extra_interceptor: Option<Arc<dyn Interceptor>>,
1358 ) -> Result<(), String> {
1359 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1360 self.prepare_start(config.port, shutdown_tx)?;
1361 self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1362 .await
1363 }
1364
1365 async fn run_proxy(
1366 self: &Arc<Self>,
1367 config: ProxyConfig,
1368 sink: mpsc::Sender<FlowUpdate>,
1369 extra_interceptor: Option<Arc<dyn Interceptor>>,
1370 shutdown_rx: oneshot::Receiver<()>,
1371 ) -> Result<(), String> {
1372 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1373 let state = self.clone();
1374
1375 if let Some(parent) = config.ca_cert_path.parent()
1376 && !parent.exists()
1377 {
1378 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1379 }
1380
1381 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1382 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1383 let ca = Arc::new(ca);
1384
1385 #[cfg(feature = "script")]
1386 let script_interceptor = self.script_interceptor.clone();
1387
1388 #[cfg(feature = "script")]
1389 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1390
1391 #[cfg(not(feature = "script"))]
1392 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1393
1394 interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1395 self.clone(),
1396 )));
1397
1398 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1399 self.clone(),
1400 )));
1401
1402 if let Some(interceptor) = extra_interceptor {
1403 interceptors.push(interceptor);
1404 }
1405
1406 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1407 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1408
1409 tokio::spawn(async move {
1410 while let Some(update) = proxy_rx.recv().await {
1411 match update.clone() {
1412 FlowUpdate::Full(flow) => {
1413 state.upsert_flow(flow);
1414 }
1415 FlowUpdate::WebSocketMessage { flow_id, message } => {
1416 state.append_ws_message(flow_id, message);
1417 }
1418 FlowUpdate::HttpBody {
1419 flow_id,
1420 direction,
1421 body,
1422 } => {
1423 state.update_http_body(flow_id, body, direction);
1424 }
1425 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1426 state.tag_flow_budget_exceeded(flow_id, direction);
1428 }
1429 }
1430
1431 let _ = state.flow_broadcast_tx.send(update.clone());
1432
1433 if sink.try_send(update).is_err() {
1434 relay_core_lib::metrics::inc_flows_dropped();
1435 }
1436 }
1437 });
1438
1439 if let Some(udp_port) = config.udp_tproxy_port {
1440 let udp_proxy_tx = proxy_tx.clone();
1441 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1442 tokio::spawn(async move {
1443 match tokio::net::UdpSocket::bind(udp_addr).await {
1444 Ok(socket) => {
1445 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1446 if let Err(e) = proxy.run(udp_proxy_tx).await {
1447 error!("UDP TPROXY failed: {}", e);
1448 }
1449 }
1450 Err(e) => {
1451 error!("Failed to bind UDP TPROXY socket: {}", e);
1452 }
1453 }
1454 });
1455 }
1456
1457 let listener = match TcpListener::bind(addr).await {
1458 Ok(listener) => listener,
1459 Err(e) => {
1460 if let Ok(mut guard) = self.shutdown_tx.lock() {
1461 *guard = None;
1462 }
1463 let message = format!("Failed to bind to address {}: {}", addr, e);
1464 self.transition_to_failed(config.port, message.clone());
1465 return Err(message);
1466 }
1467 };
1468
1469 self.transition_to_running(config.port);
1470 let shutdown_rx = Some(shutdown_rx);
1471
1472 if config.transparent {
1473 let policy = ProxyPolicy {
1474 transparent_enabled: true,
1475 ..Default::default()
1476 };
1477 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1478 let policy_rx = self.policy_tx.subscribe();
1479
1480 let provider: Arc<dyn OriginalDstProvider> = {
1481 let mut addrs = BTreeSet::new();
1482 if let Ok(local) = listener.local_addr() {
1483 addrs.insert(local);
1484 }
1485
1486 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1487 {
1488 Arc::new(LinuxOriginalDstProvider::new(addrs))
1489 }
1490 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1491 {
1492 match MacOsOriginalDstProvider::new(addrs.clone()) {
1493 Ok(provider) => Arc::new(provider),
1494 Err(e) => {
1495 error!("Failed to initialize macOS PF provider: {}", e);
1496 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1497 }
1498 }
1499 }
1500 #[cfg(target_os = "windows")]
1501 {
1502 let filter =
1503 "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1504 .to_string();
1505 let port = config.port;
1506 tokio::spawn(async move {
1507 relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1508 .await;
1509 });
1510
1511 Arc::new(WindowsOriginalDstProvider::new(addrs))
1512 }
1513 #[cfg(not(any(
1514 all(target_os = "linux", feature = "transparent-linux"),
1515 all(target_os = "macos", feature = "transparent-macos"),
1516 target_os = "windows"
1517 )))]
1518 {
1519 Arc::new(NoOpOriginalDstProvider::new(addrs))
1520 }
1521 };
1522
1523 let source = TransparentTcpCaptureSource::new(listener, provider);
1524 let result = relay_core_lib::start_proxy(
1525 source,
1526 proxy_tx,
1527 interceptor,
1528 ca,
1529 policy_rx,
1530 None,
1531 shutdown_rx,
1532 Some(self.get_rule_engine().await),
1533 )
1534 .await
1535 .map_err(|e| e.to_string());
1536 if let Err(error) = &result {
1537 self.transition_to_failed(config.port, error.clone());
1538 } else {
1539 self.transition_to_stopped();
1540 }
1541 result
1542 } else {
1543 let source = TcpCaptureSource::new(listener);
1544 let policy = ProxyPolicy::default();
1545 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1546 let policy_rx = self.policy_tx.subscribe();
1547
1548 let result = relay_core_lib::start_proxy(
1549 source,
1550 proxy_tx,
1551 interceptor,
1552 ca,
1553 policy_rx,
1554 None,
1555 shutdown_rx,
1556 Some(self.get_rule_engine().await),
1557 )
1558 .await
1559 .map_err(|e| e.to_string());
1560 if let Err(error) = &result {
1561 self.transition_to_failed(config.port, error.clone());
1562 } else {
1563 self.transition_to_stopped();
1564 }
1565 result
1566 }
1567 }
1568}
1569
1570fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1571 match update {
1572 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1573 FlowUpdate::WebSocketMessage {
1574 flow_id,
1575 mut message,
1576 } => {
1577 message.content = redact_body(message.content, redaction);
1578 FlowUpdate::WebSocketMessage { flow_id, message }
1579 }
1580 FlowUpdate::HttpBody {
1581 flow_id,
1582 direction,
1583 body,
1584 } => FlowUpdate::HttpBody {
1585 flow_id,
1586 direction,
1587 body: redact_body(body, redaction),
1588 },
1589 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1590 FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1591 }
1592 }
1593}
1594
1595fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1596 if !redaction.enabled {
1597 return flow;
1598 }
1599 match &mut flow.layer {
1600 Layer::Http(http) => {
1601 redact_http_request(&mut http.request, redaction);
1602 if let Some(response) = &mut http.response {
1603 redact_headers(&mut response.headers, redaction);
1604 response.body = response
1605 .body
1606 .take()
1607 .map(|body| redact_body(body, redaction));
1608 }
1609 }
1610 Layer::WebSocket(ws) => {
1611 redact_http_request(&mut ws.handshake_request, redaction);
1612 redact_headers(&mut ws.handshake_response.headers, redaction);
1613 ws.handshake_response.body = ws
1614 .handshake_response
1615 .body
1616 .take()
1617 .map(|body| redact_body(body, redaction));
1618 for message in &mut ws.messages {
1619 message.content = redact_body(message.content.clone(), redaction);
1620 }
1621 }
1622 _ => {}
1623 }
1624 flow
1625}
1626
/// Splits an intercept key at its first `:` into `(flow_id, remainder)`.
///
/// A key without a `:` is returned whole as the flow id, paired with an empty
/// remainder. Any further `:` characters stay inside the remainder.
fn parse_intercept_key(key: &str) -> (String, String) {
    let (flow_id, remainder) = key.split_once(':').unwrap_or((key, ""));
    (flow_id.to_owned(), remainder.to_owned())
}
1634
1635fn flow_url_method(flow: &Flow) -> (String, String) {
1636 match &flow.layer {
1637 Layer::Http(http) => (http.request.url.to_string(), http.request.method.clone()),
1638 Layer::WebSocket(ws) => (
1639 ws.handshake_request.url.to_string(),
1640 ws.handshake_request.method.clone(),
1641 ),
1642 _ => (String::new(), String::new()),
1643 }
1644}
1645
1646fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1647 if !redaction.enabled {
1648 return summary;
1649 }
1650 summary.url = redact_url_string(&summary.url, redaction);
1651 summary
1652}
1653
1654fn redact_http_request(
1655 request: &mut relay_core_api::flow::HttpRequest,
1656 redaction: &RedactionPolicy,
1657) {
1658 redact_headers(&mut request.headers, redaction);
1659 redact_query_pairs(&mut request.query, redaction);
1660 request.url = redact_url(&request.url, redaction);
1661 request.body = request.body.take().map(|body| redact_body(body, redaction));
1662}
1663
1664fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1665 if !redaction.enabled {
1666 return;
1667 }
1668 let sensitive = redaction_set(&redaction.sensitive_header_names);
1669 for (name, value) in headers.iter_mut() {
1670 if sensitive.contains(&name.to_ascii_lowercase()) {
1671 *value = "[REDACTED]".to_string();
1672 }
1673 }
1674}
1675
1676fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1677 if !redaction.enabled {
1678 return;
1679 }
1680 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1681 for (name, value) in query.iter_mut() {
1682 if sensitive.contains(&name.to_ascii_lowercase()) {
1683 *value = "[REDACTED]".to_string();
1684 }
1685 }
1686}
1687
1688fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1689 if !redaction.enabled {
1690 return url.clone();
1691 }
1692 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1693 let pairs: Vec<(String, String)> = url
1694 .query_pairs()
1695 .map(|(k, v)| {
1696 let key = k.to_string();
1697 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1698 "[REDACTED]".to_string()
1699 } else {
1700 v.to_string()
1701 };
1702 (key, value)
1703 })
1704 .collect();
1705 let mut next = url.clone();
1706 if pairs.is_empty() {
1707 return next;
1708 }
1709 next.query_pairs_mut().clear();
1710 for (k, v) in pairs {
1711 next.query_pairs_mut().append_pair(&k, &v);
1712 }
1713 next
1714}
1715
1716fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1717 match url::Url::parse(input) {
1718 Ok(url) => redact_url(&url, redaction).to_string(),
1719 Err(_) => input.to_string(),
1720 }
1721}
1722
1723fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1724 if redaction.enabled && redaction.redact_bodies {
1725 body.content = "[REDACTED]".to_string();
1726 }
1727 body
1728}
1729
/// Builds a lookup set holding the ASCII-lowercased form of every entry in `values`.
///
/// Entries that differ only in ASCII case collapse into a single element.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::new();
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1736
/// Settings that describe how a proxy instance is started.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listener binds to.
    pub port: u16,
    /// Path of the CA certificate file, loaded or created at startup.
    pub ca_cert_path: std::path::PathBuf,
    /// Path of the CA private key file, loaded or created at startup.
    pub ca_key_path: std::path::PathBuf,
    /// Whether the proxy runs in transparent mode instead of standard mode.
    pub transparent: bool,
    /// Port for the optional UDP proxy; `None` leaves it disabled.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a standard (non-transparent) configuration with no UDP proxy port.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent: false,
            udp_tproxy_port: None,
        }
    }

    /// Creates a configuration whose CA files are `ca_cert.pem` and `ca_key.pem`
    /// inside `app_data_dir`, creating that directory (and any missing parents) first.
    ///
    /// # Errors
    /// Returns the I/O error text when the directory cannot be created, which includes
    /// the case where `app_data_dir` already exists but is not a directory.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let app_data_dir = app_data_dir.into();
        // `create_dir_all` succeeds when the directory already exists, so no prior
        // existence check is needed.
        std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;

        Ok(Self::new(
            port,
            app_data_dir.join("ca_cert.pem"),
            app_data_dir.join("ca_key.pem"),
        ))
    }

    /// Returns the configuration with transparent mode set to `transparent`.
    pub fn with_transparent(mut self, transparent: bool) -> Self {
        self.transparent = transparent;
        self
    }

    /// Returns the configuration with the UDP proxy port set to `udp_tproxy_port`.
    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
        self.udp_tproxy_port = udp_tproxy_port;
        self
    }
}
1787
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`.
fn now_unix_ms() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_millis() as u64)
}
1794
1795fn modification_field_names(
1796 mods: Option<&relay_core_api::modification::FlowModification>,
1797) -> Vec<&'static str> {
1798 let Some(mods) = mods else {
1799 return Vec::new();
1800 };
1801
1802 let mut fields = Vec::new();
1803 if mods.method.is_some() {
1804 fields.push("method");
1805 }
1806 if mods.url.is_some() {
1807 fields.push("url");
1808 }
1809 if mods.request_headers.is_some() {
1810 fields.push("request_headers");
1811 }
1812 if mods.request_body.is_some() {
1813 fields.push("request_body");
1814 }
1815 if mods.status_code.is_some() {
1816 fields.push("status_code");
1817 }
1818 if mods.response_headers.is_some() {
1819 fields.push("response_headers");
1820 }
1821 if mods.response_body.is_some() {
1822 fields.push("response_body");
1823 }
1824 if mods.message_content.is_some() {
1825 fields.push("message_content");
1826 }
1827 fields
1828}
1829
1830#[cfg(test)]
1831mod tests {
1832 use super::*;
1833 use chrono::Utc;
1834 use relay_core_api::flow::{
1835 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1836 ResponseTiming, TransportProtocol,
1837 };
1838 use std::sync::atomic::{AtomicU64, Ordering};
1839 use tokio::time::{Duration, sleep};
1840 use url::Url;
1841 use uuid::Uuid;
1842
1843 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1844
1845 fn sqlite_url() -> String {
1846 let nanos = SystemTime::now()
1847 .duration_since(UNIX_EPOCH)
1848 .expect("clock drift")
1849 .as_nanos();
1850 let pid = std::process::id();
1851 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1852 let db_dir = std::env::current_dir()
1853 .expect("cwd")
1854 .join("target")
1855 .join("test-dbs");
1856 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1857 let db_path = db_dir.join(format!(
1858 "relay-core-runtime-test-{}-{}-{}.db",
1859 pid, nanos, seq
1860 ));
1861 format!("sqlite://{}?mode=rwc", db_path.display())
1862 }
1863
1864 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1865 let start_time =
1866 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1867 let request_url =
1868 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1869 Flow {
1870 id: Uuid::new_v4(),
1871 start_time,
1872 end_time: Some(start_time),
1873 network: NetworkInfo {
1874 client_ip: "127.0.0.1".to_string(),
1875 client_port: 12000,
1876 server_ip: "127.0.0.1".to_string(),
1877 server_port: 8080,
1878 protocol: TransportProtocol::TCP,
1879 tls: false,
1880 tls_version: None,
1881 sni: None,
1882 },
1883 layer: Layer::Http(HttpLayer {
1884 request: HttpRequest {
1885 method: method.to_string(),
1886 url: request_url,
1887 version: "HTTP/1.1".to_string(),
1888 headers: vec![],
1889 cookies: vec![],
1890 query: vec![],
1891 body: None,
1892 },
1893 response: Some(HttpResponse {
1894 status,
1895 status_text: "OK".to_string(),
1896 version: "HTTP/1.1".to_string(),
1897 headers: vec![],
1898 cookies: vec![],
1899 body: None,
1900 timing: ResponseTiming {
1901 time_to_first_byte: None,
1902 time_to_last_byte: None,
1903 connect_time_ms: None,
1904 ssl_time_ms: None,
1905 },
1906 }),
1907 error: None,
1908 }),
1909 tags: vec![],
1910 meta: std::collections::HashMap::new(),
1911 resilience_trace: None,
1912 rule_variables: std::collections::HashMap::new(),
1913 matched_rules: vec![],
1914 }
1915 }
1916
1917 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1918 let start_time =
1919 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1920 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1921 .expect("url should parse");
1922 Flow {
1923 id: Uuid::new_v4(),
1924 start_time,
1925 end_time: Some(start_time),
1926 network: NetworkInfo {
1927 client_ip: "127.0.0.1".to_string(),
1928 client_port: 12000,
1929 server_ip: "127.0.0.1".to_string(),
1930 server_port: 8080,
1931 protocol: TransportProtocol::TCP,
1932 tls: false,
1933 tls_version: None,
1934 sni: None,
1935 },
1936 layer: Layer::Http(HttpLayer {
1937 request: HttpRequest {
1938 method: "GET".to_string(),
1939 url: request_url,
1940 version: "HTTP/1.1".to_string(),
1941 headers: vec![
1942 (
1943 "Authorization".to_string(),
1944 "Bearer secret-token".to_string(),
1945 ),
1946 ("X-Normal".to_string(), "visible".to_string()),
1947 ],
1948 cookies: vec![],
1949 query: vec![
1950 ("token".to_string(), "abc123".to_string()),
1951 ("ok".to_string(), "1".to_string()),
1952 ],
1953 body: Some(BodyData {
1954 encoding: "utf-8".to_string(),
1955 content: "secret request body".to_string(),
1956 size: 19,
1957 }),
1958 },
1959 response: Some(HttpResponse {
1960 status: 200,
1961 status_text: "OK".to_string(),
1962 version: "HTTP/1.1".to_string(),
1963 headers: vec![
1964 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1965 ("X-Response".to_string(), "visible".to_string()),
1966 ],
1967 cookies: vec![],
1968 body: Some(BodyData {
1969 encoding: "utf-8".to_string(),
1970 content: "secret response body".to_string(),
1971 size: 20,
1972 }),
1973 timing: ResponseTiming {
1974 time_to_first_byte: None,
1975 time_to_last_byte: None,
1976 connect_time_ms: None,
1977 ssl_time_ms: None,
1978 },
1979 }),
1980 error: None,
1981 }),
1982 tags: vec![],
1983 meta: std::collections::HashMap::new(),
1984 resilience_trace: None,
1985 rule_variables: std::collections::HashMap::new(),
1986 matched_rules: vec![],
1987 }
1988 }
1989
1990 #[tokio::test]
1991 async fn set_rules_from_records_audit_event() {
1992 let state = CoreState::new(None).await;
1993
1994 state
1995 .set_rules_from(
1996 AuditActor::Http,
1997 "rule.upsert",
1998 "rule-1".to_string(),
1999 json!({ "route": "/api/v1/rules" }),
2000 Vec::new(),
2001 )
2002 .await
2003 .expect("set rules should succeed");
2004
2005 let events = state.recent_audit_events();
2006 let event = events.last().expect("audit event should exist");
2007 assert_eq!(event.actor, AuditActor::Http);
2008 assert_eq!(event.kind, AuditEventKind::RuleChanged);
2009 assert_eq!(event.outcome, AuditOutcome::Success);
2010 assert_eq!(event.target, "rule-1");
2011 assert_eq!(event.details["operation"], "rule.upsert");
2012 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
2013 }
2014
2015 #[tokio::test]
2016 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
2017 let state = CoreState::new(None).await;
2018
2019 state
2020 .upsert_rule_from(
2021 AuditActor::Probe,
2022 "rule.upsert",
2023 "rule-1".to_string(),
2024 json!({ "tool": "set_rule" }),
2025 Rule {
2026 id: "rule-1".to_string(),
2027 name: "first".to_string(),
2028 active: true,
2029 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
2030 priority: 1,
2031 termination: relay_core_lib::rule::RuleTermination::Continue,
2032 filter: relay_core_lib::rule::Filter::Url(
2033 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
2034 ),
2035 actions: vec![],
2036 constraints: None,
2037 },
2038 )
2039 .await
2040 .expect("initial upsert should succeed");
2041
2042 state
2043 .upsert_rule_from(
2044 AuditActor::Probe,
2045 "rule.upsert",
2046 "rule-1".to_string(),
2047 json!({ "tool": "set_rule" }),
2048 Rule {
2049 id: "rule-1".to_string(),
2050 name: "second".to_string(),
2051 active: true,
2052 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
2053 priority: 2,
2054 termination: relay_core_lib::rule::RuleTermination::Continue,
2055 filter: relay_core_lib::rule::Filter::Url(
2056 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
2057 ),
2058 actions: vec![],
2059 constraints: None,
2060 },
2061 )
2062 .await
2063 .expect("replacement upsert should succeed");
2064
2065 let rules = state.get_rules().await;
2066 assert_eq!(rules.len(), 1);
2067 assert_eq!(rules[0].name, "second");
2068 let event = state
2069 .recent_audit_events()
2070 .last()
2071 .cloned()
2072 .expect("audit event");
2073 assert_eq!(event.details["operation"], "rule.upsert");
2074 assert_eq!(event.details["details"]["tool"], "set_rule");
2075 }
2076
2077 #[tokio::test]
2078 async fn delete_rule_from_returns_false_when_rule_missing() {
2079 let state = CoreState::new(None).await;
2080
2081 let deleted = state
2082 .delete_rule_from(
2083 AuditActor::Http,
2084 "rule.delete",
2085 "missing".to_string(),
2086 json!({ "route": "/api/v1/rules/{id}" }),
2087 "missing",
2088 )
2089 .await
2090 .expect("delete should not fail");
2091
2092 assert!(!deleted);
2093 assert!(state.recent_audit_events().is_empty());
2094 }
2095
2096 #[tokio::test]
2097 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2098 let state = CoreState::new(None).await;
2099
2100 let rule_id = state
2101 .create_mock_response_rule_from(
2102 AuditActor::Http,
2103 "api-mock-1".to_string(),
2104 json!({ "route": "/api/v1/mock", "status": 201 }),
2105 MockResponseRuleConfig {
2106 rule_id: "api-mock-1".to_string(),
2107 url_pattern: "example.com".to_string(),
2108 name: "api-mock:example.com".to_string(),
2109 status: 201,
2110 content_type: "application/json".to_string(),
2111 body: "{\"ok\":true}".to_string(),
2112 },
2113 )
2114 .await
2115 .expect("mock rule should be created");
2116
2117 assert_eq!(rule_id, "api-mock-1");
2118 let rules = state.get_rules().await;
2119 assert_eq!(rules.len(), 1);
2120 assert_eq!(rules[0].id, "api-mock-1");
2121 let event = state
2122 .recent_audit_events()
2123 .last()
2124 .cloned()
2125 .expect("audit event");
2126 assert_eq!(event.details["operation"], "rule.mock_create");
2127 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2128 }
2129
2130 #[tokio::test]
2131 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2132 let state = CoreState::new(None).await;
2133
2134 let rule_id = state
2135 .create_intercept_rule_from(
2136 AuditActor::Http,
2137 "intercept-1".to_string(),
2138 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2139 InterceptRuleConfig {
2140 rule_id: "intercept-1".to_string(),
2141 active: true,
2142 url_pattern: "example.com".to_string(),
2143 method: None,
2144 phase: "request".to_string(),
2145 name: "api-intercept:example.com".to_string(),
2146 priority: 100,
2147 termination: relay_core_lib::rule::RuleTermination::Stop,
2148 },
2149 )
2150 .await
2151 .expect("intercept rule should be created");
2152
2153 assert_eq!(rule_id, "intercept-1");
2154 let rules = state.get_rules().await;
2155 assert_eq!(rules.len(), 1);
2156 assert_eq!(rules[0].id, "intercept-1");
2157 assert_eq!(rules[0].name, "api-intercept:example.com");
2158 let event = state
2159 .recent_audit_events()
2160 .last()
2161 .cloned()
2162 .expect("audit event");
2163 assert_eq!(event.details["operation"], "rule.intercept_create");
2164 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2165 }
2166
2167 #[tokio::test]
2168 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2169 let state = CoreState::new(None).await;
2170
2171 state
2172 .upsert_legacy_intercept_rule_from(
2173 AuditActor::Tauri,
2174 "legacy-1".to_string(),
2175 json!({ "command": "set_intercept_rule" }),
2176 InterceptRule {
2177 id: "legacy-1".to_string(),
2178 active: true,
2179 url_pattern: "example.com".to_string(),
2180 method: None,
2181 phase: "both".to_string(),
2182 },
2183 )
2184 .await
2185 .expect("initial family upsert should succeed");
2186 assert_eq!(state.get_rules().await.len(), 2);
2187
2188 state
2189 .upsert_legacy_intercept_rule_from(
2190 AuditActor::Tauri,
2191 "legacy-1".to_string(),
2192 json!({ "command": "set_intercept_rule" }),
2193 InterceptRule {
2194 id: "legacy-1".to_string(),
2195 active: true,
2196 url_pattern: "example.org".to_string(),
2197 method: Some("POST".to_string()),
2198 phase: "request".to_string(),
2199 },
2200 )
2201 .await
2202 .expect("replacement family upsert should succeed");
2203
2204 let rules = state.get_rules().await;
2205 assert_eq!(rules.len(), 1);
2206 assert_eq!(rules[0].id, "legacy-1");
2207 let event = state
2208 .recent_audit_events()
2209 .last()
2210 .cloned()
2211 .expect("audit event");
2212 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2213 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2214 }
2215
2216 #[tokio::test]
2217 async fn resolve_intercept_failure_records_failed_audit_event() {
2218 let state = CoreState::new(None).await;
2219
2220 let result = state
2221 .resolve_intercept_with_modifications_from(
2222 AuditActor::Probe,
2223 "missing-flow:request".to_string(),
2224 "drop",
2225 None,
2226 )
2227 .await;
2228
2229 assert!(result.is_err());
2230 let events = state.recent_audit_events();
2231 let event = events.last().expect("audit event should exist");
2232 assert_eq!(event.actor, AuditActor::Probe);
2233 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2234 assert_eq!(event.outcome, AuditOutcome::Failed);
2235 assert_eq!(event.details["action"], "drop");
2236 assert!(
2237 event.details["error"]
2238 .as_str()
2239 .unwrap_or_default()
2240 .contains("Interception not found")
2241 );
2242 }
2243
2244 #[tokio::test]
2245 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2246 let state = CoreState::new(None).await;
2247 let (shutdown_tx, shutdown_rx) = oneshot::channel();
2248
2249 state
2250 .prepare_start(8080, shutdown_tx)
2251 .expect("prepare start should succeed");
2252 let lifecycle = state.lifecycle();
2253 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2254 assert_eq!(lifecycle.port, Some(8080));
2255 assert!(lifecycle.started_at_ms.is_none());
2256 assert!(lifecycle.last_error.is_none());
2257
2258 assert_eq!(
2259 state.stop_proxy().expect("stop should succeed"),
2260 ProxyStopResult::Stopping
2261 );
2262 let lifecycle = state.lifecycle();
2263 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2264 assert_eq!(lifecycle.port, Some(8080));
2265 assert!(shutdown_rx.await.is_ok());
2266 }
2267
2268 #[tokio::test]
2269 async fn status_snapshot_derives_runtime_facing_fields() {
2270 let state = CoreState::new(None).await;
2271 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2272 state
2273 .prepare_start(8080, shutdown_tx)
2274 .expect("prepare start should succeed");
2275
2276 let status = state.status_snapshot();
2277 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2278 assert!(status.running);
2279 assert_eq!(status.port, Some(8080));
2280 assert!(status.uptime.is_none());
2281 assert!(status.last_error.is_none());
2282 }
2283
2284 #[tokio::test]
2285 async fn status_report_combines_status_and_metrics() {
2286 let state = CoreState::new(None).await;
2287 let report = state.status_report().await;
2288
2289 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2290 assert!(!report.status.running);
2291 assert_eq!(report.metrics.intercepts_pending, 0);
2292 assert_eq!(report.metrics.ws_pending_messages, 0);
2293 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2294 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2295 assert_eq!(report.metrics.audit_events_total, 0);
2296 assert_eq!(report.metrics.audit_events_failed, 0);
2297 assert_eq!(report.metrics.flow_events_lagged_total, 0);
2298 assert_eq!(report.metrics.audit_events_lagged_total, 0);
2299 }
2300
/// Values passed to `ProxyConfig::new` and the `with_*` setters are readable
/// back unchanged from the resulting config.
#[test]
fn proxy_config_new_and_transport_setters_preserve_values() {
    let cert_path = std::path::PathBuf::from("/tmp/ca_cert.pem");
    let key_path = std::path::PathBuf::from("/tmp/ca_key.pem");

    let config = ProxyConfig::new(8080, cert_path.clone(), key_path.clone())
        .with_transparent(true)
        .with_udp_tproxy_port(Some(15000));

    assert_eq!(config.port, 8080);
    assert_eq!(config.ca_cert_path, cert_path);
    assert_eq!(config.ca_key_path, key_path);
    assert!(config.transparent);
    assert_eq!(config.udp_tproxy_port, Some(15000));
}
2323
/// `ProxyConfig::from_app_data_dir` must leave the directory in place and
/// derive the default CA file paths and transport settings from it.
///
/// The scratch directory lives under the system temp dir. Its name combines
/// the process id with a nanosecond timestamp so concurrent test processes do
/// not collide, and it is removed again so repeated runs do not pile up
/// leftover directories.
#[test]
fn proxy_config_from_app_data_dir_creates_default_paths() {
    let unique = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock drift")
        .as_nanos();
    let dir = std::env::temp_dir().join(format!(
        "relaycraft-runtime-config-{}-{}",
        std::process::id(),
        unique
    ));

    let config =
        ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");

    // Capture the observation first, then clean up *before* asserting so a
    // failing assertion below cannot leak the directory. Cleanup is
    // best-effort: a removal error must not mask the real test result.
    let dir_existed = dir.exists();
    let _ = std::fs::remove_dir_all(&dir);

    assert!(dir_existed);
    assert_eq!(config.port, 8899);
    assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
    assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
    assert!(!config.transparent);
    assert!(config.udp_tproxy_port.is_none());
}
2342
2343 #[tokio::test]
2344 async fn intercept_snapshot_maps_pending_counts() {
2345 let state = CoreState::new(None).await;
2346 let snapshot = state.intercept_snapshot().await;
2347
2348 assert_eq!(snapshot.pending_count, 0);
2349 assert_eq!(snapshot.ws_pending_count, 0);
2350 }
2351
2352 #[tokio::test]
2353 async fn audit_snapshot_returns_latest_events_in_order() {
2354 let state = CoreState::new(None).await;
2355 state.record_audit_event(AuditEvent::new(
2356 AuditActor::Runtime,
2357 AuditEventKind::RuleChanged,
2358 "first",
2359 AuditOutcome::Success,
2360 json!({ "index": 1 }),
2361 ));
2362 state.record_audit_event(AuditEvent::new(
2363 AuditActor::Http,
2364 AuditEventKind::PolicyUpdated,
2365 "second",
2366 AuditOutcome::Success,
2367 json!({ "index": 2 }),
2368 ));
2369
2370 let snapshot = state.audit_snapshot(1);
2371
2372 assert_eq!(snapshot.events.len(), 1);
2373 assert_eq!(snapshot.events[0].target, "second");
2374 assert_eq!(snapshot.events[0].details["index"], 2);
2375 }
2376
2377 #[tokio::test]
2378 async fn query_audit_snapshot_filters_in_memory_events() {
2379 let state = CoreState::new(None).await;
2380 state.record_audit_event(AuditEvent::new(
2381 AuditActor::Http,
2382 AuditEventKind::RuleChanged,
2383 "rule-1",
2384 AuditOutcome::Success,
2385 json!({ "idx": 1 }),
2386 ));
2387 state.record_audit_event(AuditEvent::new(
2388 AuditActor::Probe,
2389 AuditEventKind::PolicyUpdated,
2390 "policy",
2391 AuditOutcome::Failed,
2392 json!({ "idx": 2 }),
2393 ));
2394
2395 let snapshot = state
2396 .query_audit_snapshot(CoreAuditQuery {
2397 actor: Some(AuditActor::Probe),
2398 kind: Some(AuditEventKind::PolicyUpdated),
2399 outcome: Some(AuditOutcome::Failed),
2400 limit: 10,
2401 ..Default::default()
2402 })
2403 .await;
2404
2405 assert_eq!(snapshot.events.len(), 1);
2406 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2407 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2408 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2409 }
2410
2411 #[tokio::test]
2412 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2413 let state = CoreState::new(Some(sqlite_url())).await;
2414 state.update_policy_from(
2415 AuditActor::Http,
2416 "policy".to_string(),
2417 ProxyPolicy {
2418 transparent_enabled: true,
2419 ..Default::default()
2420 },
2421 );
2422
2423 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2424 for _ in 0..10 {
2425 snapshot = state
2426 .query_audit_snapshot(CoreAuditQuery {
2427 actor: Some(AuditActor::Http),
2428 kind: Some(AuditEventKind::PolicyUpdated),
2429 limit: 10,
2430 ..Default::default()
2431 })
2432 .await;
2433 if !snapshot.events.is_empty() {
2434 break;
2435 }
2436 sleep(Duration::from_millis(20)).await;
2437 }
2438
2439 assert!(!snapshot.events.is_empty());
2440 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2441 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2442 }
2443
2444 #[tokio::test]
2445 async fn prepare_start_rejects_second_active_start() {
2446 let state = CoreState::new(None).await;
2447 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2448 state
2449 .prepare_start(8080, shutdown_tx)
2450 .expect("first start should succeed");
2451
2452 let (second_tx, _second_rx) = oneshot::channel();
2453 let error = state
2454 .prepare_start(8081, second_tx)
2455 .expect_err("second active start should be rejected");
2456 assert!(error.contains("already"));
2457 }
2458
/// `update_policy_from` leaves a successful `PolicyUpdated` audit event whose
/// details carry the new `transparent_enabled` value.
///
/// This is a plain `#[test]`: the runtime is only used to construct the state,
/// and the policy update itself is called from synchronous code.
#[test]
fn update_policy_records_audit_event() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));

    let transparent_policy = ProxyPolicy {
        transparent_enabled: true,
        ..Default::default()
    };
    core.update_policy_from(
        AuditActor::Runtime,
        "policy".to_string(),
        transparent_policy,
    );

    let recorded = core.recent_audit_events();
    let latest = recorded.last().expect("audit event should exist");
    assert_eq!(latest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(latest.outcome, AuditOutcome::Success);
    assert_eq!(latest.details["transparent_enabled"], true);
}
2479
/// A patch that only touches redaction flips the redaction flags, leaves
/// `request_timeout_ms` as it was, and is audited as `PolicyUpdated`.
#[test]
fn patch_policy_updates_redaction_without_replacing_other_fields() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));
    let timeout_before = core.policy_snapshot().request_timeout_ms;

    let redaction_patch = relay_core_api::policy::RedactionPolicyPatch {
        enabled: Some(true),
        redact_bodies: Some(true),
        ..Default::default()
    };
    let patch = relay_core_api::policy::ProxyPolicyPatch {
        redaction: Some(redaction_patch),
        upstream: None,
    };
    core.patch_policy_from(AuditActor::Runtime, "policy.patch".to_string(), patch);

    let patched = core.policy_snapshot();
    assert_eq!(patched.request_timeout_ms, timeout_before);
    assert!(patched.redaction.enabled);
    assert!(patched.redaction.redact_bodies);

    let recorded = core.recent_audit_events();
    let latest = recorded.last().expect("audit event should exist");
    assert_eq!(latest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(latest.details["redaction_enabled"], true);
}
2509
2510 #[tokio::test]
2511 async fn metrics_include_audit_and_lagged_event_counters() {
2512 let state = CoreState::new(None).await;
2513
2514 state.update_policy_from(
2515 AuditActor::Runtime,
2516 "policy".to_string(),
2517 ProxyPolicy::default(),
2518 );
2519 let _ = state
2520 .resolve_intercept_with_modifications_from(
2521 AuditActor::Probe,
2522 "missing-flow:request".to_string(),
2523 "drop",
2524 None,
2525 )
2526 .await;
2527
2528 state.record_flow_events_lagged(3);
2529 state.record_audit_events_lagged(5);
2530
2531 let metrics = state.get_metrics().await;
2532 assert_eq!(metrics.audit_events_total, 2);
2533 assert_eq!(metrics.audit_events_failed, 1);
2534 assert_eq!(metrics.flow_events_lagged_total, 3);
2535 assert_eq!(metrics.audit_events_lagged_total, 5);
2536 }
2537
2538 #[tokio::test]
2539 async fn prometheus_metrics_text_contains_observability_fields() {
2540 let state = CoreState::new(None).await;
2541 state.record_flow_events_lagged(2);
2542 state.record_audit_events_lagged(4);
2543
2544 let text = state.get_metrics_prometheus_text().await;
2545 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2546 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2547 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2548 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2549 }
2550
2551 #[tokio::test]
2552 async fn search_flows_uses_store_with_offset_pagination() {
2553 let state = CoreState::new(Some(sqlite_url())).await;
2554 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2555 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2556 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2557
2558 state.upsert_flow(Box::new(flow_a));
2559 state.upsert_flow(Box::new(flow_b));
2560 state.upsert_flow(Box::new(flow_c));
2561
2562 let mut baseline = Vec::new();
2563 for _ in 0..20 {
2564 baseline = state
2565 .search_flows(FlowQuery {
2566 host: Some("api.example.com".to_string()),
2567 path_contains: None,
2568 method: None,
2569 status_min: None,
2570 status_max: None,
2571 has_error: None,
2572 is_websocket: None,
2573 limit: Some(3),
2574 offset: Some(0),
2575 })
2576 .await;
2577 if baseline.len() == 3 {
2578 break;
2579 }
2580 sleep(Duration::from_millis(20)).await;
2581 }
2582
2583 assert_eq!(baseline.len(), 3);
2584 let page = state
2585 .search_flows(FlowQuery {
2586 host: Some("api.example.com".to_string()),
2587 path_contains: None,
2588 method: None,
2589 status_min: None,
2590 status_max: None,
2591 has_error: None,
2592 is_websocket: None,
2593 limit: Some(1),
2594 offset: Some(1),
2595 })
2596 .await;
2597 assert_eq!(page.len(), 1);
2598 assert_eq!(page[0].id, baseline[1].id);
2599 }
2600
2601 #[tokio::test]
2602 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2603 let state = CoreState::new(Some(sqlite_url())).await;
2604 let first_flow = sample_http_flow(
2605 "persist.example.com",
2606 "/first",
2607 "GET",
2608 200,
2609 1_700_000_010_000,
2610 );
2611 let first_id = first_flow.id.to_string();
2612 state.upsert_flow(Box::new(first_flow));
2613 for i in 0..240 {
2614 state.upsert_flow(Box::new(sample_http_flow(
2615 "persist.example.com",
2616 &format!("/{}", i),
2617 "GET",
2618 200,
2619 1_700_000_020_000 + i,
2620 )));
2621 }
2622
2623 sleep(Duration::from_millis(200)).await;
2624
2625 let loaded = state.get_flow(first_id).await;
2626 assert!(loaded.is_some());
2627 }
2628
2629 #[tokio::test]
2630 async fn search_flows_redacts_summary_url_when_enabled() {
2631 let state = CoreState::new(None).await;
2632 state.update_policy_from(
2633 AuditActor::Runtime,
2634 "policy.redaction".to_string(),
2635 ProxyPolicy {
2636 redaction: RedactionPolicy {
2637 enabled: true,
2638 sensitive_query_keys: vec!["token".to_string()],
2639 redact_bodies: false,
2640 ..Default::default()
2641 },
2642 ..Default::default()
2643 },
2644 );
2645 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2646
2647 let mut items = Vec::new();
2648 for _ in 0..20 {
2649 items = state
2650 .search_flows(FlowQuery {
2651 host: Some("api.example.com".to_string()),
2652 path_contains: Some("/private".to_string()),
2653 ..Default::default()
2654 })
2655 .await;
2656 if !items.is_empty() {
2657 break;
2658 }
2659 sleep(Duration::from_millis(20)).await;
2660 }
2661
2662 assert!(!items.is_empty());
2663 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2664 let token = redacted
2665 .query_pairs()
2666 .find(|(k, _)| k == "token")
2667 .map(|(_, v)| v.to_string());
2668 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2669 }
2670
2671 #[tokio::test]
2672 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2673 let state = CoreState::new(Some(sqlite_url())).await;
2674 state.update_policy_from(
2675 AuditActor::Runtime,
2676 "policy.redaction".to_string(),
2677 ProxyPolicy {
2678 redaction: RedactionPolicy {
2679 enabled: true,
2680 sensitive_header_names: vec![
2681 "authorization".to_string(),
2682 "set-cookie".to_string(),
2683 ],
2684 sensitive_query_keys: vec!["token".to_string()],
2685 redact_bodies: true,
2686 },
2687 ..Default::default()
2688 },
2689 );
2690
2691 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2692 let flow_id = flow.id.to_string();
2693 state.upsert_flow(Box::new(flow));
2694 sleep(Duration::from_millis(80)).await;
2695
2696 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2697 let Layer::Http(http) = loaded.layer else {
2698 panic!("expected http layer");
2699 };
2700
2701 let auth = http
2702 .request
2703 .headers
2704 .iter()
2705 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2706 .map(|(_, v)| v.as_str());
2707 assert_eq!(auth, Some("[REDACTED]"));
2708
2709 let req_query_token = http
2710 .request
2711 .query
2712 .iter()
2713 .find(|(k, _)| k == "token")
2714 .map(|(_, v)| v.as_str());
2715 assert_eq!(req_query_token, Some("[REDACTED]"));
2716
2717 let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2718 assert_eq!(req_body, Some("[REDACTED]"));
2719
2720 let response = http.response.expect("response should exist");
2721 let set_cookie = response
2722 .headers
2723 .iter()
2724 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2725 .map(|(_, v)| v.as_str());
2726 assert_eq!(set_cookie, Some("[REDACTED]"));
2727 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2728 assert_eq!(res_body, Some("[REDACTED]"));
2729 }
2730
/// With body redaction enabled, `redact_flow_update_for_output` replaces the
/// content of an `HttpBody` update with `[REDACTED]`.
#[test]
fn redact_flow_update_masks_http_body_when_enabled() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));

    let redaction = RedactionPolicy {
        enabled: true,
        redact_bodies: true,
        ..Default::default()
    };
    core.update_policy_from(
        AuditActor::Runtime,
        "policy.redaction".to_string(),
        ProxyPolicy {
            redaction,
            ..Default::default()
        },
    );

    let secret_body = BodyData {
        encoding: "utf-8".to_string(),
        content: "super-secret".to_string(),
        size: 12,
    };
    let update = FlowUpdate::HttpBody {
        flow_id: "f-1".to_string(),
        direction: relay_core_api::flow::Direction::ClientToServer,
        body: secret_body,
    };

    // The update must keep its variant; only the body content is checked.
    let FlowUpdate::HttpBody { body, .. } = core.redact_flow_update_for_output(update) else {
        panic!("expected http body update");
    };
    assert_eq!(body.content, "[REDACTED]");
}
2763
/// Loading a script records a successful `ScriptReloaded` audit event that
/// carries the caller's actor and target. Only built with the `script` feature.
#[cfg(feature = "script")]
#[tokio::test]
async fn load_script_from_records_audit_event() {
    let core = CoreState::new(None).await;

    let script_source = "globalThis.onRequestHeaders = (_flow) => {};";
    core.load_script_from(
        AuditActor::Tauri,
        "tauri.load_script".to_string(),
        script_source,
    )
    .await
    .expect("script should load");

    let recorded = core.recent_audit_events();
    let latest = recorded.last().expect("audit event should exist");
    assert_eq!(latest.actor, AuditActor::Tauri);
    assert_eq!(latest.kind, AuditEventKind::ScriptReloaded);
    assert_eq!(latest.outcome, AuditOutcome::Success);
    assert_eq!(latest.target, "tauri.load_script");
}
2785}