1use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51 build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod paths;
64pub mod rule;
65pub mod services;
66
67pub use paths::CaPaths;
70pub use relay_core_api::{flow, policy};
71pub use relay_core_lib::InterceptionResult;
72pub use relay_core_lib::rule as lib_rule;
73
74use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
75use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
76use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
77
78pub mod rule_engine {
79 pub use relay_core_lib::rule_engine::*;
80}
81
82#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
83pub struct CoreMetrics {
84 pub flows_total: usize,
85 pub flows_in_memory: usize,
86 pub flows_dropped: usize,
87 pub intercepts_pending: usize,
88 pub ws_pending_messages: usize,
89 pub oldest_intercept_age_ms: Option<u64>,
90 pub oldest_ws_message_age_ms: Option<u64>,
91 pub rule_exec_errors: usize,
92 pub audit_events_total: usize,
93 pub audit_events_failed: usize,
94 pub flow_events_lagged_total: usize,
95 pub audit_events_lagged_total: usize,
96 pub proxy_body_degraded_total: usize,
98 pub proxy_http_request_total: usize,
100 pub proxy_sandbox_reject_total: usize,
102 pub proxy_invalid_method_total: usize,
104 pub proxy_invalid_status_total: usize,
106 pub proxy_retry_total: usize,
108 pub proxy_stream_mode_tap_total: usize,
110 pub proxy_stream_mode_degrade_total: usize,
112}
113
114impl CoreMetrics {
115 pub fn to_prometheus_text(&self) -> String {
116 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
117 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
118 format!(
119 "relay_core_flows_total {}\n\
120relay_core_flows_in_memory {}\n\
121relay_core_flows_dropped_total {}\n\
122relay_core_intercepts_pending {}\n\
123relay_core_ws_pending_messages {}\n\
124relay_core_oldest_intercept_age_ms {}\n\
125relay_core_oldest_ws_message_age_ms {}\n\
126relay_core_rule_exec_errors_total {}\n\
127relay_core_audit_events_total {}\n\
128relay_core_audit_events_failed_total {}\n\
129relay_core_flow_events_lagged_total {}\n\
130relay_core_audit_events_lagged_total {}\n\
131relay_core_proxy_body_degraded_total {}\n\
132relay_core_proxy_http_request_total {}\n\
133relay_core_proxy_sandbox_reject_total {}\n\
134relay_core_proxy_invalid_method_total {}\n\
135relay_core_proxy_invalid_status_total {}\n\
136relay_core_proxy_retry_total {}\n\
137relay_core_proxy_stream_mode_tap_total {}\n\
138relay_core_proxy_stream_mode_degrade_total {}\n",
139 self.flows_total,
140 self.flows_in_memory,
141 self.flows_dropped,
142 self.intercepts_pending,
143 self.ws_pending_messages,
144 oldest_intercept_age_ms,
145 oldest_ws_message_age_ms,
146 self.rule_exec_errors,
147 self.audit_events_total,
148 self.audit_events_failed,
149 self.flow_events_lagged_total,
150 self.audit_events_lagged_total,
151 self.proxy_body_degraded_total,
152 self.proxy_http_request_total,
153 self.proxy_sandbox_reject_total,
154 self.proxy_invalid_method_total,
155 self.proxy_invalid_status_total,
156 self.proxy_retry_total,
157 self.proxy_stream_mode_tap_total,
158 self.proxy_stream_mode_degrade_total,
159 )
160 }
161}
162
163#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
164pub struct CoreStatusSnapshot {
165 pub phase: RuntimeLifecyclePhase,
166 pub running: bool,
167 pub port: Option<u16>,
168 pub uptime: Option<u64>,
169 pub last_error: Option<String>,
170}
171
172#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
173pub struct CoreStatusReport {
174 pub status: CoreStatusSnapshot,
175 pub metrics: CoreMetrics,
176}
177
178#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
179pub struct CoreInterceptSnapshot {
180 pub pending_count: usize,
181 pub ws_pending_count: usize,
182}
183
184#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
185pub struct CoreAuditSnapshot {
186 pub events: Vec<AuditEvent>,
187}
188
189#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
190pub struct CoreAuditQuery {
191 pub since_ms: Option<u64>,
192 pub until_ms: Option<u64>,
193 pub actor: Option<AuditActor>,
194 pub kind: Option<AuditEventKind>,
195 pub outcome: Option<AuditOutcome>,
196 pub limit: usize,
197}
198
199impl Default for CoreAuditQuery {
200 fn default() -> Self {
201 Self {
202 since_ms: None,
203 until_ms: None,
204 actor: None,
205 kind: None,
206 outcome: None,
207 limit: 50,
208 }
209 }
210}
211
212#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
213#[serde(rename_all = "snake_case")]
214pub enum RuntimeLifecyclePhase {
215 Created,
216 Starting,
217 Running,
218 Stopping,
219 Stopped,
220 Failed,
221}
222
223impl RuntimeLifecyclePhase {
224 pub fn as_str(&self) -> &'static str {
225 match self {
226 Self::Created => "created",
227 Self::Starting => "starting",
228 Self::Running => "running",
229 Self::Stopping => "stopping",
230 Self::Stopped => "stopped",
231 Self::Failed => "failed",
232 }
233 }
234
235 pub fn is_active(&self) -> bool {
236 matches!(self, Self::Starting | Self::Running | Self::Stopping)
237 }
238}
239
240#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
241pub struct RuntimeLifecycle {
242 pub phase: RuntimeLifecyclePhase,
243 pub port: Option<u16>,
244 pub started_at_ms: Option<u64>,
245 pub last_error: Option<String>,
246}
247
248impl RuntimeLifecycle {
249 pub fn created() -> Self {
250 Self {
251 phase: RuntimeLifecyclePhase::Created,
252 port: None,
253 started_at_ms: None,
254 last_error: None,
255 }
256 }
257
258 pub fn is_active(&self) -> bool {
259 self.phase.is_active()
260 }
261
262 pub fn uptime_seconds(&self) -> Option<u64> {
263 let started_at_ms = self.started_at_ms?;
264 let now_ms = now_unix_ms();
265 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
266 }
267}
268
269pub enum ProxySpawnResult {
270 Started(tokio::task::JoinHandle<()>),
271 AlreadyRunning,
272}
273
274#[derive(Debug, Clone, Copy, PartialEq, Eq)]
275pub enum ProxyStopResult {
276 Stopping,
277 NotRunning,
278}
279
280impl From<RuntimeLifecycle> for CoreStatusSnapshot {
281 fn from(lifecycle: RuntimeLifecycle) -> Self {
282 Self {
283 running: lifecycle.is_active(),
284 uptime: lifecycle.uptime_seconds(),
285 port: lifecycle.port,
286 last_error: lifecycle.last_error,
287 phase: lifecycle.phase,
288 }
289 }
290}
291
292pub struct CoreState {
293 flow_store: mpsc::Sender<FlowStoreMessage>,
294 intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
295 rule_store: mpsc::Sender<RuleStoreMessage>,
296 store: Option<Store>,
297 #[cfg(feature = "script")]
298 pub script_interceptor: Arc<ScriptInterceptor>,
299 pub policy_tx: watch::Sender<ProxyPolicy>,
300 pub flows_dropped: Arc<AtomicUsize>,
301 audit_events_total: Arc<AtomicUsize>,
302 audit_events_failed: Arc<AtomicUsize>,
303 flow_events_lagged_total: Arc<AtomicUsize>,
304 audit_events_lagged_total: Arc<AtomicUsize>,
305 flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
307 audit_broadcast_tx: broadcast::Sender<AuditEvent>,
308 audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
309 lifecycle_tx: watch::Sender<RuntimeLifecycle>,
310 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
311}
312
313impl CoreState {
314 pub async fn new(db_url: Option<String>) -> Self {
315 const AUDIT_HISTORY_LIMIT: usize = 200;
316
317 let store = if let Some(url) = db_url {
318 match Store::connect(&url).await {
319 Ok(s) => {
320 if let Err(e) = s.init().await {
321 tracing::error!("Failed to init store: {}", e);
322 }
323 Some(s)
324 }
325 Err(e) => {
326 tracing::error!("Failed to connect to store: {}", e);
327 None
328 }
329 }
330 } else {
331 None
332 };
333
334 let (flow_tx, flow_rx) = mpsc::channel(10000);
335 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
336 tokio::spawn(flow_actor.run());
337
338 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
339 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
340 tokio::spawn(intercept_actor.run());
341
342 let (rule_tx, rule_rx) = mpsc::channel(100);
343 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
344 tokio::spawn(rule_actor.run());
345
346 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
347 let (flow_broadcast_tx, _) = broadcast::channel(1000);
348 let (audit_broadcast_tx, _) = broadcast::channel(256);
349 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
350
351 #[cfg(feature = "script")]
352 let script_interceptor = ScriptInterceptor::new()
353 .await
354 .expect("Failed to initialize ScriptInterceptor");
355 Self {
356 flow_store: flow_tx,
357 intercept_broker: intercept_tx,
358 rule_store: rule_tx,
359 store,
360 #[cfg(feature = "script")]
361 script_interceptor: Arc::new(script_interceptor),
362 policy_tx,
363 flows_dropped: Arc::new(AtomicUsize::new(0)),
364 audit_events_total: Arc::new(AtomicUsize::new(0)),
365 audit_events_failed: Arc::new(AtomicUsize::new(0)),
366 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
367 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
368 flow_broadcast_tx,
369 audit_broadcast_tx,
370 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
371 lifecycle_tx,
372 shutdown_tx: Mutex::new(None),
373 }
374 }
375
376 pub async fn get_metrics(&self) -> CoreMetrics {
377 let (flow_tx, flow_rx) = oneshot::channel();
378 let _ = self
379 .flow_store
380 .send(FlowStoreMessage::GetMetrics(flow_tx))
381 .await;
382 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
383
384 let (int_tx, int_rx) = oneshot::channel();
385 let _ = self
386 .intercept_broker
387 .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
388 .await;
389 let (
390 intercepts_pending,
391 ws_pending_messages,
392 oldest_intercept_age_ms,
393 oldest_ws_message_age_ms,
394 ) = int_rx.await.unwrap_or((0, 0, None, None));
395
396 let (rule_tx, rule_rx) = oneshot::channel();
397 let _ = self
398 .rule_store
399 .send(RuleStoreMessage::GetMetrics(rule_tx))
400 .await;
401 let rule_exec_errors = rule_rx.await.unwrap_or(0);
402
403 CoreMetrics {
404 flows_total,
405 flows_in_memory,
406 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
407 intercepts_pending,
408 ws_pending_messages,
409 oldest_intercept_age_ms,
410 oldest_ws_message_age_ms,
411 rule_exec_errors,
412 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
413 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
414 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
415 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
416 proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
417 proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
418 proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
419 proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
420 proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
421 proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
422 proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
423 proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
424 ),
425 }
426 }
427
428 pub async fn get_metrics_prometheus_text(&self) -> String {
429 let mut text = self.get_metrics().await.to_prometheus_text();
430 #[cfg(feature = "script")]
431 {
432 text.push_str(&self.script_interceptor.metrics.prometheus_lines());
433 }
434 text
435 }
436
437 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
438 self.lifecycle().into()
439 }
440
441 pub async fn status_report(&self) -> CoreStatusReport {
442 CoreStatusReport {
443 status: self.status_snapshot(),
444 metrics: self.get_metrics().await,
445 }
446 }
447
448 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
449 let metrics = self.get_metrics().await;
450 CoreInterceptSnapshot {
451 pending_count: metrics.intercepts_pending,
452 ws_pending_count: metrics.ws_pending_messages,
453 }
454 }
455
456 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
457 let events = self.recent_audit_events();
458 let start = events.len().saturating_sub(limit);
459 CoreAuditSnapshot {
460 events: events.into_iter().skip(start).collect(),
461 }
462 }
463
464 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
465 let limit = query.limit.clamp(1, 500);
466 if let Some(store) = &self.store {
467 let rows = store
468 .query_audit_events(
469 query.since_ms,
470 query.until_ms,
471 query.actor.as_ref().map(AuditActor::as_str),
472 query.kind.as_ref().map(AuditEventKind::as_str),
473 query.outcome.as_ref().map(AuditOutcome::as_str),
474 limit,
475 )
476 .await
477 .unwrap_or_default();
478
479 let mut events = Vec::with_capacity(rows.len());
480 for row in rows {
481 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
482 events.push(event);
483 }
484 }
485 return CoreAuditSnapshot { events };
486 }
487
488 let mut events = self.recent_audit_events();
489 events.reverse();
490 let filtered = events
491 .into_iter()
492 .filter(|event| {
493 query
494 .since_ms
495 .map(|v| event.timestamp_ms >= v)
496 .unwrap_or(true)
497 })
498 .filter(|event| {
499 query
500 .until_ms
501 .map(|v| event.timestamp_ms <= v)
502 .unwrap_or(true)
503 })
504 .filter(|event| {
505 query
506 .actor
507 .as_ref()
508 .map(|v| &event.actor == v)
509 .unwrap_or(true)
510 })
511 .filter(|event| {
512 query
513 .kind
514 .as_ref()
515 .map(|v| &event.kind == v)
516 .unwrap_or(true)
517 })
518 .filter(|event| {
519 query
520 .outcome
521 .as_ref()
522 .map(|v| &event.outcome == v)
523 .unwrap_or(true)
524 })
525 .take(limit)
526 .collect();
527 CoreAuditSnapshot { events: filtered }
528 }
529
530 pub async fn get_flow(&self, id: String) -> Option<Flow> {
531 let (tx, rx) = oneshot::channel();
532 if let Err(e) = self
533 .flow_store
534 .send(FlowStoreMessage::GetFlow {
535 id: id.clone(),
536 respond_to: tx,
537 })
538 .await
539 {
540 error!("Failed to send GetFlow request: {}", e);
541 if let Some(store) = &self.store {
542 return store
543 .load_flow(&id)
544 .await
545 .ok()
546 .flatten()
547 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
548 }
549 return None;
550 }
551 let flow = rx
552 .await
553 .map_err(|e| {
554 error!("Failed to receive Flow response: {}", e);
555 e
556 })
557 .unwrap_or(None);
558 if let Some(flow) = flow {
559 return Some(redact_flow(flow, &self.current_redaction_policy()));
560 }
561 if let Some(store) = &self.store {
562 return store
563 .load_flow(&id)
564 .await
565 .ok()
566 .flatten()
567 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
568 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
569 }
570 None
571 }
572
573 pub async fn get_rules(&self) -> Vec<Rule> {
574 let (tx, rx) = oneshot::channel();
575 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
576 error!("Failed to send GetRules request: {}", e);
577 return Vec::new();
578 }
579 rx.await
580 .map_err(|e| {
581 error!("Failed to receive Rules response: {}", e);
582 e
583 })
584 .unwrap_or_default()
585 }
586
587 pub async fn set_rules(&self, rules: Vec<Rule>) {
588 let _ = self
589 .set_rules_from(
590 AuditActor::Runtime,
591 "rules.replace",
592 "rule_set".to_string(),
593 json!({}),
594 rules,
595 )
596 .await;
597 }
598
599 pub async fn upsert_rule_from(
600 &self,
601 actor: AuditActor,
602 operation: &str,
603 target: String,
604 details: serde_json::Value,
605 rule: Rule,
606 ) -> Result<(), String> {
607 let rule_id = rule.id.clone();
608 let mut rules = self.get_rules().await;
609 rules.retain(|existing| existing.id != rule_id);
610 rules.push(rule);
611 self.set_rules_from(actor, operation, target, details, rules)
612 .await
613 }
614
615 pub async fn delete_rule_from(
616 &self,
617 actor: AuditActor,
618 operation: &str,
619 target: String,
620 details: serde_json::Value,
621 rule_id: &str,
622 ) -> Result<bool, String> {
623 let mut rules = self.get_rules().await;
624 let before = rules.len();
625 rules.retain(|rule| rule.id != rule_id);
626 if rules.len() == before {
627 return Ok(false);
628 }
629 self.set_rules_from(actor, operation, target, details, rules)
630 .await
631 .map(|_| true)
632 }
633
634 pub async fn create_mock_response_rule_from(
635 &self,
636 actor: AuditActor,
637 target: String,
638 details: serde_json::Value,
639 config: MockResponseRuleConfig,
640 ) -> Result<String, String> {
641 let rule_id = config.rule_id.clone();
642 let rule = build_mock_response_rule(config);
643 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
644 .await
645 .map(|_| rule_id)
646 }
647
648 pub async fn create_intercept_rule_from(
649 &self,
650 actor: AuditActor,
651 target: String,
652 details: serde_json::Value,
653 config: InterceptRuleConfig,
654 ) -> Result<String, String> {
655 let rule_id = config.rule_id.clone();
656 let mut rules = self.get_rules().await;
657 rules.extend(build_intercept_rules(config));
658 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
659 .await
660 .map(|_| rule_id)
661 }
662
663 pub async fn upsert_legacy_intercept_rule_from(
664 &self,
665 actor: AuditActor,
666 target: String,
667 details: serde_json::Value,
668 rule: InterceptRule,
669 ) -> Result<(), String> {
670 let rule_id = rule.id.clone();
671 let mut rules = self.get_rules().await;
672 rules.retain(|existing| {
673 existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
674 });
675 rules.extend(rule.to_rules());
676 self.set_rules_from(
677 actor,
678 "rule.intercept_legacy_upsert",
679 target,
680 details,
681 rules,
682 )
683 .await
684 }
685
686 pub async fn set_rules_from(
687 &self,
688 actor: AuditActor,
689 operation: &str,
690 target: String,
691 details: serde_json::Value,
692 rules: Vec<Rule>,
693 ) -> Result<(), String> {
694 let rule_count = rules.len();
695 if let Err(e) = self
696 .rule_store
697 .send(RuleStoreMessage::SetRules(rules))
698 .await
699 {
700 error!("Failed to send SetRules request: {}", e);
701 self.record_audit_event(AuditEvent::new(
702 actor,
703 AuditEventKind::RuleChanged,
704 target,
705 AuditOutcome::Failed,
706 json!({
707 "operation": operation,
708 "rule_count": rule_count,
709 "details": details,
710 "error": e.to_string()
711 }),
712 ));
713 return Err(e.to_string());
714 }
715
716 self.record_audit_event(AuditEvent::new(
717 actor,
718 AuditEventKind::RuleChanged,
719 target,
720 AuditOutcome::Success,
721 json!({
722 "operation": operation,
723 "rule_count": rule_count,
724 "details": details
725 }),
726 ));
727 Ok(())
728 }
729
730 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
731 let mut new_rules = Vec::new();
732 for rule in rules {
733 new_rules.extend(rule.to_rules());
734 }
735 self.set_rules(new_rules).await;
736 }
737
738 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
739 let (tx, rx) = oneshot::channel();
740 if let Err(e) = self
741 .rule_store
742 .send(RuleStoreMessage::GetRuleEngine(tx))
743 .await
744 {
745 error!("Failed to send GetRuleEngine request: {}", e);
746 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
747 }
748 rx.await
749 .map_err(|e| {
750 error!("Failed to receive RuleEngine response: {}", e);
751 e
752 })
753 .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
754 }
755
756 pub fn update_policy(&self, policy: ProxyPolicy) {
757 self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
758 }
759
760 pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
761 let mut policy = self.policy_snapshot();
762 policy.apply_patch(patch);
763 self.update_policy_from(actor, target, policy);
764 }
765
766 pub fn policy_snapshot(&self) -> ProxyPolicy {
767 self.policy_tx.borrow().clone()
768 }
769
770 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
771 let details = json!({
772 "strict_http_semantics": policy.strict_http_semantics,
773 "request_timeout_ms": policy.request_timeout_ms,
774 "max_body_size": policy.max_body_size,
775 "transparent_enabled": policy.transparent_enabled,
776 "redaction_enabled": policy.redaction.enabled,
777 "redact_bodies": policy.redaction.redact_bodies
778 });
779
780 self.policy_tx.send_replace(policy);
781 self.record_audit_event(AuditEvent::new(
782 actor,
783 AuditEventKind::PolicyUpdated,
784 target,
785 AuditOutcome::Success,
786 details,
787 ));
788 }
789
790 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
791 if let Err(e) = self
792 .intercept_broker
793 .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
794 .await
795 {
796 error!("Failed to send RegisterIntercept request: {}", e);
797 }
798 }
799
800 pub async fn resolve_intercept(
801 &self,
802 key: String,
803 result: InterceptionResult,
804 ) -> Result<(), String> {
805 let (tx, rx) = oneshot::channel();
806 if let Err(e) = self
807 .intercept_broker
808 .send(InterceptBrokerMessage::ResolveIntercept {
809 key,
810 result,
811 respond_to: tx,
812 })
813 .await
814 {
815 error!("Failed to send ResolveIntercept request: {}", e);
816 return Err(e.to_string());
817 }
818 rx.await.map_err(|_| "Actor dropped".to_string())?
819 }
820
821 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
822 let (tx, rx) = oneshot::channel();
823 if let Err(e) = self
824 .intercept_broker
825 .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
826 key,
827 respond_to: tx,
828 })
829 .await
830 {
831 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
832 return None;
833 }
834 rx.await
835 .map_err(|e| {
836 error!("Failed to receive WebSocketMessage response: {}", e);
837 e
838 })
839 .unwrap_or(None)
840 }
841
842 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
843 if let Err(e) = self
844 .intercept_broker
845 .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
846 .await
847 {
848 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
849 }
850 }
851
852 pub async fn is_intercept_pending(&self, key: String) -> bool {
853 let (tx, rx) = oneshot::channel();
854 if let Err(e) = self
855 .intercept_broker
856 .send(InterceptBrokerMessage::GetPendingIntercept {
857 key,
858 respond_to: tx,
859 })
860 .await
861 {
862 error!("Failed to send GetPendingIntercept request: {}", e);
863 return false;
864 }
865 rx.await
866 .map_err(|e| {
867 error!("Failed to receive PendingIntercept response: {}", e);
868 e
869 })
870 .unwrap_or(false)
871 }
872
873 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
874 let (tx, rx) = oneshot::channel();
875 if let Err(e) = self
876 .intercept_broker
877 .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
878 flow_id,
879 respond_to: tx,
880 })
881 .await
882 {
883 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
884 return false;
885 }
886 rx.await
887 .map_err(|e| {
888 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
889 e
890 })
891 .unwrap_or(false)
892 }
893
894 pub fn upsert_flow(&self, flow: Box<Flow>) {
895 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
896 error!("FlowStore dropped flow: {}", e);
898 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
899 }
900 }
901
902 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
903 if let Err(e) = self
904 .flow_store
905 .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
906 {
907 error!("FlowStore dropped WS message: {}", e);
908 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
909 }
910 }
911
912 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
913 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
914 flow_id,
915 body,
916 direction,
917 }) {
918 error!("FlowStore dropped HTTP body: {}", e);
919 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
920 }
921 }
922
923 pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
925 if let Err(e) = self
926 .flow_store
927 .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
928 {
929 error!("FlowStore dropped budget-exceeded tag: {}", e);
930 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
931 }
932 }
933
934 pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
936 self.flow_broadcast_tx.subscribe()
937 }
938
939 pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
940 self.audit_broadcast_tx.subscribe()
941 }
942
943 fn current_redaction_policy(&self) -> RedactionPolicy {
944 self.policy_tx.borrow().redaction.clone()
945 }
946
947 pub fn record_flow_events_lagged(&self, skipped: u64) {
948 self.flow_events_lagged_total
949 .fetch_add(skipped as usize, Ordering::Relaxed);
950 }
951
952 pub fn record_audit_events_lagged(&self, skipped: u64) {
953 self.audit_events_lagged_total
954 .fetch_add(skipped as usize, Ordering::Relaxed);
955 }
956
957 pub fn lifecycle(&self) -> RuntimeLifecycle {
958 self.lifecycle_tx.borrow().clone()
959 }
960
961 pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
962 self.lifecycle_tx.subscribe()
963 }
964
965 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
966 let current = self.lifecycle();
967 if current.is_active() {
968 return Err(format!(
969 "Proxy is already {} on port {}",
970 current.phase.as_str(),
971 current.port.unwrap_or(port)
972 ));
973 }
974
975 let mut guard = self
976 .shutdown_tx
977 .lock()
978 .map_err(|_| "shutdown state poisoned".to_string())?;
979 *guard = Some(shutdown_tx);
980 drop(guard);
981
982 self.update_lifecycle(RuntimeLifecycle {
983 phase: RuntimeLifecyclePhase::Starting,
984 port: Some(port),
985 started_at_ms: None,
986 last_error: None,
987 });
988 Ok(())
989 }
990
991 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
992 let mut guard = self
993 .shutdown_tx
994 .lock()
995 .map_err(|_| "shutdown state poisoned".to_string())?;
996 let Some(tx) = guard.take() else {
997 return Ok(ProxyStopResult::NotRunning);
998 };
999 drop(guard);
1000
1001 let current = self.lifecycle();
1002 self.update_lifecycle(RuntimeLifecycle {
1003 phase: RuntimeLifecyclePhase::Stopping,
1004 port: current.port,
1005 started_at_ms: current.started_at_ms,
1006 last_error: current.last_error,
1007 });
1008 let _ = tx.send(());
1009 Ok(ProxyStopResult::Stopping)
1010 }
1011
1012 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1013 self.audit_history
1014 .lock()
1015 .map(|events| events.iter().cloned().collect())
1016 .unwrap_or_default()
1017 }
1018
1019 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1021 let redaction = self.current_redaction_policy();
1022 if let Some(store) = &self.store {
1023 return store
1024 .query_flow_summaries(&query)
1025 .await
1026 .unwrap_or_default()
1027 .into_iter()
1028 .map(|summary| redact_flow_summary(summary, &redaction))
1029 .collect();
1030 }
1031 let (tx, rx) = oneshot::channel();
1032 if let Err(e) = self
1033 .flow_store
1034 .send(FlowStoreMessage::SearchFlows {
1035 query,
1036 respond_to: tx,
1037 })
1038 .await
1039 {
1040 error!("Failed to send SearchFlows request: {}", e);
1041 return Vec::new();
1042 }
1043 rx.await
1044 .unwrap_or_default()
1045 .into_iter()
1046 .map(|summary| redact_flow_summary(summary, &redaction))
1047 .collect()
1048 }
1049
1050 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1051 let redaction = self.current_redaction_policy();
1052 redact_flow_update(update, &redaction)
1053 }
1054
1055 pub async fn resolve_intercept_with_modifications(
1065 &self,
1066 key: String,
1067 action: &str,
1068 mods: Option<relay_core_api::modification::FlowModification>,
1069 ) -> Result<(), String> {
1070 self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1071 .await
1072 }
1073
1074 pub async fn resolve_intercept_with_modifications_from(
1075 &self,
1076 actor: AuditActor,
1077 key: String,
1078 action: &str,
1079 mods: Option<relay_core_api::modification::FlowModification>,
1080 ) -> Result<(), String> {
1081 let modified_fields = modification_field_names(mods.as_ref());
1082 let result = match action {
1083 "drop" => InterceptionResult::Drop,
1084 _ => match mods {
1085 None => InterceptionResult::Continue,
1086 Some(m) => {
1087 let parts: Vec<&str> = key.splitn(4, ':').collect();
1088 match parts.as_slice() {
1089 [flow_id, phase] => {
1090 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1091 modification::apply_flow_modification(&flow, phase, m)
1092 } else {
1093 InterceptionResult::Continue
1094 }
1095 }
1096 [_, "ws_msg", _] => {
1099 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1100 modification::apply_ws_modification(&msg, m)
1101 } else {
1102 InterceptionResult::Continue
1103 }
1104 }
1105 _ => InterceptionResult::Continue,
1106 }
1107 }
1108 },
1109 };
1110 let outcome = self.resolve_intercept(key.clone(), result).await;
1111 let audit_outcome = if outcome.is_ok() {
1112 AuditOutcome::Success
1113 } else {
1114 AuditOutcome::Failed
1115 };
1116 let error_message = outcome.as_ref().err().cloned();
1117
1118 self.record_audit_event(AuditEvent::new(
1119 actor,
1120 AuditEventKind::InterceptResolved,
1121 key,
1122 audit_outcome,
1123 json!({
1124 "action": action,
1125 "has_modifications": !modified_fields.is_empty(),
1126 "modified_fields": modified_fields,
1127 "error": error_message
1128 }),
1129 ));
1130 outcome
1131 }
1132
1133 #[cfg(feature = "script")]
1134 pub async fn load_script_from(
1135 &self,
1136 actor: AuditActor,
1137 target: String,
1138 script: &str,
1139 ) -> Result<(), String> {
1140 let result = self
1141 .script_interceptor
1142 .load_script(script)
1143 .await
1144 .map_err(|e| e.to_string());
1145
1146 let outcome = if result.is_ok() {
1147 AuditOutcome::Success
1148 } else {
1149 AuditOutcome::Failed
1150 };
1151 let error_message = result.as_ref().err().cloned();
1152
1153 self.record_audit_event(AuditEvent::new(
1154 actor,
1155 AuditEventKind::ScriptReloaded,
1156 target,
1157 outcome,
1158 json!({
1159 "script_bytes": script.len(),
1160 "error": error_message
1161 }),
1162 ));
1163
1164 result
1165 }
1166
1167 #[cfg(feature = "script")]
1169 pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1170 self.script_interceptor.set_env_allow(env_allow).await;
1171 }
1172
1173 fn record_audit_event(&self, event: AuditEvent) {
1174 const AUDIT_HISTORY_LIMIT: usize = 200;
1175 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1176 if event.outcome == AuditOutcome::Failed {
1177 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1178 }
1179
1180 let details = event.details.to_string();
1181 tracing::info!(
1182 target: "relay_core_audit",
1183 event_id = %event.id,
1184 actor = %event.actor.as_str(),
1185 kind = %event.kind.as_str(),
1186 target = %event.target,
1187 outcome = %event.outcome.as_str(),
1188 details = %details
1189 );
1190
1191 if let Ok(mut history) = self.audit_history.lock() {
1192 if history.len() >= AUDIT_HISTORY_LIMIT {
1193 history.pop_front();
1194 }
1195 history.push_back(event.clone());
1196 }
1197
1198 if let Some(store) = self.store.clone() {
1199 let event_json = serde_json::to_value(&event).unwrap_or_default();
1200 let event_id = event.id.clone();
1201 let timestamp_ms = event.timestamp_ms;
1202 let actor = event.actor.as_str().to_string();
1203 let kind = event.kind.as_str().to_string();
1204 let target = event.target.clone();
1205 let outcome = event.outcome.as_str().to_string();
1206 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1207 handle.spawn(async move {
1208 if let Err(e) = store
1209 .save_audit_event(AuditEventRecord {
1210 id: &event_id,
1211 timestamp_ms,
1212 actor: &actor,
1213 kind: &kind,
1214 target: &target,
1215 outcome: &outcome,
1216 content: &event_json,
1217 })
1218 .await
1219 {
1220 tracing::error!("Failed to persist audit event: {}", e);
1221 }
1222 });
1223 }
1224 }
1225
1226 let _ = self.audit_broadcast_tx.send(event);
1227 }
1228
1229 fn transition_to_running(&self, port: u16) {
1230 self.update_lifecycle(RuntimeLifecycle {
1231 phase: RuntimeLifecyclePhase::Running,
1232 port: Some(port),
1233 started_at_ms: Some(now_unix_ms()),
1234 last_error: None,
1235 });
1236 }
1237
1238 fn transition_to_stopped(&self) {
1239 if let Ok(mut guard) = self.shutdown_tx.lock() {
1240 *guard = None;
1241 }
1242
1243 self.update_lifecycle(RuntimeLifecycle {
1244 phase: RuntimeLifecyclePhase::Stopped,
1245 port: None,
1246 started_at_ms: None,
1247 last_error: None,
1248 });
1249 }
1250
1251 fn transition_to_failed(&self, port: u16, error: String) {
1252 if let Ok(mut guard) = self.shutdown_tx.lock() {
1253 *guard = None;
1254 }
1255
1256 self.update_lifecycle(RuntimeLifecycle {
1257 phase: RuntimeLifecyclePhase::Failed,
1258 port: Some(port),
1259 started_at_ms: None,
1260 last_error: Some(error),
1261 });
1262 }
1263
1264 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1265 tracing::info!(
1266 target: "relay_core_lifecycle",
1267 phase = %lifecycle.phase.as_str(),
1268 port = ?lifecycle.port,
1269 started_at_ms = ?lifecycle.started_at_ms,
1270 last_error = ?lifecycle.last_error
1271 );
1272 self.lifecycle_tx.send_replace(lifecycle);
1273 }
1274
1275 pub fn spawn_proxy(
1276 self: &Arc<Self>,
1277 config: ProxyConfig,
1278 sink: mpsc::Sender<FlowUpdate>,
1279 extra_interceptor: Option<Arc<dyn Interceptor>>,
1280 ) -> Result<ProxySpawnResult, String> {
1281 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1282 match self.prepare_start(config.port, shutdown_tx) {
1283 Ok(()) => {}
1284 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1285 Err(error) => return Err(error),
1286 }
1287 let state = self.clone();
1288 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1289 if let Err(error) = state
1290 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1291 .await
1292 {
1293 error!("Proxy failed: {}", error);
1294 }
1295 })))
1296 }
1297
1298 pub async fn start_proxy(
1299 self: &Arc<Self>,
1300 config: ProxyConfig,
1301 sink: mpsc::Sender<FlowUpdate>,
1302 extra_interceptor: Option<Arc<dyn Interceptor>>,
1303 ) -> Result<(), String> {
1304 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1305 self.prepare_start(config.port, shutdown_tx)?;
1306 self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1307 .await
1308 }
1309
1310 async fn run_proxy(
1311 self: &Arc<Self>,
1312 config: ProxyConfig,
1313 sink: mpsc::Sender<FlowUpdate>,
1314 extra_interceptor: Option<Arc<dyn Interceptor>>,
1315 shutdown_rx: oneshot::Receiver<()>,
1316 ) -> Result<(), String> {
1317 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1318 let state = self.clone();
1319
1320 if let Some(parent) = config.ca_cert_path.parent()
1321 && !parent.exists()
1322 {
1323 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1324 }
1325
1326 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1327 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1328 let ca = Arc::new(ca);
1329
1330 #[cfg(feature = "script")]
1331 let script_interceptor = self.script_interceptor.clone();
1332
1333 #[cfg(feature = "script")]
1334 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1335
1336 #[cfg(not(feature = "script"))]
1337 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1338
1339 interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1340 self.clone(),
1341 )));
1342
1343 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1344 self.clone(),
1345 )));
1346
1347 if let Some(interceptor) = extra_interceptor {
1348 interceptors.push(interceptor);
1349 }
1350
1351 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1352 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1353
1354 tokio::spawn(async move {
1355 while let Some(update) = proxy_rx.recv().await {
1356 match update.clone() {
1357 FlowUpdate::Full(flow) => {
1358 state.upsert_flow(flow);
1359 }
1360 FlowUpdate::WebSocketMessage { flow_id, message } => {
1361 state.append_ws_message(flow_id, message);
1362 }
1363 FlowUpdate::HttpBody {
1364 flow_id,
1365 direction,
1366 body,
1367 } => {
1368 state.update_http_body(flow_id, body, direction);
1369 }
1370 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1371 state.tag_flow_budget_exceeded(flow_id, direction);
1373 }
1374 }
1375
1376 let _ = state.flow_broadcast_tx.send(update.clone());
1377
1378 if sink.try_send(update).is_err() {
1379 relay_core_lib::metrics::inc_flows_dropped();
1380 }
1381 }
1382 });
1383
1384 if let Some(udp_port) = config.udp_tproxy_port {
1385 let udp_proxy_tx = proxy_tx.clone();
1386 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1387 tokio::spawn(async move {
1388 match tokio::net::UdpSocket::bind(udp_addr).await {
1389 Ok(socket) => {
1390 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1391 if let Err(e) = proxy.run(udp_proxy_tx).await {
1392 error!("UDP TPROXY failed: {}", e);
1393 }
1394 }
1395 Err(e) => {
1396 error!("Failed to bind UDP TPROXY socket: {}", e);
1397 }
1398 }
1399 });
1400 }
1401
1402 let listener = match TcpListener::bind(addr).await {
1403 Ok(listener) => listener,
1404 Err(e) => {
1405 if let Ok(mut guard) = self.shutdown_tx.lock() {
1406 *guard = None;
1407 }
1408 let message = format!("Failed to bind to address {}: {}", addr, e);
1409 self.transition_to_failed(config.port, message.clone());
1410 return Err(message);
1411 }
1412 };
1413
1414 self.transition_to_running(config.port);
1415 let shutdown_rx = Some(shutdown_rx);
1416
1417 if config.transparent {
1418 let policy = ProxyPolicy {
1419 transparent_enabled: true,
1420 ..Default::default()
1421 };
1422 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1423 let policy_rx = self.policy_tx.subscribe();
1424
1425 let provider: Arc<dyn OriginalDstProvider> = {
1426 let mut addrs = BTreeSet::new();
1427 if let Ok(local) = listener.local_addr() {
1428 addrs.insert(local);
1429 }
1430
1431 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1432 {
1433 Arc::new(LinuxOriginalDstProvider::new(addrs))
1434 }
1435 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1436 {
1437 match MacOsOriginalDstProvider::new(addrs.clone()) {
1438 Ok(provider) => Arc::new(provider),
1439 Err(e) => {
1440 error!("Failed to initialize macOS PF provider: {}", e);
1441 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1442 }
1443 }
1444 }
1445 #[cfg(target_os = "windows")]
1446 {
1447 let filter =
1448 "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1449 .to_string();
1450 let port = config.port;
1451 tokio::spawn(async move {
1452 relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1453 .await;
1454 });
1455
1456 Arc::new(WindowsOriginalDstProvider::new(addrs))
1457 }
1458 #[cfg(not(any(
1459 all(target_os = "linux", feature = "transparent-linux"),
1460 all(target_os = "macos", feature = "transparent-macos"),
1461 target_os = "windows"
1462 )))]
1463 {
1464 Arc::new(NoOpOriginalDstProvider::new(addrs))
1465 }
1466 };
1467
1468 let source = TransparentTcpCaptureSource::new(listener, provider);
1469 let result = relay_core_lib::start_proxy(
1470 source,
1471 proxy_tx,
1472 interceptor,
1473 ca,
1474 policy_rx,
1475 None,
1476 shutdown_rx,
1477 Some(self.get_rule_engine().await),
1478 )
1479 .await
1480 .map_err(|e| e.to_string());
1481 if let Err(error) = &result {
1482 self.transition_to_failed(config.port, error.clone());
1483 } else {
1484 self.transition_to_stopped();
1485 }
1486 result
1487 } else {
1488 let source = TcpCaptureSource::new(listener);
1489 let policy = ProxyPolicy::default();
1490 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1491 let policy_rx = self.policy_tx.subscribe();
1492
1493 let result = relay_core_lib::start_proxy(
1494 source,
1495 proxy_tx,
1496 interceptor,
1497 ca,
1498 policy_rx,
1499 None,
1500 shutdown_rx,
1501 Some(self.get_rule_engine().await),
1502 )
1503 .await
1504 .map_err(|e| e.to_string());
1505 if let Err(error) = &result {
1506 self.transition_to_failed(config.port, error.clone());
1507 } else {
1508 self.transition_to_stopped();
1509 }
1510 result
1511 }
1512 }
1513}
1514
1515fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1516 match update {
1517 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1518 FlowUpdate::WebSocketMessage {
1519 flow_id,
1520 mut message,
1521 } => {
1522 message.content = redact_body(message.content, redaction);
1523 FlowUpdate::WebSocketMessage { flow_id, message }
1524 }
1525 FlowUpdate::HttpBody {
1526 flow_id,
1527 direction,
1528 body,
1529 } => FlowUpdate::HttpBody {
1530 flow_id,
1531 direction,
1532 body: redact_body(body, redaction),
1533 },
1534 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1535 FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1536 }
1537 }
1538}
1539
1540fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1541 if !redaction.enabled {
1542 return flow;
1543 }
1544 match &mut flow.layer {
1545 Layer::Http(http) => {
1546 redact_http_request(&mut http.request, redaction);
1547 if let Some(response) = &mut http.response {
1548 redact_headers(&mut response.headers, redaction);
1549 response.body = response
1550 .body
1551 .take()
1552 .map(|body| redact_body(body, redaction));
1553 }
1554 }
1555 Layer::WebSocket(ws) => {
1556 redact_http_request(&mut ws.handshake_request, redaction);
1557 redact_headers(&mut ws.handshake_response.headers, redaction);
1558 ws.handshake_response.body = ws
1559 .handshake_response
1560 .body
1561 .take()
1562 .map(|body| redact_body(body, redaction));
1563 for message in &mut ws.messages {
1564 message.content = redact_body(message.content.clone(), redaction);
1565 }
1566 }
1567 _ => {}
1568 }
1569 flow
1570}
1571
1572fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1573 if !redaction.enabled {
1574 return summary;
1575 }
1576 summary.url = redact_url_string(&summary.url, redaction);
1577 summary
1578}
1579
1580fn redact_http_request(
1581 request: &mut relay_core_api::flow::HttpRequest,
1582 redaction: &RedactionPolicy,
1583) {
1584 redact_headers(&mut request.headers, redaction);
1585 redact_query_pairs(&mut request.query, redaction);
1586 request.url = redact_url(&request.url, redaction);
1587 request.body = request.body.take().map(|body| redact_body(body, redaction));
1588}
1589
1590fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1591 if !redaction.enabled {
1592 return;
1593 }
1594 let sensitive = redaction_set(&redaction.sensitive_header_names);
1595 for (name, value) in headers.iter_mut() {
1596 if sensitive.contains(&name.to_ascii_lowercase()) {
1597 *value = "[REDACTED]".to_string();
1598 }
1599 }
1600}
1601
1602fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1603 if !redaction.enabled {
1604 return;
1605 }
1606 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1607 for (name, value) in query.iter_mut() {
1608 if sensitive.contains(&name.to_ascii_lowercase()) {
1609 *value = "[REDACTED]".to_string();
1610 }
1611 }
1612}
1613
1614fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1615 if !redaction.enabled {
1616 return url.clone();
1617 }
1618 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1619 let pairs: Vec<(String, String)> = url
1620 .query_pairs()
1621 .map(|(k, v)| {
1622 let key = k.to_string();
1623 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1624 "[REDACTED]".to_string()
1625 } else {
1626 v.to_string()
1627 };
1628 (key, value)
1629 })
1630 .collect();
1631 let mut next = url.clone();
1632 if pairs.is_empty() {
1633 return next;
1634 }
1635 next.query_pairs_mut().clear();
1636 for (k, v) in pairs {
1637 next.query_pairs_mut().append_pair(&k, &v);
1638 }
1639 next
1640}
1641
1642fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1643 match url::Url::parse(input) {
1644 Ok(url) => redact_url(&url, redaction).to_string(),
1645 Err(_) => input.to_string(),
1646 }
1647}
1648
1649fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1650 if redaction.enabled && redaction.redact_bodies {
1651 body.content = "[REDACTED]".to_string();
1652 }
1653 body
1654}
1655
1656fn redaction_set(values: &[String]) -> HashSet<String> {
1657 values
1658 .iter()
1659 .map(|value| value.to_ascii_lowercase())
1660 .collect()
1661}
1662
1663#[derive(Clone)]
1664pub struct ProxyConfig {
1665 pub port: u16,
1666 pub ca_cert_path: std::path::PathBuf,
1667 pub ca_key_path: std::path::PathBuf,
1668 pub transparent: bool,
1669 pub udp_tproxy_port: Option<u16>,
1670}
1671
1672impl ProxyConfig {
1673 pub fn new(
1674 port: u16,
1675 ca_cert_path: std::path::PathBuf,
1676 ca_key_path: std::path::PathBuf,
1677 ) -> Self {
1678 Self {
1679 port,
1680 ca_cert_path,
1681 ca_key_path,
1682 transparent: false,
1683 udp_tproxy_port: None,
1684 }
1685 }
1686
1687 pub fn from_app_data_dir(
1688 app_data_dir: impl Into<std::path::PathBuf>,
1689 port: u16,
1690 ) -> Result<Self, String> {
1691 let app_data_dir = app_data_dir.into();
1692 if !app_data_dir.exists() {
1693 std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1694 }
1695
1696 Ok(Self::new(
1697 port,
1698 app_data_dir.join("ca_cert.pem"),
1699 app_data_dir.join("ca_key.pem"),
1700 ))
1701 }
1702
1703 pub fn with_transparent(mut self, transparent: bool) -> Self {
1704 self.transparent = transparent;
1705 self
1706 }
1707
1708 pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1709 self.udp_tproxy_port = udp_tproxy_port;
1710 self
1711 }
1712}
1713
1714fn now_unix_ms() -> u64 {
1715 SystemTime::now()
1716 .duration_since(UNIX_EPOCH)
1717 .unwrap_or_default()
1718 .as_millis() as u64
1719}
1720
1721fn modification_field_names(
1722 mods: Option<&relay_core_api::modification::FlowModification>,
1723) -> Vec<&'static str> {
1724 let Some(mods) = mods else {
1725 return Vec::new();
1726 };
1727
1728 let mut fields = Vec::new();
1729 if mods.method.is_some() {
1730 fields.push("method");
1731 }
1732 if mods.url.is_some() {
1733 fields.push("url");
1734 }
1735 if mods.request_headers.is_some() {
1736 fields.push("request_headers");
1737 }
1738 if mods.request_body.is_some() {
1739 fields.push("request_body");
1740 }
1741 if mods.status_code.is_some() {
1742 fields.push("status_code");
1743 }
1744 if mods.response_headers.is_some() {
1745 fields.push("response_headers");
1746 }
1747 if mods.response_body.is_some() {
1748 fields.push("response_body");
1749 }
1750 if mods.message_content.is_some() {
1751 fields.push("message_content");
1752 }
1753 fields
1754}
1755
1756#[cfg(test)]
1757mod tests {
1758 use super::*;
1759 use chrono::Utc;
1760 use relay_core_api::flow::{
1761 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1762 ResponseTiming, TransportProtocol,
1763 };
1764 use std::sync::atomic::{AtomicU64, Ordering};
1765 use tokio::time::{Duration, sleep};
1766 use url::Url;
1767 use uuid::Uuid;
1768
1769 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1770
1771 fn sqlite_url() -> String {
1772 let nanos = SystemTime::now()
1773 .duration_since(UNIX_EPOCH)
1774 .expect("clock drift")
1775 .as_nanos();
1776 let pid = std::process::id();
1777 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1778 let db_dir = std::env::current_dir()
1779 .expect("cwd")
1780 .join("target")
1781 .join("test-dbs");
1782 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1783 let db_path = db_dir.join(format!(
1784 "relay-core-runtime-test-{}-{}-{}.db",
1785 pid, nanos, seq
1786 ));
1787 format!("sqlite://{}?mode=rwc", db_path.display())
1788 }
1789
1790 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1791 let start_time =
1792 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1793 let request_url =
1794 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1795 Flow {
1796 id: Uuid::new_v4(),
1797 start_time,
1798 end_time: Some(start_time),
1799 network: NetworkInfo {
1800 client_ip: "127.0.0.1".to_string(),
1801 client_port: 12000,
1802 server_ip: "127.0.0.1".to_string(),
1803 server_port: 8080,
1804 protocol: TransportProtocol::TCP,
1805 tls: false,
1806 tls_version: None,
1807 sni: None,
1808 },
1809 layer: Layer::Http(HttpLayer {
1810 request: HttpRequest {
1811 method: method.to_string(),
1812 url: request_url,
1813 version: "HTTP/1.1".to_string(),
1814 headers: vec![],
1815 cookies: vec![],
1816 query: vec![],
1817 body: None,
1818 },
1819 response: Some(HttpResponse {
1820 status,
1821 status_text: "OK".to_string(),
1822 version: "HTTP/1.1".to_string(),
1823 headers: vec![],
1824 cookies: vec![],
1825 body: None,
1826 timing: ResponseTiming {
1827 time_to_first_byte: None,
1828 time_to_last_byte: None,
1829 connect_time_ms: None,
1830 ssl_time_ms: None,
1831 },
1832 }),
1833 error: None,
1834 }),
1835 tags: vec![],
1836 meta: std::collections::HashMap::new(),
1837 resilience_trace: None,
1838 rule_variables: std::collections::HashMap::new(),
1839 matched_rules: vec![],
1840 }
1841 }
1842
1843 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1844 let start_time =
1845 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1846 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1847 .expect("url should parse");
1848 Flow {
1849 id: Uuid::new_v4(),
1850 start_time,
1851 end_time: Some(start_time),
1852 network: NetworkInfo {
1853 client_ip: "127.0.0.1".to_string(),
1854 client_port: 12000,
1855 server_ip: "127.0.0.1".to_string(),
1856 server_port: 8080,
1857 protocol: TransportProtocol::TCP,
1858 tls: false,
1859 tls_version: None,
1860 sni: None,
1861 },
1862 layer: Layer::Http(HttpLayer {
1863 request: HttpRequest {
1864 method: "GET".to_string(),
1865 url: request_url,
1866 version: "HTTP/1.1".to_string(),
1867 headers: vec![
1868 (
1869 "Authorization".to_string(),
1870 "Bearer secret-token".to_string(),
1871 ),
1872 ("X-Normal".to_string(), "visible".to_string()),
1873 ],
1874 cookies: vec![],
1875 query: vec![
1876 ("token".to_string(), "abc123".to_string()),
1877 ("ok".to_string(), "1".to_string()),
1878 ],
1879 body: Some(BodyData {
1880 encoding: "utf-8".to_string(),
1881 content: "secret request body".to_string(),
1882 size: 19,
1883 }),
1884 },
1885 response: Some(HttpResponse {
1886 status: 200,
1887 status_text: "OK".to_string(),
1888 version: "HTTP/1.1".to_string(),
1889 headers: vec![
1890 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1891 ("X-Response".to_string(), "visible".to_string()),
1892 ],
1893 cookies: vec![],
1894 body: Some(BodyData {
1895 encoding: "utf-8".to_string(),
1896 content: "secret response body".to_string(),
1897 size: 20,
1898 }),
1899 timing: ResponseTiming {
1900 time_to_first_byte: None,
1901 time_to_last_byte: None,
1902 connect_time_ms: None,
1903 ssl_time_ms: None,
1904 },
1905 }),
1906 error: None,
1907 }),
1908 tags: vec![],
1909 meta: std::collections::HashMap::new(),
1910 resilience_trace: None,
1911 rule_variables: std::collections::HashMap::new(),
1912 matched_rules: vec![],
1913 }
1914 }
1915
1916 #[tokio::test]
1917 async fn set_rules_from_records_audit_event() {
1918 let state = CoreState::new(None).await;
1919
1920 state
1921 .set_rules_from(
1922 AuditActor::Http,
1923 "rule.upsert",
1924 "rule-1".to_string(),
1925 json!({ "route": "/api/v1/rules" }),
1926 Vec::new(),
1927 )
1928 .await
1929 .expect("set rules should succeed");
1930
1931 let events = state.recent_audit_events();
1932 let event = events.last().expect("audit event should exist");
1933 assert_eq!(event.actor, AuditActor::Http);
1934 assert_eq!(event.kind, AuditEventKind::RuleChanged);
1935 assert_eq!(event.outcome, AuditOutcome::Success);
1936 assert_eq!(event.target, "rule-1");
1937 assert_eq!(event.details["operation"], "rule.upsert");
1938 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1939 }
1940
1941 #[tokio::test]
1942 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1943 let state = CoreState::new(None).await;
1944
1945 state
1946 .upsert_rule_from(
1947 AuditActor::Probe,
1948 "rule.upsert",
1949 "rule-1".to_string(),
1950 json!({ "tool": "set_rule" }),
1951 Rule {
1952 id: "rule-1".to_string(),
1953 name: "first".to_string(),
1954 active: true,
1955 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1956 priority: 1,
1957 termination: relay_core_lib::rule::RuleTermination::Continue,
1958 filter: relay_core_lib::rule::Filter::Url(
1959 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1960 ),
1961 actions: vec![],
1962 constraints: None,
1963 },
1964 )
1965 .await
1966 .expect("initial upsert should succeed");
1967
1968 state
1969 .upsert_rule_from(
1970 AuditActor::Probe,
1971 "rule.upsert",
1972 "rule-1".to_string(),
1973 json!({ "tool": "set_rule" }),
1974 Rule {
1975 id: "rule-1".to_string(),
1976 name: "second".to_string(),
1977 active: true,
1978 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1979 priority: 2,
1980 termination: relay_core_lib::rule::RuleTermination::Continue,
1981 filter: relay_core_lib::rule::Filter::Url(
1982 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1983 ),
1984 actions: vec![],
1985 constraints: None,
1986 },
1987 )
1988 .await
1989 .expect("replacement upsert should succeed");
1990
1991 let rules = state.get_rules().await;
1992 assert_eq!(rules.len(), 1);
1993 assert_eq!(rules[0].name, "second");
1994 let event = state
1995 .recent_audit_events()
1996 .last()
1997 .cloned()
1998 .expect("audit event");
1999 assert_eq!(event.details["operation"], "rule.upsert");
2000 assert_eq!(event.details["details"]["tool"], "set_rule");
2001 }
2002
2003 #[tokio::test]
2004 async fn delete_rule_from_returns_false_when_rule_missing() {
2005 let state = CoreState::new(None).await;
2006
2007 let deleted = state
2008 .delete_rule_from(
2009 AuditActor::Http,
2010 "rule.delete",
2011 "missing".to_string(),
2012 json!({ "route": "/api/v1/rules/{id}" }),
2013 "missing",
2014 )
2015 .await
2016 .expect("delete should not fail");
2017
2018 assert!(!deleted);
2019 assert!(state.recent_audit_events().is_empty());
2020 }
2021
2022 #[tokio::test]
2023 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2024 let state = CoreState::new(None).await;
2025
2026 let rule_id = state
2027 .create_mock_response_rule_from(
2028 AuditActor::Http,
2029 "api-mock-1".to_string(),
2030 json!({ "route": "/api/v1/mock", "status": 201 }),
2031 MockResponseRuleConfig {
2032 rule_id: "api-mock-1".to_string(),
2033 url_pattern: "example.com".to_string(),
2034 name: "api-mock:example.com".to_string(),
2035 status: 201,
2036 content_type: "application/json".to_string(),
2037 body: "{\"ok\":true}".to_string(),
2038 },
2039 )
2040 .await
2041 .expect("mock rule should be created");
2042
2043 assert_eq!(rule_id, "api-mock-1");
2044 let rules = state.get_rules().await;
2045 assert_eq!(rules.len(), 1);
2046 assert_eq!(rules[0].id, "api-mock-1");
2047 let event = state
2048 .recent_audit_events()
2049 .last()
2050 .cloned()
2051 .expect("audit event");
2052 assert_eq!(event.details["operation"], "rule.mock_create");
2053 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2054 }
2055
2056 #[tokio::test]
2057 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2058 let state = CoreState::new(None).await;
2059
2060 let rule_id = state
2061 .create_intercept_rule_from(
2062 AuditActor::Http,
2063 "intercept-1".to_string(),
2064 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2065 InterceptRuleConfig {
2066 rule_id: "intercept-1".to_string(),
2067 active: true,
2068 url_pattern: "example.com".to_string(),
2069 method: None,
2070 phase: "request".to_string(),
2071 name: "api-intercept:example.com".to_string(),
2072 priority: 100,
2073 termination: relay_core_lib::rule::RuleTermination::Stop,
2074 },
2075 )
2076 .await
2077 .expect("intercept rule should be created");
2078
2079 assert_eq!(rule_id, "intercept-1");
2080 let rules = state.get_rules().await;
2081 assert_eq!(rules.len(), 1);
2082 assert_eq!(rules[0].id, "intercept-1");
2083 assert_eq!(rules[0].name, "api-intercept:example.com");
2084 let event = state
2085 .recent_audit_events()
2086 .last()
2087 .cloned()
2088 .expect("audit event");
2089 assert_eq!(event.details["operation"], "rule.intercept_create");
2090 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2091 }
2092
2093 #[tokio::test]
2094 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2095 let state = CoreState::new(None).await;
2096
2097 state
2098 .upsert_legacy_intercept_rule_from(
2099 AuditActor::Tauri,
2100 "legacy-1".to_string(),
2101 json!({ "command": "set_intercept_rule" }),
2102 InterceptRule {
2103 id: "legacy-1".to_string(),
2104 active: true,
2105 url_pattern: "example.com".to_string(),
2106 method: None,
2107 phase: "both".to_string(),
2108 },
2109 )
2110 .await
2111 .expect("initial family upsert should succeed");
2112 assert_eq!(state.get_rules().await.len(), 2);
2113
2114 state
2115 .upsert_legacy_intercept_rule_from(
2116 AuditActor::Tauri,
2117 "legacy-1".to_string(),
2118 json!({ "command": "set_intercept_rule" }),
2119 InterceptRule {
2120 id: "legacy-1".to_string(),
2121 active: true,
2122 url_pattern: "example.org".to_string(),
2123 method: Some("POST".to_string()),
2124 phase: "request".to_string(),
2125 },
2126 )
2127 .await
2128 .expect("replacement family upsert should succeed");
2129
2130 let rules = state.get_rules().await;
2131 assert_eq!(rules.len(), 1);
2132 assert_eq!(rules[0].id, "legacy-1");
2133 let event = state
2134 .recent_audit_events()
2135 .last()
2136 .cloned()
2137 .expect("audit event");
2138 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2139 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2140 }
2141
2142 #[tokio::test]
2143 async fn resolve_intercept_failure_records_failed_audit_event() {
2144 let state = CoreState::new(None).await;
2145
2146 let result = state
2147 .resolve_intercept_with_modifications_from(
2148 AuditActor::Probe,
2149 "missing-flow:request".to_string(),
2150 "drop",
2151 None,
2152 )
2153 .await;
2154
2155 assert!(result.is_err());
2156 let events = state.recent_audit_events();
2157 let event = events.last().expect("audit event should exist");
2158 assert_eq!(event.actor, AuditActor::Probe);
2159 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2160 assert_eq!(event.outcome, AuditOutcome::Failed);
2161 assert_eq!(event.details["action"], "drop");
2162 assert!(
2163 event.details["error"]
2164 .as_str()
2165 .unwrap_or_default()
2166 .contains("Interception not found")
2167 );
2168 }
2169
2170 #[tokio::test]
2171 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2172 let state = CoreState::new(None).await;
2173 let (shutdown_tx, shutdown_rx) = oneshot::channel();
2174
2175 state
2176 .prepare_start(8080, shutdown_tx)
2177 .expect("prepare start should succeed");
2178 let lifecycle = state.lifecycle();
2179 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2180 assert_eq!(lifecycle.port, Some(8080));
2181 assert!(lifecycle.started_at_ms.is_none());
2182 assert!(lifecycle.last_error.is_none());
2183
2184 assert_eq!(
2185 state.stop_proxy().expect("stop should succeed"),
2186 ProxyStopResult::Stopping
2187 );
2188 let lifecycle = state.lifecycle();
2189 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2190 assert_eq!(lifecycle.port, Some(8080));
2191 assert!(shutdown_rx.await.is_ok());
2192 }
2193
2194 #[tokio::test]
2195 async fn status_snapshot_derives_runtime_facing_fields() {
2196 let state = CoreState::new(None).await;
2197 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2198 state
2199 .prepare_start(8080, shutdown_tx)
2200 .expect("prepare start should succeed");
2201
2202 let status = state.status_snapshot();
2203 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2204 assert!(status.running);
2205 assert_eq!(status.port, Some(8080));
2206 assert!(status.uptime.is_none());
2207 assert!(status.last_error.is_none());
2208 }
2209
2210 #[tokio::test]
2211 async fn status_report_combines_status_and_metrics() {
2212 let state = CoreState::new(None).await;
2213 let report = state.status_report().await;
2214
2215 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2216 assert!(!report.status.running);
2217 assert_eq!(report.metrics.intercepts_pending, 0);
2218 assert_eq!(report.metrics.ws_pending_messages, 0);
2219 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2220 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2221 assert_eq!(report.metrics.audit_events_total, 0);
2222 assert_eq!(report.metrics.audit_events_failed, 0);
2223 assert_eq!(report.metrics.flow_events_lagged_total, 0);
2224 assert_eq!(report.metrics.audit_events_lagged_total, 0);
2225 }
2226
2227 #[test]
2228 fn proxy_config_new_and_transport_setters_preserve_values() {
2229 let config = ProxyConfig::new(
2230 8080,
2231 std::path::PathBuf::from("/tmp/ca_cert.pem"),
2232 std::path::PathBuf::from("/tmp/ca_key.pem"),
2233 )
2234 .with_transparent(true)
2235 .with_udp_tproxy_port(Some(15000));
2236
2237 assert_eq!(config.port, 8080);
2238 assert_eq!(
2239 config.ca_cert_path,
2240 std::path::PathBuf::from("/tmp/ca_cert.pem")
2241 );
2242 assert_eq!(
2243 config.ca_key_path,
2244 std::path::PathBuf::from("/tmp/ca_key.pem")
2245 );
2246 assert!(config.transparent);
2247 assert_eq!(config.udp_tproxy_port, Some(15000));
2248 }
2249
2250 #[test]
2251 fn proxy_config_from_app_data_dir_creates_default_paths() {
2252 let unique = SystemTime::now()
2253 .duration_since(UNIX_EPOCH)
2254 .expect("clock drift")
2255 .as_nanos();
2256 let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2257
2258 let config =
2259 ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2260
2261 assert!(dir.exists());
2262 assert_eq!(config.port, 8899);
2263 assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2264 assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2265 assert!(!config.transparent);
2266 assert!(config.udp_tproxy_port.is_none());
2267 }
2268
2269 #[tokio::test]
2270 async fn intercept_snapshot_maps_pending_counts() {
2271 let state = CoreState::new(None).await;
2272 let snapshot = state.intercept_snapshot().await;
2273
2274 assert_eq!(snapshot.pending_count, 0);
2275 assert_eq!(snapshot.ws_pending_count, 0);
2276 }
2277
2278 #[tokio::test]
2279 async fn audit_snapshot_returns_latest_events_in_order() {
2280 let state = CoreState::new(None).await;
2281 state.record_audit_event(AuditEvent::new(
2282 AuditActor::Runtime,
2283 AuditEventKind::RuleChanged,
2284 "first",
2285 AuditOutcome::Success,
2286 json!({ "index": 1 }),
2287 ));
2288 state.record_audit_event(AuditEvent::new(
2289 AuditActor::Http,
2290 AuditEventKind::PolicyUpdated,
2291 "second",
2292 AuditOutcome::Success,
2293 json!({ "index": 2 }),
2294 ));
2295
2296 let snapshot = state.audit_snapshot(1);
2297
2298 assert_eq!(snapshot.events.len(), 1);
2299 assert_eq!(snapshot.events[0].target, "second");
2300 assert_eq!(snapshot.events[0].details["index"], 2);
2301 }
2302
2303 #[tokio::test]
2304 async fn query_audit_snapshot_filters_in_memory_events() {
2305 let state = CoreState::new(None).await;
2306 state.record_audit_event(AuditEvent::new(
2307 AuditActor::Http,
2308 AuditEventKind::RuleChanged,
2309 "rule-1",
2310 AuditOutcome::Success,
2311 json!({ "idx": 1 }),
2312 ));
2313 state.record_audit_event(AuditEvent::new(
2314 AuditActor::Probe,
2315 AuditEventKind::PolicyUpdated,
2316 "policy",
2317 AuditOutcome::Failed,
2318 json!({ "idx": 2 }),
2319 ));
2320
2321 let snapshot = state
2322 .query_audit_snapshot(CoreAuditQuery {
2323 actor: Some(AuditActor::Probe),
2324 kind: Some(AuditEventKind::PolicyUpdated),
2325 outcome: Some(AuditOutcome::Failed),
2326 limit: 10,
2327 ..Default::default()
2328 })
2329 .await;
2330
2331 assert_eq!(snapshot.events.len(), 1);
2332 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2333 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2334 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2335 }
2336
2337 #[tokio::test]
2338 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2339 let state = CoreState::new(Some(sqlite_url())).await;
2340 state.update_policy_from(
2341 AuditActor::Http,
2342 "policy".to_string(),
2343 ProxyPolicy {
2344 transparent_enabled: true,
2345 ..Default::default()
2346 },
2347 );
2348
2349 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2350 for _ in 0..10 {
2351 snapshot = state
2352 .query_audit_snapshot(CoreAuditQuery {
2353 actor: Some(AuditActor::Http),
2354 kind: Some(AuditEventKind::PolicyUpdated),
2355 limit: 10,
2356 ..Default::default()
2357 })
2358 .await;
2359 if !snapshot.events.is_empty() {
2360 break;
2361 }
2362 sleep(Duration::from_millis(20)).await;
2363 }
2364
2365 assert!(!snapshot.events.is_empty());
2366 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2367 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2368 }
2369
2370 #[tokio::test]
2371 async fn prepare_start_rejects_second_active_start() {
2372 let state = CoreState::new(None).await;
2373 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2374 state
2375 .prepare_start(8080, shutdown_tx)
2376 .expect("first start should succeed");
2377
2378 let (second_tx, _second_rx) = oneshot::channel();
2379 let error = state
2380 .prepare_start(8081, second_tx)
2381 .expect_err("second active start should be rejected");
2382 assert!(error.contains("already"));
2383 }
2384
2385 #[test]
2386 fn update_policy_records_audit_event() {
2387 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2388 let state = runtime.block_on(CoreState::new(None));
2389
2390 state.update_policy_from(
2391 AuditActor::Runtime,
2392 "policy".to_string(),
2393 ProxyPolicy {
2394 transparent_enabled: true,
2395 ..Default::default()
2396 },
2397 );
2398
2399 let events = state.recent_audit_events();
2400 let event = events.last().expect("audit event should exist");
2401 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2402 assert_eq!(event.outcome, AuditOutcome::Success);
2403 assert_eq!(event.details["transparent_enabled"], true);
2404 }
2405
2406 #[test]
2407 fn patch_policy_updates_redaction_without_replacing_other_fields() {
2408 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2409 let state = runtime.block_on(CoreState::new(None));
2410 let original_timeout = state.policy_snapshot().request_timeout_ms;
2411
2412 state.patch_policy_from(
2413 AuditActor::Runtime,
2414 "policy.patch".to_string(),
2415 relay_core_api::policy::ProxyPolicyPatch {
2416 redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2417 enabled: Some(true),
2418 redact_bodies: Some(true),
2419 ..Default::default()
2420 }),
2421 upstream: None,
2422 },
2423 );
2424
2425 let policy = state.policy_snapshot();
2426 assert_eq!(policy.request_timeout_ms, original_timeout);
2427 assert!(policy.redaction.enabled);
2428 assert!(policy.redaction.redact_bodies);
2429
2430 let events = state.recent_audit_events();
2431 let event = events.last().expect("audit event should exist");
2432 assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2433 assert_eq!(event.details["redaction_enabled"], true);
2434 }
2435
2436 #[tokio::test]
2437 async fn metrics_include_audit_and_lagged_event_counters() {
2438 let state = CoreState::new(None).await;
2439
2440 state.update_policy_from(
2441 AuditActor::Runtime,
2442 "policy".to_string(),
2443 ProxyPolicy::default(),
2444 );
2445 let _ = state
2446 .resolve_intercept_with_modifications_from(
2447 AuditActor::Probe,
2448 "missing-flow:request".to_string(),
2449 "drop",
2450 None,
2451 )
2452 .await;
2453
2454 state.record_flow_events_lagged(3);
2455 state.record_audit_events_lagged(5);
2456
2457 let metrics = state.get_metrics().await;
2458 assert_eq!(metrics.audit_events_total, 2);
2459 assert_eq!(metrics.audit_events_failed, 1);
2460 assert_eq!(metrics.flow_events_lagged_total, 3);
2461 assert_eq!(metrics.audit_events_lagged_total, 5);
2462 }
2463
2464 #[tokio::test]
2465 async fn prometheus_metrics_text_contains_observability_fields() {
2466 let state = CoreState::new(None).await;
2467 state.record_flow_events_lagged(2);
2468 state.record_audit_events_lagged(4);
2469
2470 let text = state.get_metrics_prometheus_text().await;
2471 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2472 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2473 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2474 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2475 }
2476
2477 #[tokio::test]
2478 async fn search_flows_uses_store_with_offset_pagination() {
2479 let state = CoreState::new(Some(sqlite_url())).await;
2480 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2481 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2482 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2483
2484 state.upsert_flow(Box::new(flow_a));
2485 state.upsert_flow(Box::new(flow_b));
2486 state.upsert_flow(Box::new(flow_c));
2487
2488 let mut baseline = Vec::new();
2489 for _ in 0..20 {
2490 baseline = state
2491 .search_flows(FlowQuery {
2492 host: Some("api.example.com".to_string()),
2493 path_contains: None,
2494 method: None,
2495 status_min: None,
2496 status_max: None,
2497 has_error: None,
2498 is_websocket: None,
2499 limit: Some(3),
2500 offset: Some(0),
2501 })
2502 .await;
2503 if baseline.len() == 3 {
2504 break;
2505 }
2506 sleep(Duration::from_millis(20)).await;
2507 }
2508
2509 assert_eq!(baseline.len(), 3);
2510 let page = state
2511 .search_flows(FlowQuery {
2512 host: Some("api.example.com".to_string()),
2513 path_contains: None,
2514 method: None,
2515 status_min: None,
2516 status_max: None,
2517 has_error: None,
2518 is_websocket: None,
2519 limit: Some(1),
2520 offset: Some(1),
2521 })
2522 .await;
2523 assert_eq!(page.len(), 1);
2524 assert_eq!(page[0].id, baseline[1].id);
2525 }
2526
2527 #[tokio::test]
2528 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2529 let state = CoreState::new(Some(sqlite_url())).await;
2530 let first_flow = sample_http_flow(
2531 "persist.example.com",
2532 "/first",
2533 "GET",
2534 200,
2535 1_700_000_010_000,
2536 );
2537 let first_id = first_flow.id.to_string();
2538 state.upsert_flow(Box::new(first_flow));
2539 for i in 0..240 {
2540 state.upsert_flow(Box::new(sample_http_flow(
2541 "persist.example.com",
2542 &format!("/{}", i),
2543 "GET",
2544 200,
2545 1_700_000_020_000 + i,
2546 )));
2547 }
2548
2549 sleep(Duration::from_millis(200)).await;
2550
2551 let loaded = state.get_flow(first_id).await;
2552 assert!(loaded.is_some());
2553 }
2554
2555 #[tokio::test]
2556 async fn search_flows_redacts_summary_url_when_enabled() {
2557 let state = CoreState::new(None).await;
2558 state.update_policy_from(
2559 AuditActor::Runtime,
2560 "policy.redaction".to_string(),
2561 ProxyPolicy {
2562 redaction: RedactionPolicy {
2563 enabled: true,
2564 sensitive_query_keys: vec!["token".to_string()],
2565 redact_bodies: false,
2566 ..Default::default()
2567 },
2568 ..Default::default()
2569 },
2570 );
2571 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2572
2573 let mut items = Vec::new();
2574 for _ in 0..20 {
2575 items = state
2576 .search_flows(FlowQuery {
2577 host: Some("api.example.com".to_string()),
2578 path_contains: Some("/private".to_string()),
2579 ..Default::default()
2580 })
2581 .await;
2582 if !items.is_empty() {
2583 break;
2584 }
2585 sleep(Duration::from_millis(20)).await;
2586 }
2587
2588 assert!(!items.is_empty());
2589 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2590 let token = redacted
2591 .query_pairs()
2592 .find(|(k, _)| k == "token")
2593 .map(|(_, v)| v.to_string());
2594 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2595 }
2596
2597 #[tokio::test]
2598 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2599 let state = CoreState::new(Some(sqlite_url())).await;
2600 state.update_policy_from(
2601 AuditActor::Runtime,
2602 "policy.redaction".to_string(),
2603 ProxyPolicy {
2604 redaction: RedactionPolicy {
2605 enabled: true,
2606 sensitive_header_names: vec![
2607 "authorization".to_string(),
2608 "set-cookie".to_string(),
2609 ],
2610 sensitive_query_keys: vec!["token".to_string()],
2611 redact_bodies: true,
2612 },
2613 ..Default::default()
2614 },
2615 );
2616
2617 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2618 let flow_id = flow.id.to_string();
2619 state.upsert_flow(Box::new(flow));
2620 sleep(Duration::from_millis(80)).await;
2621
2622 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2623 let Layer::Http(http) = loaded.layer else {
2624 panic!("expected http layer");
2625 };
2626
2627 let auth = http
2628 .request
2629 .headers
2630 .iter()
2631 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2632 .map(|(_, v)| v.as_str());
2633 assert_eq!(auth, Some("[REDACTED]"));
2634
2635 let req_query_token = http
2636 .request
2637 .query
2638 .iter()
2639 .find(|(k, _)| k == "token")
2640 .map(|(_, v)| v.as_str());
2641 assert_eq!(req_query_token, Some("[REDACTED]"));
2642
2643 let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2644 assert_eq!(req_body, Some("[REDACTED]"));
2645
2646 let response = http.response.expect("response should exist");
2647 let set_cookie = response
2648 .headers
2649 .iter()
2650 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2651 .map(|(_, v)| v.as_str());
2652 assert_eq!(set_cookie, Some("[REDACTED]"));
2653 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2654 assert_eq!(res_body, Some("[REDACTED]"));
2655 }
2656
2657 #[test]
2658 fn redact_flow_update_masks_http_body_when_enabled() {
2659 let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2660 let state = runtime.block_on(CoreState::new(None));
2661 state.update_policy_from(
2662 AuditActor::Runtime,
2663 "policy.redaction".to_string(),
2664 ProxyPolicy {
2665 redaction: RedactionPolicy {
2666 enabled: true,
2667 redact_bodies: true,
2668 ..Default::default()
2669 },
2670 ..Default::default()
2671 },
2672 );
2673
2674 let update = FlowUpdate::HttpBody {
2675 flow_id: "f-1".to_string(),
2676 direction: relay_core_api::flow::Direction::ClientToServer,
2677 body: BodyData {
2678 encoding: "utf-8".to_string(),
2679 content: "super-secret".to_string(),
2680 size: 12,
2681 },
2682 };
2683 let redacted = state.redact_flow_update_for_output(update);
2684 match redacted {
2685 FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2686 _ => panic!("expected http body update"),
2687 }
2688 }
2689
2690 #[cfg(feature = "script")]
2691 #[tokio::test]
2692 async fn load_script_from_records_audit_event() {
2693 let state = CoreState::new(None).await;
2694
2695 state
2696 .load_script_from(
2697 AuditActor::Tauri,
2698 "tauri.load_script".to_string(),
2699 "globalThis.onRequestHeaders = (_flow) => {};",
2700 )
2701 .await
2702 .expect("script should load");
2703
2704 let events = state.recent_audit_events();
2705 let event = events.last().expect("audit event should exist");
2706 assert_eq!(event.actor, AuditActor::Tauri);
2707 assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2708 assert_eq!(event.outcome, AuditOutcome::Success);
2709 assert_eq!(event.target, "tauri.load_script");
2710 }
2711}