1use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50 InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51 build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
76pub mod rule_engine {
77 pub use relay_core_lib::rule_engine::*;
78}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
81pub struct CoreMetrics {
82 pub flows_total: usize,
83 pub flows_in_memory: usize,
84 pub flows_dropped: usize,
85 pub intercepts_pending: usize,
86 pub ws_pending_messages: usize,
87 pub oldest_intercept_age_ms: Option<u64>,
88 pub oldest_ws_message_age_ms: Option<u64>,
89 pub rule_exec_errors: usize,
90 pub audit_events_total: usize,
91 pub audit_events_failed: usize,
92 pub flow_events_lagged_total: usize,
93 pub audit_events_lagged_total: usize,
94 pub proxy_body_degraded_total: usize,
96 pub proxy_http_request_total: usize,
98 pub proxy_sandbox_reject_total: usize,
100 pub proxy_invalid_method_total: usize,
102 pub proxy_invalid_status_total: usize,
104 pub proxy_retry_total: usize,
106 pub proxy_stream_mode_tap_total: usize,
108 pub proxy_stream_mode_degrade_total: usize,
110}
111
112impl CoreMetrics {
113 pub fn to_prometheus_text(&self) -> String {
114 let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
115 let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
116 format!(
117 "relay_core_flows_total {}\n\
118relay_core_flows_in_memory {}\n\
119relay_core_flows_dropped_total {}\n\
120relay_core_intercepts_pending {}\n\
121relay_core_ws_pending_messages {}\n\
122relay_core_oldest_intercept_age_ms {}\n\
123relay_core_oldest_ws_message_age_ms {}\n\
124relay_core_rule_exec_errors_total {}\n\
125relay_core_audit_events_total {}\n\
126relay_core_audit_events_failed_total {}\n\
127relay_core_flow_events_lagged_total {}\n\
128relay_core_audit_events_lagged_total {}\n\
129relay_core_proxy_body_degraded_total {}\n\
130relay_core_proxy_http_request_total {}\n\
131relay_core_proxy_sandbox_reject_total {}\n\
132relay_core_proxy_invalid_method_total {}\n\
133relay_core_proxy_invalid_status_total {}\n\
134relay_core_proxy_retry_total {}\n\
135relay_core_proxy_stream_mode_tap_total {}\n\
136relay_core_proxy_stream_mode_degrade_total {}\n",
137 self.flows_total,
138 self.flows_in_memory,
139 self.flows_dropped,
140 self.intercepts_pending,
141 self.ws_pending_messages,
142 oldest_intercept_age_ms,
143 oldest_ws_message_age_ms,
144 self.rule_exec_errors,
145 self.audit_events_total,
146 self.audit_events_failed,
147 self.flow_events_lagged_total,
148 self.audit_events_lagged_total,
149 self.proxy_body_degraded_total,
150 self.proxy_http_request_total,
151 self.proxy_sandbox_reject_total,
152 self.proxy_invalid_method_total,
153 self.proxy_invalid_status_total,
154 self.proxy_retry_total,
155 self.proxy_stream_mode_tap_total,
156 self.proxy_stream_mode_degrade_total,
157 )
158 }
159}
160
161#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
162pub struct CoreStatusSnapshot {
163 pub phase: RuntimeLifecyclePhase,
164 pub running: bool,
165 pub port: Option<u16>,
166 pub uptime: Option<u64>,
167 pub last_error: Option<String>,
168}
169
170#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
171pub struct CoreStatusReport {
172 pub status: CoreStatusSnapshot,
173 pub metrics: CoreMetrics,
174}
175
176#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
177pub struct CoreInterceptSnapshot {
178 pub pending_count: usize,
179 pub ws_pending_count: usize,
180}
181
182#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
183pub struct CoreAuditSnapshot {
184 pub events: Vec<AuditEvent>,
185}
186
187#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
188pub struct CoreAuditQuery {
189 pub since_ms: Option<u64>,
190 pub until_ms: Option<u64>,
191 pub actor: Option<AuditActor>,
192 pub kind: Option<AuditEventKind>,
193 pub outcome: Option<AuditOutcome>,
194 pub limit: usize,
195}
196
197impl Default for CoreAuditQuery {
198 fn default() -> Self {
199 Self {
200 since_ms: None,
201 until_ms: None,
202 actor: None,
203 kind: None,
204 outcome: None,
205 limit: 50,
206 }
207 }
208}
209
210#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
211#[serde(rename_all = "snake_case")]
212pub enum RuntimeLifecyclePhase {
213 Created,
214 Starting,
215 Running,
216 Stopping,
217 Stopped,
218 Failed,
219}
220
221impl RuntimeLifecyclePhase {
222 pub fn as_str(&self) -> &'static str {
223 match self {
224 Self::Created => "created",
225 Self::Starting => "starting",
226 Self::Running => "running",
227 Self::Stopping => "stopping",
228 Self::Stopped => "stopped",
229 Self::Failed => "failed",
230 }
231 }
232
233 pub fn is_active(&self) -> bool {
234 matches!(self, Self::Starting | Self::Running | Self::Stopping)
235 }
236}
237
238#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
239pub struct RuntimeLifecycle {
240 pub phase: RuntimeLifecyclePhase,
241 pub port: Option<u16>,
242 pub started_at_ms: Option<u64>,
243 pub last_error: Option<String>,
244}
245
246impl RuntimeLifecycle {
247 pub fn created() -> Self {
248 Self {
249 phase: RuntimeLifecyclePhase::Created,
250 port: None,
251 started_at_ms: None,
252 last_error: None,
253 }
254 }
255
256 pub fn is_active(&self) -> bool {
257 self.phase.is_active()
258 }
259
260 pub fn uptime_seconds(&self) -> Option<u64> {
261 let started_at_ms = self.started_at_ms?;
262 let now_ms = now_unix_ms();
263 Some(now_ms.saturating_sub(started_at_ms) / 1_000)
264 }
265}
266
267pub enum ProxySpawnResult {
268 Started(tokio::task::JoinHandle<()>),
269 AlreadyRunning,
270}
271
/// Result of `CoreState::stop_proxy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A shutdown sender was stored and the stop signal was issued.
    Stopping,
    /// No shutdown sender was stored, so there was nothing to stop.
    NotRunning,
}
277
278impl From<RuntimeLifecycle> for CoreStatusSnapshot {
279 fn from(lifecycle: RuntimeLifecycle) -> Self {
280 Self {
281 running: lifecycle.is_active(),
282 uptime: lifecycle.uptime_seconds(),
283 port: lifecycle.port,
284 last_error: lifecycle.last_error,
285 phase: lifecycle.phase,
286 }
287 }
288}
289
290pub struct CoreState {
291 flow_store: mpsc::Sender<FlowStoreMessage>,
292 intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
293 rule_store: mpsc::Sender<RuleStoreMessage>,
294 store: Option<Store>,
295 #[cfg(feature = "script")]
296 pub script_interceptor: Arc<ScriptInterceptor>,
297 pub policy_tx: watch::Sender<ProxyPolicy>,
298 pub flows_dropped: Arc<AtomicUsize>,
299 audit_events_total: Arc<AtomicUsize>,
300 audit_events_failed: Arc<AtomicUsize>,
301 flow_events_lagged_total: Arc<AtomicUsize>,
302 audit_events_lagged_total: Arc<AtomicUsize>,
303 flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
305 audit_broadcast_tx: broadcast::Sender<AuditEvent>,
306 audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
307 lifecycle_tx: watch::Sender<RuntimeLifecycle>,
308 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
309}
310
311impl CoreState {
312 pub async fn new(db_url: Option<String>) -> Self {
313 const AUDIT_HISTORY_LIMIT: usize = 200;
314
315 let store = if let Some(url) = db_url {
316 match Store::connect(&url).await {
317 Ok(s) => {
318 if let Err(e) = s.init().await {
319 tracing::error!("Failed to init store: {}", e);
320 }
321 Some(s)
322 }
323 Err(e) => {
324 tracing::error!("Failed to connect to store: {}", e);
325 None
326 }
327 }
328 } else {
329 None
330 };
331
332 let (flow_tx, flow_rx) = mpsc::channel(10000);
333 let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
334 tokio::spawn(flow_actor.run());
335
336 let (intercept_tx, intercept_rx) = mpsc::channel(1000);
337 let intercept_actor = InterceptBrokerActor::new(intercept_rx);
338 tokio::spawn(intercept_actor.run());
339
340 let (rule_tx, rule_rx) = mpsc::channel(100);
341 let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
342 tokio::spawn(rule_actor.run());
343
344 let (policy_tx, _) = watch::channel(ProxyPolicy::default());
345 let (flow_broadcast_tx, _) = broadcast::channel(1000);
346 let (audit_broadcast_tx, _) = broadcast::channel(256);
347 let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
348
349 #[cfg(feature = "script")]
350 let script_interceptor = ScriptInterceptor::new()
351 .await
352 .expect("Failed to initialize ScriptInterceptor");
353 Self {
354 flow_store: flow_tx,
355 intercept_broker: intercept_tx,
356 rule_store: rule_tx,
357 store,
358 #[cfg(feature = "script")]
359 script_interceptor: Arc::new(script_interceptor),
360 policy_tx,
361 flows_dropped: Arc::new(AtomicUsize::new(0)),
362 audit_events_total: Arc::new(AtomicUsize::new(0)),
363 audit_events_failed: Arc::new(AtomicUsize::new(0)),
364 flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
365 audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
366 flow_broadcast_tx,
367 audit_broadcast_tx,
368 audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
369 lifecycle_tx,
370 shutdown_tx: Mutex::new(None),
371 }
372 }
373
374 pub async fn get_metrics(&self) -> CoreMetrics {
375 let (flow_tx, flow_rx) = oneshot::channel();
376 let _ = self
377 .flow_store
378 .send(FlowStoreMessage::GetMetrics(flow_tx))
379 .await;
380 let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
381
382 let (int_tx, int_rx) = oneshot::channel();
383 let _ = self
384 .intercept_broker
385 .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
386 .await;
387 let (
388 intercepts_pending,
389 ws_pending_messages,
390 oldest_intercept_age_ms,
391 oldest_ws_message_age_ms,
392 ) = int_rx.await.unwrap_or((0, 0, None, None));
393
394 let (rule_tx, rule_rx) = oneshot::channel();
395 let _ = self
396 .rule_store
397 .send(RuleStoreMessage::GetMetrics(rule_tx))
398 .await;
399 let rule_exec_errors = rule_rx.await.unwrap_or(0);
400
401 CoreMetrics {
402 flows_total,
403 flows_in_memory,
404 flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
405 intercepts_pending,
406 ws_pending_messages,
407 oldest_intercept_age_ms,
408 oldest_ws_message_age_ms,
409 rule_exec_errors,
410 audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
411 audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
412 flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
413 audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
414 proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
415 proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
416 proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
417 proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
418 proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
419 proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
420 proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
421 proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
422 ),
423 }
424 }
425
426 pub async fn get_metrics_prometheus_text(&self) -> String {
427 let mut text = self.get_metrics().await.to_prometheus_text();
428 #[cfg(feature = "script")]
429 {
430 text.push_str(&self.script_interceptor.metrics.prometheus_lines());
431 }
432 text
433 }
434
435 pub fn status_snapshot(&self) -> CoreStatusSnapshot {
436 self.lifecycle().into()
437 }
438
439 pub async fn status_report(&self) -> CoreStatusReport {
440 CoreStatusReport {
441 status: self.status_snapshot(),
442 metrics: self.get_metrics().await,
443 }
444 }
445
446 pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
447 let metrics = self.get_metrics().await;
448 CoreInterceptSnapshot {
449 pending_count: metrics.intercepts_pending,
450 ws_pending_count: metrics.ws_pending_messages,
451 }
452 }
453
454 pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
455 let events = self.recent_audit_events();
456 let start = events.len().saturating_sub(limit);
457 CoreAuditSnapshot {
458 events: events.into_iter().skip(start).collect(),
459 }
460 }
461
462 pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
463 let limit = query.limit.clamp(1, 500);
464 if let Some(store) = &self.store {
465 let rows = store
466 .query_audit_events(
467 query.since_ms,
468 query.until_ms,
469 query.actor.as_ref().map(AuditActor::as_str),
470 query.kind.as_ref().map(AuditEventKind::as_str),
471 query.outcome.as_ref().map(AuditOutcome::as_str),
472 limit,
473 )
474 .await
475 .unwrap_or_default();
476
477 let mut events = Vec::with_capacity(rows.len());
478 for row in rows {
479 if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
480 events.push(event);
481 }
482 }
483 return CoreAuditSnapshot { events };
484 }
485
486 let mut events = self.recent_audit_events();
487 events.reverse();
488 let filtered = events
489 .into_iter()
490 .filter(|event| {
491 query
492 .since_ms
493 .map(|v| event.timestamp_ms >= v)
494 .unwrap_or(true)
495 })
496 .filter(|event| {
497 query
498 .until_ms
499 .map(|v| event.timestamp_ms <= v)
500 .unwrap_or(true)
501 })
502 .filter(|event| {
503 query
504 .actor
505 .as_ref()
506 .map(|v| &event.actor == v)
507 .unwrap_or(true)
508 })
509 .filter(|event| {
510 query
511 .kind
512 .as_ref()
513 .map(|v| &event.kind == v)
514 .unwrap_or(true)
515 })
516 .filter(|event| {
517 query
518 .outcome
519 .as_ref()
520 .map(|v| &event.outcome == v)
521 .unwrap_or(true)
522 })
523 .take(limit)
524 .collect();
525 CoreAuditSnapshot { events: filtered }
526 }
527
528 pub async fn get_flow(&self, id: String) -> Option<Flow> {
529 let (tx, rx) = oneshot::channel();
530 if let Err(e) = self
531 .flow_store
532 .send(FlowStoreMessage::GetFlow {
533 id: id.clone(),
534 respond_to: tx,
535 })
536 .await
537 {
538 error!("Failed to send GetFlow request: {}", e);
539 if let Some(store) = &self.store {
540 return store
541 .load_flow(&id)
542 .await
543 .ok()
544 .flatten()
545 .and_then(|value| serde_json::from_value::<Flow>(value).ok());
546 }
547 return None;
548 }
549 let flow = rx
550 .await
551 .map_err(|e| {
552 error!("Failed to receive Flow response: {}", e);
553 e
554 })
555 .unwrap_or(None);
556 if let Some(flow) = flow {
557 return Some(redact_flow(flow, &self.current_redaction_policy()));
558 }
559 if let Some(store) = &self.store {
560 return store
561 .load_flow(&id)
562 .await
563 .ok()
564 .flatten()
565 .and_then(|value| serde_json::from_value::<Flow>(value).ok())
566 .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
567 }
568 None
569 }
570
571 pub async fn get_rules(&self) -> Vec<Rule> {
572 let (tx, rx) = oneshot::channel();
573 if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
574 error!("Failed to send GetRules request: {}", e);
575 return Vec::new();
576 }
577 rx.await
578 .map_err(|e| {
579 error!("Failed to receive Rules response: {}", e);
580 e
581 })
582 .unwrap_or_default()
583 }
584
585 pub async fn set_rules(&self, rules: Vec<Rule>) {
586 let _ = self
587 .set_rules_from(
588 AuditActor::Runtime,
589 "rules.replace",
590 "rule_set".to_string(),
591 json!({}),
592 rules,
593 )
594 .await;
595 }
596
597 pub async fn upsert_rule_from(
598 &self,
599 actor: AuditActor,
600 operation: &str,
601 target: String,
602 details: serde_json::Value,
603 rule: Rule,
604 ) -> Result<(), String> {
605 let rule_id = rule.id.clone();
606 let mut rules = self.get_rules().await;
607 rules.retain(|existing| existing.id != rule_id);
608 rules.push(rule);
609 self.set_rules_from(actor, operation, target, details, rules)
610 .await
611 }
612
613 pub async fn delete_rule_from(
614 &self,
615 actor: AuditActor,
616 operation: &str,
617 target: String,
618 details: serde_json::Value,
619 rule_id: &str,
620 ) -> Result<bool, String> {
621 let mut rules = self.get_rules().await;
622 let before = rules.len();
623 rules.retain(|rule| rule.id != rule_id);
624 if rules.len() == before {
625 return Ok(false);
626 }
627 self.set_rules_from(actor, operation, target, details, rules)
628 .await
629 .map(|_| true)
630 }
631
632 pub async fn create_mock_response_rule_from(
633 &self,
634 actor: AuditActor,
635 target: String,
636 details: serde_json::Value,
637 config: MockResponseRuleConfig,
638 ) -> Result<String, String> {
639 let rule_id = config.rule_id.clone();
640 let rule = build_mock_response_rule(config);
641 self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
642 .await
643 .map(|_| rule_id)
644 }
645
646 pub async fn create_intercept_rule_from(
647 &self,
648 actor: AuditActor,
649 target: String,
650 details: serde_json::Value,
651 config: InterceptRuleConfig,
652 ) -> Result<String, String> {
653 let rule_id = config.rule_id.clone();
654 let mut rules = self.get_rules().await;
655 rules.extend(build_intercept_rules(config));
656 self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
657 .await
658 .map(|_| rule_id)
659 }
660
661 pub async fn upsert_legacy_intercept_rule_from(
662 &self,
663 actor: AuditActor,
664 target: String,
665 details: serde_json::Value,
666 rule: InterceptRule,
667 ) -> Result<(), String> {
668 let rule_id = rule.id.clone();
669 let mut rules = self.get_rules().await;
670 rules.retain(|existing| {
671 existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
672 });
673 rules.extend(rule.to_rules());
674 self.set_rules_from(
675 actor,
676 "rule.intercept_legacy_upsert",
677 target,
678 details,
679 rules,
680 )
681 .await
682 }
683
684 pub async fn set_rules_from(
685 &self,
686 actor: AuditActor,
687 operation: &str,
688 target: String,
689 details: serde_json::Value,
690 rules: Vec<Rule>,
691 ) -> Result<(), String> {
692 let rule_count = rules.len();
693 if let Err(e) = self
694 .rule_store
695 .send(RuleStoreMessage::SetRules(rules))
696 .await
697 {
698 error!("Failed to send SetRules request: {}", e);
699 self.record_audit_event(AuditEvent::new(
700 actor,
701 AuditEventKind::RuleChanged,
702 target,
703 AuditOutcome::Failed,
704 json!({
705 "operation": operation,
706 "rule_count": rule_count,
707 "details": details,
708 "error": e.to_string()
709 }),
710 ));
711 return Err(e.to_string());
712 }
713
714 self.record_audit_event(AuditEvent::new(
715 actor,
716 AuditEventKind::RuleChanged,
717 target,
718 AuditOutcome::Success,
719 json!({
720 "operation": operation,
721 "rule_count": rule_count,
722 "details": details
723 }),
724 ));
725 Ok(())
726 }
727
728 pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
729 let mut new_rules = Vec::new();
730 for rule in rules {
731 new_rules.extend(rule.to_rules());
732 }
733 self.set_rules(new_rules).await;
734 }
735
736 pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
737 let (tx, rx) = oneshot::channel();
738 if let Err(e) = self
739 .rule_store
740 .send(RuleStoreMessage::GetRuleEngine(tx))
741 .await
742 {
743 error!("Failed to send GetRuleEngine request: {}", e);
744 return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
745 }
746 rx.await
747 .map_err(|e| {
748 error!("Failed to receive RuleEngine response: {}", e);
749 e
750 })
751 .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
752 }
753
754 pub fn update_policy(&self, policy: ProxyPolicy) {
755 self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
756 }
757
758 pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
759 let mut policy = self.policy_snapshot();
760 policy.apply_patch(patch);
761 self.update_policy_from(actor, target, policy);
762 }
763
764 pub fn policy_snapshot(&self) -> ProxyPolicy {
765 self.policy_tx.borrow().clone()
766 }
767
768 pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
769 let details = json!({
770 "strict_http_semantics": policy.strict_http_semantics,
771 "request_timeout_ms": policy.request_timeout_ms,
772 "max_body_size": policy.max_body_size,
773 "transparent_enabled": policy.transparent_enabled,
774 "redaction_enabled": policy.redaction.enabled,
775 "redact_bodies": policy.redaction.redact_bodies
776 });
777
778 self.policy_tx.send_replace(policy);
779 self.record_audit_event(AuditEvent::new(
780 actor,
781 AuditEventKind::PolicyUpdated,
782 target,
783 AuditOutcome::Success,
784 details,
785 ));
786 }
787
788 pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
789 if let Err(e) = self
790 .intercept_broker
791 .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
792 .await
793 {
794 error!("Failed to send RegisterIntercept request: {}", e);
795 }
796 }
797
798 pub async fn resolve_intercept(
799 &self,
800 key: String,
801 result: InterceptionResult,
802 ) -> Result<(), String> {
803 let (tx, rx) = oneshot::channel();
804 if let Err(e) = self
805 .intercept_broker
806 .send(InterceptBrokerMessage::ResolveIntercept {
807 key,
808 result,
809 respond_to: tx,
810 })
811 .await
812 {
813 error!("Failed to send ResolveIntercept request: {}", e);
814 return Err(e.to_string());
815 }
816 rx.await.map_err(|_| "Actor dropped".to_string())?
817 }
818
819 pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
820 let (tx, rx) = oneshot::channel();
821 if let Err(e) = self
822 .intercept_broker
823 .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
824 key,
825 respond_to: tx,
826 })
827 .await
828 {
829 error!("Failed to send GetPendingWebSocketMessage request: {}", e);
830 return None;
831 }
832 rx.await
833 .map_err(|e| {
834 error!("Failed to receive WebSocketMessage response: {}", e);
835 e
836 })
837 .unwrap_or(None)
838 }
839
840 pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
841 if let Err(e) = self
842 .intercept_broker
843 .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
844 .await
845 {
846 error!("Failed to send SetPendingWebSocketMessage request: {}", e);
847 }
848 }
849
850 pub async fn is_intercept_pending(&self, key: String) -> bool {
851 let (tx, rx) = oneshot::channel();
852 if let Err(e) = self
853 .intercept_broker
854 .send(InterceptBrokerMessage::GetPendingIntercept {
855 key,
856 respond_to: tx,
857 })
858 .await
859 {
860 error!("Failed to send GetPendingIntercept request: {}", e);
861 return false;
862 }
863 rx.await
864 .map_err(|e| {
865 error!("Failed to receive PendingIntercept response: {}", e);
866 e
867 })
868 .unwrap_or(false)
869 }
870
871 pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
872 let (tx, rx) = oneshot::channel();
873 if let Err(e) = self
874 .intercept_broker
875 .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
876 flow_id,
877 respond_to: tx,
878 })
879 .await
880 {
881 error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
882 return false;
883 }
884 rx.await
885 .map_err(|e| {
886 error!("Failed to receive PendingInterceptByFlowId response: {}", e);
887 e
888 })
889 .unwrap_or(false)
890 }
891
892 pub fn upsert_flow(&self, flow: Box<Flow>) {
893 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
894 error!("FlowStore dropped flow: {}", e);
896 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
897 }
898 }
899
900 pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
901 if let Err(e) = self
902 .flow_store
903 .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
904 {
905 error!("FlowStore dropped WS message: {}", e);
906 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
907 }
908 }
909
910 pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
911 if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
912 flow_id,
913 body,
914 direction,
915 }) {
916 error!("FlowStore dropped HTTP body: {}", e);
917 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
918 }
919 }
920
921 pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
923 if let Err(e) = self
924 .flow_store
925 .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
926 {
927 error!("FlowStore dropped budget-exceeded tag: {}", e);
928 self.flows_dropped.fetch_add(1, Ordering::Relaxed);
929 }
930 }
931
932 pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
934 self.flow_broadcast_tx.subscribe()
935 }
936
937 pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
938 self.audit_broadcast_tx.subscribe()
939 }
940
941 fn current_redaction_policy(&self) -> RedactionPolicy {
942 self.policy_tx.borrow().redaction.clone()
943 }
944
945 pub fn record_flow_events_lagged(&self, skipped: u64) {
946 self.flow_events_lagged_total
947 .fetch_add(skipped as usize, Ordering::Relaxed);
948 }
949
950 pub fn record_audit_events_lagged(&self, skipped: u64) {
951 self.audit_events_lagged_total
952 .fetch_add(skipped as usize, Ordering::Relaxed);
953 }
954
955 pub fn lifecycle(&self) -> RuntimeLifecycle {
956 self.lifecycle_tx.borrow().clone()
957 }
958
959 pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
960 self.lifecycle_tx.subscribe()
961 }
962
963 fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
964 let current = self.lifecycle();
965 if current.is_active() {
966 return Err(format!(
967 "Proxy is already {} on port {}",
968 current.phase.as_str(),
969 current.port.unwrap_or(port)
970 ));
971 }
972
973 let mut guard = self
974 .shutdown_tx
975 .lock()
976 .map_err(|_| "shutdown state poisoned".to_string())?;
977 *guard = Some(shutdown_tx);
978 drop(guard);
979
980 self.update_lifecycle(RuntimeLifecycle {
981 phase: RuntimeLifecyclePhase::Starting,
982 port: Some(port),
983 started_at_ms: None,
984 last_error: None,
985 });
986 Ok(())
987 }
988
989 pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
990 let mut guard = self
991 .shutdown_tx
992 .lock()
993 .map_err(|_| "shutdown state poisoned".to_string())?;
994 let Some(tx) = guard.take() else {
995 return Ok(ProxyStopResult::NotRunning);
996 };
997 drop(guard);
998
999 let current = self.lifecycle();
1000 self.update_lifecycle(RuntimeLifecycle {
1001 phase: RuntimeLifecyclePhase::Stopping,
1002 port: current.port,
1003 started_at_ms: current.started_at_ms,
1004 last_error: current.last_error,
1005 });
1006 let _ = tx.send(());
1007 Ok(ProxyStopResult::Stopping)
1008 }
1009
1010 pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1011 self.audit_history
1012 .lock()
1013 .map(|events| events.iter().cloned().collect())
1014 .unwrap_or_default()
1015 }
1016
1017 pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1019 let redaction = self.current_redaction_policy();
1020 if let Some(store) = &self.store {
1021 return store
1022 .query_flow_summaries(&query)
1023 .await
1024 .unwrap_or_default()
1025 .into_iter()
1026 .map(|summary| redact_flow_summary(summary, &redaction))
1027 .collect();
1028 }
1029 let (tx, rx) = oneshot::channel();
1030 if let Err(e) = self
1031 .flow_store
1032 .send(FlowStoreMessage::SearchFlows {
1033 query,
1034 respond_to: tx,
1035 })
1036 .await
1037 {
1038 error!("Failed to send SearchFlows request: {}", e);
1039 return Vec::new();
1040 }
1041 rx.await
1042 .unwrap_or_default()
1043 .into_iter()
1044 .map(|summary| redact_flow_summary(summary, &redaction))
1045 .collect()
1046 }
1047
1048 pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1049 let redaction = self.current_redaction_policy();
1050 redact_flow_update(update, &redaction)
1051 }
1052
1053 pub async fn resolve_intercept_with_modifications(
1063 &self,
1064 key: String,
1065 action: &str,
1066 mods: Option<relay_core_api::modification::FlowModification>,
1067 ) -> Result<(), String> {
1068 self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1069 .await
1070 }
1071
1072 pub async fn resolve_intercept_with_modifications_from(
1073 &self,
1074 actor: AuditActor,
1075 key: String,
1076 action: &str,
1077 mods: Option<relay_core_api::modification::FlowModification>,
1078 ) -> Result<(), String> {
1079 let modified_fields = modification_field_names(mods.as_ref());
1080 let result = match action {
1081 "drop" => InterceptionResult::Drop,
1082 _ => match mods {
1083 None => InterceptionResult::Continue,
1084 Some(m) => {
1085 let parts: Vec<&str> = key.splitn(4, ':').collect();
1086 match parts.as_slice() {
1087 [flow_id, phase] => {
1088 if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1089 modification::apply_flow_modification(&flow, phase, m)
1090 } else {
1091 InterceptionResult::Continue
1092 }
1093 }
1094 [_, "ws_msg", _] => {
1097 if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1098 modification::apply_ws_modification(&msg, m)
1099 } else {
1100 InterceptionResult::Continue
1101 }
1102 }
1103 _ => InterceptionResult::Continue,
1104 }
1105 }
1106 },
1107 };
1108 let outcome = self.resolve_intercept(key.clone(), result).await;
1109 let audit_outcome = if outcome.is_ok() {
1110 AuditOutcome::Success
1111 } else {
1112 AuditOutcome::Failed
1113 };
1114 let error_message = outcome.as_ref().err().cloned();
1115
1116 self.record_audit_event(AuditEvent::new(
1117 actor,
1118 AuditEventKind::InterceptResolved,
1119 key,
1120 audit_outcome,
1121 json!({
1122 "action": action,
1123 "has_modifications": !modified_fields.is_empty(),
1124 "modified_fields": modified_fields,
1125 "error": error_message
1126 }),
1127 ));
1128 outcome
1129 }
1130
/// Loads `script` into the script interceptor and records a `ScriptReloaded`
/// audit event with the script's byte length and any error text.
///
/// # Errors
///
/// Returns the interceptor's load error rendered as a string.
#[cfg(feature = "script")]
pub async fn load_script_from(
    &self,
    actor: AuditActor,
    target: String,
    script: &str,
) -> Result<(), String> {
    let result = match self.script_interceptor.load_script(script).await {
        Ok(loaded) => Ok(loaded),
        Err(e) => Err(e.to_string()),
    };

    let (outcome, error_message) = match &result {
        Ok(_) => (AuditOutcome::Success, None),
        Err(message) => (AuditOutcome::Failed, Some(message.clone())),
    };

    let event = AuditEvent::new(
        actor,
        AuditEventKind::ScriptReloaded,
        target,
        outcome,
        json!({
            "script_bytes": script.len(),
            "error": error_message
        }),
    );
    self.record_audit_event(event);

    result
}
1164
1165 #[cfg(feature = "script")]
1167 pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1168 self.script_interceptor.set_env_allow(env_allow).await;
1169 }
1170
1171 fn record_audit_event(&self, event: AuditEvent) {
1172 const AUDIT_HISTORY_LIMIT: usize = 200;
1173 self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1174 if event.outcome == AuditOutcome::Failed {
1175 self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1176 }
1177
1178 let details = event.details.to_string();
1179 tracing::info!(
1180 target: "relay_core_audit",
1181 event_id = %event.id,
1182 actor = %event.actor.as_str(),
1183 kind = %event.kind.as_str(),
1184 target = %event.target,
1185 outcome = %event.outcome.as_str(),
1186 details = %details
1187 );
1188
1189 if let Ok(mut history) = self.audit_history.lock() {
1190 if history.len() >= AUDIT_HISTORY_LIMIT {
1191 history.pop_front();
1192 }
1193 history.push_back(event.clone());
1194 }
1195
1196 if let Some(store) = self.store.clone() {
1197 let event_json = serde_json::to_value(&event).unwrap_or_default();
1198 let event_id = event.id.clone();
1199 let timestamp_ms = event.timestamp_ms;
1200 let actor = event.actor.as_str().to_string();
1201 let kind = event.kind.as_str().to_string();
1202 let target = event.target.clone();
1203 let outcome = event.outcome.as_str().to_string();
1204 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1205 handle.spawn(async move {
1206 if let Err(e) = store
1207 .save_audit_event(AuditEventRecord {
1208 id: &event_id,
1209 timestamp_ms,
1210 actor: &actor,
1211 kind: &kind,
1212 target: &target,
1213 outcome: &outcome,
1214 content: &event_json,
1215 })
1216 .await
1217 {
1218 tracing::error!("Failed to persist audit event: {}", e);
1219 }
1220 });
1221 }
1222 }
1223
1224 let _ = self.audit_broadcast_tx.send(event);
1225 }
1226
1227 fn transition_to_running(&self, port: u16) {
1228 self.update_lifecycle(RuntimeLifecycle {
1229 phase: RuntimeLifecyclePhase::Running,
1230 port: Some(port),
1231 started_at_ms: Some(now_unix_ms()),
1232 last_error: None,
1233 });
1234 }
1235
1236 fn transition_to_stopped(&self) {
1237 if let Ok(mut guard) = self.shutdown_tx.lock() {
1238 *guard = None;
1239 }
1240
1241 self.update_lifecycle(RuntimeLifecycle {
1242 phase: RuntimeLifecyclePhase::Stopped,
1243 port: None,
1244 started_at_ms: None,
1245 last_error: None,
1246 });
1247 }
1248
1249 fn transition_to_failed(&self, port: u16, error: String) {
1250 if let Ok(mut guard) = self.shutdown_tx.lock() {
1251 *guard = None;
1252 }
1253
1254 self.update_lifecycle(RuntimeLifecycle {
1255 phase: RuntimeLifecyclePhase::Failed,
1256 port: Some(port),
1257 started_at_ms: None,
1258 last_error: Some(error),
1259 });
1260 }
1261
1262 fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1263 tracing::info!(
1264 target: "relay_core_lifecycle",
1265 phase = %lifecycle.phase.as_str(),
1266 port = ?lifecycle.port,
1267 started_at_ms = ?lifecycle.started_at_ms,
1268 last_error = ?lifecycle.last_error
1269 );
1270 self.lifecycle_tx.send_replace(lifecycle);
1271 }
1272
1273 pub fn spawn_proxy(
1274 self: &Arc<Self>,
1275 config: ProxyConfig,
1276 sink: mpsc::Sender<FlowUpdate>,
1277 extra_interceptor: Option<Arc<dyn Interceptor>>,
1278 ) -> Result<ProxySpawnResult, String> {
1279 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1280 match self.prepare_start(config.port, shutdown_tx) {
1281 Ok(()) => {}
1282 Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1283 Err(error) => return Err(error),
1284 }
1285 let state = self.clone();
1286 Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1287 if let Err(error) = state
1288 .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1289 .await
1290 {
1291 error!("Proxy failed: {}", error);
1292 }
1293 })))
1294 }
1295
1296 pub async fn start_proxy(
1297 self: &Arc<Self>,
1298 config: ProxyConfig,
1299 sink: mpsc::Sender<FlowUpdate>,
1300 extra_interceptor: Option<Arc<dyn Interceptor>>,
1301 ) -> Result<(), String> {
1302 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1303 self.prepare_start(config.port, shutdown_tx)?;
1304 self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1305 .await
1306 }
1307
1308 async fn run_proxy(
1309 self: &Arc<Self>,
1310 config: ProxyConfig,
1311 sink: mpsc::Sender<FlowUpdate>,
1312 extra_interceptor: Option<Arc<dyn Interceptor>>,
1313 shutdown_rx: oneshot::Receiver<()>,
1314 ) -> Result<(), String> {
1315 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1316 let state = self.clone();
1317
1318 if let Some(parent) = config.ca_cert_path.parent()
1319 && !parent.exists()
1320 {
1321 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1322 }
1323
1324 let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1325 .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1326 let ca = Arc::new(ca);
1327
1328 #[cfg(feature = "script")]
1329 let script_interceptor = self.script_interceptor.clone();
1330
1331 #[cfg(feature = "script")]
1332 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1333
1334 #[cfg(not(feature = "script"))]
1335 let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1336
1337 interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1338 self.clone(),
1339 )));
1340
1341 if let Some(interceptor) = extra_interceptor {
1342 interceptors.push(interceptor);
1343 }
1344
1345 let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1346 let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1347
1348 tokio::spawn(async move {
1349 while let Some(update) = proxy_rx.recv().await {
1350 match update.clone() {
1351 FlowUpdate::Full(flow) => {
1352 state.upsert_flow(flow);
1353 }
1354 FlowUpdate::WebSocketMessage { flow_id, message } => {
1355 state.append_ws_message(flow_id, message);
1356 }
1357 FlowUpdate::HttpBody {
1358 flow_id,
1359 direction,
1360 body,
1361 } => {
1362 state.update_http_body(flow_id, body, direction);
1363 }
1364 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1365 state.tag_flow_budget_exceeded(flow_id, direction);
1367 }
1368 }
1369
1370 let _ = state.flow_broadcast_tx.send(update.clone());
1371
1372 if sink.try_send(update).is_err() {
1373 relay_core_lib::metrics::inc_flows_dropped();
1374 }
1375 }
1376 });
1377
1378 if let Some(udp_port) = config.udp_tproxy_port {
1379 let udp_proxy_tx = proxy_tx.clone();
1380 let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1381 tokio::spawn(async move {
1382 match tokio::net::UdpSocket::bind(udp_addr).await {
1383 Ok(socket) => {
1384 let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1385 if let Err(e) = proxy.run(udp_proxy_tx).await {
1386 error!("UDP TPROXY failed: {}", e);
1387 }
1388 }
1389 Err(e) => {
1390 error!("Failed to bind UDP TPROXY socket: {}", e);
1391 }
1392 }
1393 });
1394 }
1395
1396 let listener = match TcpListener::bind(addr).await {
1397 Ok(listener) => listener,
1398 Err(e) => {
1399 if let Ok(mut guard) = self.shutdown_tx.lock() {
1400 *guard = None;
1401 }
1402 let message = format!("Failed to bind to address {}: {}", addr, e);
1403 self.transition_to_failed(config.port, message.clone());
1404 return Err(message);
1405 }
1406 };
1407
1408 self.transition_to_running(config.port);
1409 let shutdown_rx = Some(shutdown_rx);
1410
1411 if config.transparent {
1412 let policy = ProxyPolicy {
1413 transparent_enabled: true,
1414 ..Default::default()
1415 };
1416 self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1417 let policy_rx = self.policy_tx.subscribe();
1418
1419 let provider: Arc<dyn OriginalDstProvider> = {
1420 let mut addrs = BTreeSet::new();
1421 if let Ok(local) = listener.local_addr() {
1422 addrs.insert(local);
1423 }
1424
1425 #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1426 {
1427 Arc::new(LinuxOriginalDstProvider::new(addrs))
1428 }
1429 #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1430 {
1431 match MacOsOriginalDstProvider::new(addrs.clone()) {
1432 Ok(provider) => Arc::new(provider),
1433 Err(e) => {
1434 error!("Failed to initialize macOS PF provider: {}", e);
1435 Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1436 }
1437 }
1438 }
1439 #[cfg(target_os = "windows")]
1440 {
1441 let filter =
1442 "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1443 .to_string();
1444 let port = config.port;
1445 tokio::spawn(async move {
1446 relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1447 .await;
1448 });
1449
1450 Arc::new(WindowsOriginalDstProvider::new(addrs))
1451 }
1452 #[cfg(not(any(
1453 all(target_os = "linux", feature = "transparent-linux"),
1454 all(target_os = "macos", feature = "transparent-macos"),
1455 target_os = "windows"
1456 )))]
1457 {
1458 Arc::new(NoOpOriginalDstProvider::new(addrs))
1459 }
1460 };
1461
1462 let source = TransparentTcpCaptureSource::new(listener, provider);
1463 let result = relay_core_lib::start_proxy(
1464 source,
1465 proxy_tx,
1466 interceptor,
1467 ca,
1468 policy_rx,
1469 None,
1470 shutdown_rx,
1471 None,
1472 )
1473 .await
1474 .map_err(|e| e.to_string());
1475 if let Err(error) = &result {
1476 self.transition_to_failed(config.port, error.clone());
1477 } else {
1478 self.transition_to_stopped();
1479 }
1480 result
1481 } else {
1482 let source = TcpCaptureSource::new(listener);
1483 let policy = ProxyPolicy::default();
1484 self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1485 let policy_rx = self.policy_tx.subscribe();
1486
1487 let result = relay_core_lib::start_proxy(
1488 source,
1489 proxy_tx,
1490 interceptor,
1491 ca,
1492 policy_rx,
1493 None,
1494 shutdown_rx,
1495 None,
1496 )
1497 .await
1498 .map_err(|e| e.to_string());
1499 if let Err(error) = &result {
1500 self.transition_to_failed(config.port, error.clone());
1501 } else {
1502 self.transition_to_stopped();
1503 }
1504 result
1505 }
1506 }
1507}
1508
1509fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1510 match update {
1511 FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1512 FlowUpdate::WebSocketMessage {
1513 flow_id,
1514 mut message,
1515 } => {
1516 message.content = redact_body(message.content, redaction);
1517 FlowUpdate::WebSocketMessage { flow_id, message }
1518 }
1519 FlowUpdate::HttpBody {
1520 flow_id,
1521 direction,
1522 body,
1523 } => FlowUpdate::HttpBody {
1524 flow_id,
1525 direction,
1526 body: redact_body(body, redaction),
1527 },
1528 FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1529 FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1530 }
1531 }
1532}
1533
1534fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1535 if !redaction.enabled {
1536 return flow;
1537 }
1538 match &mut flow.layer {
1539 Layer::Http(http) => {
1540 redact_http_request(&mut http.request, redaction);
1541 if let Some(response) = &mut http.response {
1542 redact_headers(&mut response.headers, redaction);
1543 response.body = response
1544 .body
1545 .take()
1546 .map(|body| redact_body(body, redaction));
1547 }
1548 }
1549 Layer::WebSocket(ws) => {
1550 redact_http_request(&mut ws.handshake_request, redaction);
1551 redact_headers(&mut ws.handshake_response.headers, redaction);
1552 ws.handshake_response.body = ws
1553 .handshake_response
1554 .body
1555 .take()
1556 .map(|body| redact_body(body, redaction));
1557 for message in &mut ws.messages {
1558 message.content = redact_body(message.content.clone(), redaction);
1559 }
1560 }
1561 _ => {}
1562 }
1563 flow
1564}
1565
1566fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1567 if !redaction.enabled {
1568 return summary;
1569 }
1570 summary.url = redact_url_string(&summary.url, redaction);
1571 summary
1572}
1573
1574fn redact_http_request(
1575 request: &mut relay_core_api::flow::HttpRequest,
1576 redaction: &RedactionPolicy,
1577) {
1578 redact_headers(&mut request.headers, redaction);
1579 redact_query_pairs(&mut request.query, redaction);
1580 request.url = redact_url(&request.url, redaction);
1581 request.body = request.body.take().map(|body| redact_body(body, redaction));
1582}
1583
1584fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1585 if !redaction.enabled {
1586 return;
1587 }
1588 let sensitive = redaction_set(&redaction.sensitive_header_names);
1589 for (name, value) in headers.iter_mut() {
1590 if sensitive.contains(&name.to_ascii_lowercase()) {
1591 *value = "[REDACTED]".to_string();
1592 }
1593 }
1594}
1595
1596fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1597 if !redaction.enabled {
1598 return;
1599 }
1600 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1601 for (name, value) in query.iter_mut() {
1602 if sensitive.contains(&name.to_ascii_lowercase()) {
1603 *value = "[REDACTED]".to_string();
1604 }
1605 }
1606}
1607
1608fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1609 if !redaction.enabled {
1610 return url.clone();
1611 }
1612 let sensitive = redaction_set(&redaction.sensitive_query_keys);
1613 let pairs: Vec<(String, String)> = url
1614 .query_pairs()
1615 .map(|(k, v)| {
1616 let key = k.to_string();
1617 let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1618 "[REDACTED]".to_string()
1619 } else {
1620 v.to_string()
1621 };
1622 (key, value)
1623 })
1624 .collect();
1625 let mut next = url.clone();
1626 if pairs.is_empty() {
1627 return next;
1628 }
1629 next.query_pairs_mut().clear();
1630 for (k, v) in pairs {
1631 next.query_pairs_mut().append_pair(&k, &v);
1632 }
1633 next
1634}
1635
1636fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1637 match url::Url::parse(input) {
1638 Ok(url) => redact_url(&url, redaction).to_string(),
1639 Err(_) => input.to_string(),
1640 }
1641}
1642
1643fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1644 if redaction.enabled && redaction.redact_bodies {
1645 body.content = "[REDACTED]".to_string();
1646 }
1647 body
1648}
1649
/// Builds a lookup set holding the ASCII-lowercased form of each entry in
/// `values`, so names can be matched case-insensitively.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::with_capacity(values.len());
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1656
/// Settings needed to start the proxy.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listener binds on the loopback interface.
    pub port: u16,
    /// Location of the CA certificate that is loaded, or created if missing.
    pub ca_cert_path: std::path::PathBuf,
    /// Location of the CA private key that is loaded, or created if missing.
    pub ca_key_path: std::path::PathBuf,
    /// Run in transparent capture mode instead of standard proxy mode.
    pub transparent: bool,
    /// When set, a UDP TPROXY socket is also bound on this port.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a standard (non-transparent) configuration without UDP TPROXY.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent: false,
            udp_tproxy_port: None,
        }
    }

    /// Creates a configuration whose CA files live in `app_data_dir` as
    /// `ca_cert.pem` and `ca_key.pem`, creating the directory (and any
    /// missing parents) first.
    ///
    /// # Errors
    ///
    /// Returns the stringified I/O error when the directory cannot be
    /// created, including when the path already exists but is not a
    /// directory.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let app_data_dir = app_data_dir.into();
        // `create_dir_all` already succeeds for an existing directory, so no
        // `exists()` pre-check is needed; such a check is racy and would let
        // an existing non-directory path slip through.
        std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;

        Ok(Self::new(
            port,
            app_data_dir.join("ca_cert.pem"),
            app_data_dir.join("ca_key.pem"),
        ))
    }

    /// Builder-style setter for transparent mode.
    pub fn with_transparent(mut self, transparent: bool) -> Self {
        self.transparent = transparent;
        self
    }

    /// Builder-style setter for the optional UDP TPROXY port.
    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
        self.udp_tproxy_port = udp_tproxy_port;
        self
    }
}
1707
/// Current wall-clock time as milliseconds since the Unix epoch; a clock set
/// before the epoch yields `0`.
fn now_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1714
1715fn modification_field_names(
1716 mods: Option<&relay_core_api::modification::FlowModification>,
1717) -> Vec<&'static str> {
1718 let Some(mods) = mods else {
1719 return Vec::new();
1720 };
1721
1722 let mut fields = Vec::new();
1723 if mods.method.is_some() {
1724 fields.push("method");
1725 }
1726 if mods.url.is_some() {
1727 fields.push("url");
1728 }
1729 if mods.request_headers.is_some() {
1730 fields.push("request_headers");
1731 }
1732 if mods.request_body.is_some() {
1733 fields.push("request_body");
1734 }
1735 if mods.status_code.is_some() {
1736 fields.push("status_code");
1737 }
1738 if mods.response_headers.is_some() {
1739 fields.push("response_headers");
1740 }
1741 if mods.response_body.is_some() {
1742 fields.push("response_body");
1743 }
1744 if mods.message_content.is_some() {
1745 fields.push("message_content");
1746 }
1747 fields
1748}
1749
1750#[cfg(test)]
1751mod tests {
1752 use super::*;
1753 use chrono::Utc;
1754 use relay_core_api::flow::{
1755 BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1756 ResponseTiming, TransportProtocol,
1757 };
1758 use std::sync::atomic::{AtomicU64, Ordering};
1759 use tokio::time::{Duration, sleep};
1760 use url::Url;
1761 use uuid::Uuid;
1762
1763 static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1764
1765 fn sqlite_url() -> String {
1766 let nanos = SystemTime::now()
1767 .duration_since(UNIX_EPOCH)
1768 .expect("clock drift")
1769 .as_nanos();
1770 let pid = std::process::id();
1771 let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1772 let db_dir = std::env::current_dir()
1773 .expect("cwd")
1774 .join("target")
1775 .join("test-dbs");
1776 std::fs::create_dir_all(&db_dir).expect("create test db dir");
1777 let db_path = db_dir.join(format!(
1778 "relay-core-runtime-test-{}-{}-{}.db",
1779 pid, nanos, seq
1780 ));
1781 format!("sqlite://{}?mode=rwc", db_path.display())
1782 }
1783
1784 fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1785 let start_time =
1786 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1787 let request_url =
1788 Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1789 Flow {
1790 id: Uuid::new_v4(),
1791 start_time,
1792 end_time: Some(start_time),
1793 network: NetworkInfo {
1794 client_ip: "127.0.0.1".to_string(),
1795 client_port: 12000,
1796 server_ip: "127.0.0.1".to_string(),
1797 server_port: 8080,
1798 protocol: TransportProtocol::TCP,
1799 tls: false,
1800 tls_version: None,
1801 sni: None,
1802 },
1803 layer: Layer::Http(HttpLayer {
1804 request: HttpRequest {
1805 method: method.to_string(),
1806 url: request_url,
1807 version: "HTTP/1.1".to_string(),
1808 headers: vec![],
1809 cookies: vec![],
1810 query: vec![],
1811 body: None,
1812 },
1813 response: Some(HttpResponse {
1814 status,
1815 status_text: "OK".to_string(),
1816 version: "HTTP/1.1".to_string(),
1817 headers: vec![],
1818 cookies: vec![],
1819 body: None,
1820 timing: ResponseTiming {
1821 time_to_first_byte: None,
1822 time_to_last_byte: None,
1823 connect_time_ms: None,
1824 ssl_time_ms: None,
1825 },
1826 }),
1827 error: None,
1828 }),
1829 tags: vec![],
1830 meta: std::collections::HashMap::new(),
1831 resilience_trace: None,
1832 rule_variables: std::collections::HashMap::new(),
1833 matched_rules: vec![],
1834 }
1835 }
1836
1837 fn sample_sensitive_http_flow(ts: i64) -> Flow {
1838 let start_time =
1839 chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1840 let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1841 .expect("url should parse");
1842 Flow {
1843 id: Uuid::new_v4(),
1844 start_time,
1845 end_time: Some(start_time),
1846 network: NetworkInfo {
1847 client_ip: "127.0.0.1".to_string(),
1848 client_port: 12000,
1849 server_ip: "127.0.0.1".to_string(),
1850 server_port: 8080,
1851 protocol: TransportProtocol::TCP,
1852 tls: false,
1853 tls_version: None,
1854 sni: None,
1855 },
1856 layer: Layer::Http(HttpLayer {
1857 request: HttpRequest {
1858 method: "GET".to_string(),
1859 url: request_url,
1860 version: "HTTP/1.1".to_string(),
1861 headers: vec![
1862 (
1863 "Authorization".to_string(),
1864 "Bearer secret-token".to_string(),
1865 ),
1866 ("X-Normal".to_string(), "visible".to_string()),
1867 ],
1868 cookies: vec![],
1869 query: vec![
1870 ("token".to_string(), "abc123".to_string()),
1871 ("ok".to_string(), "1".to_string()),
1872 ],
1873 body: Some(BodyData {
1874 encoding: "utf-8".to_string(),
1875 content: "secret request body".to_string(),
1876 size: 19,
1877 }),
1878 },
1879 response: Some(HttpResponse {
1880 status: 200,
1881 status_text: "OK".to_string(),
1882 version: "HTTP/1.1".to_string(),
1883 headers: vec![
1884 ("Set-Cookie".to_string(), "session=abcd".to_string()),
1885 ("X-Response".to_string(), "visible".to_string()),
1886 ],
1887 cookies: vec![],
1888 body: Some(BodyData {
1889 encoding: "utf-8".to_string(),
1890 content: "secret response body".to_string(),
1891 size: 20,
1892 }),
1893 timing: ResponseTiming {
1894 time_to_first_byte: None,
1895 time_to_last_byte: None,
1896 connect_time_ms: None,
1897 ssl_time_ms: None,
1898 },
1899 }),
1900 error: None,
1901 }),
1902 tags: vec![],
1903 meta: std::collections::HashMap::new(),
1904 resilience_trace: None,
1905 rule_variables: std::collections::HashMap::new(),
1906 matched_rules: vec![],
1907 }
1908 }
1909
1910 #[tokio::test]
1911 async fn set_rules_from_records_audit_event() {
1912 let state = CoreState::new(None).await;
1913
1914 state
1915 .set_rules_from(
1916 AuditActor::Http,
1917 "rule.upsert",
1918 "rule-1".to_string(),
1919 json!({ "route": "/api/v1/rules" }),
1920 Vec::new(),
1921 )
1922 .await
1923 .expect("set rules should succeed");
1924
1925 let events = state.recent_audit_events();
1926 let event = events.last().expect("audit event should exist");
1927 assert_eq!(event.actor, AuditActor::Http);
1928 assert_eq!(event.kind, AuditEventKind::RuleChanged);
1929 assert_eq!(event.outcome, AuditOutcome::Success);
1930 assert_eq!(event.target, "rule-1");
1931 assert_eq!(event.details["operation"], "rule.upsert");
1932 assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1933 }
1934
1935 #[tokio::test]
1936 async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1937 let state = CoreState::new(None).await;
1938
1939 state
1940 .upsert_rule_from(
1941 AuditActor::Probe,
1942 "rule.upsert",
1943 "rule-1".to_string(),
1944 json!({ "tool": "set_rule" }),
1945 Rule {
1946 id: "rule-1".to_string(),
1947 name: "first".to_string(),
1948 active: true,
1949 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1950 priority: 1,
1951 termination: relay_core_lib::rule::RuleTermination::Continue,
1952 filter: relay_core_lib::rule::Filter::Url(
1953 relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1954 ),
1955 actions: vec![],
1956 constraints: None,
1957 },
1958 )
1959 .await
1960 .expect("initial upsert should succeed");
1961
1962 state
1963 .upsert_rule_from(
1964 AuditActor::Probe,
1965 "rule.upsert",
1966 "rule-1".to_string(),
1967 json!({ "tool": "set_rule" }),
1968 Rule {
1969 id: "rule-1".to_string(),
1970 name: "second".to_string(),
1971 active: true,
1972 stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1973 priority: 2,
1974 termination: relay_core_lib::rule::RuleTermination::Continue,
1975 filter: relay_core_lib::rule::Filter::Url(
1976 relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1977 ),
1978 actions: vec![],
1979 constraints: None,
1980 },
1981 )
1982 .await
1983 .expect("replacement upsert should succeed");
1984
1985 let rules = state.get_rules().await;
1986 assert_eq!(rules.len(), 1);
1987 assert_eq!(rules[0].name, "second");
1988 let event = state
1989 .recent_audit_events()
1990 .last()
1991 .cloned()
1992 .expect("audit event");
1993 assert_eq!(event.details["operation"], "rule.upsert");
1994 assert_eq!(event.details["details"]["tool"], "set_rule");
1995 }
1996
1997 #[tokio::test]
1998 async fn delete_rule_from_returns_false_when_rule_missing() {
1999 let state = CoreState::new(None).await;
2000
2001 let deleted = state
2002 .delete_rule_from(
2003 AuditActor::Http,
2004 "rule.delete",
2005 "missing".to_string(),
2006 json!({ "route": "/api/v1/rules/{id}" }),
2007 "missing",
2008 )
2009 .await
2010 .expect("delete should not fail");
2011
2012 assert!(!deleted);
2013 assert!(state.recent_audit_events().is_empty());
2014 }
2015
2016 #[tokio::test]
2017 async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2018 let state = CoreState::new(None).await;
2019
2020 let rule_id = state
2021 .create_mock_response_rule_from(
2022 AuditActor::Http,
2023 "api-mock-1".to_string(),
2024 json!({ "route": "/api/v1/mock", "status": 201 }),
2025 MockResponseRuleConfig {
2026 rule_id: "api-mock-1".to_string(),
2027 url_pattern: "example.com".to_string(),
2028 name: "api-mock:example.com".to_string(),
2029 status: 201,
2030 content_type: "application/json".to_string(),
2031 body: "{\"ok\":true}".to_string(),
2032 },
2033 )
2034 .await
2035 .expect("mock rule should be created");
2036
2037 assert_eq!(rule_id, "api-mock-1");
2038 let rules = state.get_rules().await;
2039 assert_eq!(rules.len(), 1);
2040 assert_eq!(rules[0].id, "api-mock-1");
2041 let event = state
2042 .recent_audit_events()
2043 .last()
2044 .cloned()
2045 .expect("audit event");
2046 assert_eq!(event.details["operation"], "rule.mock_create");
2047 assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2048 }
2049
2050 #[tokio::test]
2051 async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2052 let state = CoreState::new(None).await;
2053
2054 let rule_id = state
2055 .create_intercept_rule_from(
2056 AuditActor::Http,
2057 "intercept-1".to_string(),
2058 json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2059 InterceptRuleConfig {
2060 rule_id: "intercept-1".to_string(),
2061 active: true,
2062 url_pattern: "example.com".to_string(),
2063 method: None,
2064 phase: "request".to_string(),
2065 name: "api-intercept:example.com".to_string(),
2066 priority: 100,
2067 termination: relay_core_lib::rule::RuleTermination::Stop,
2068 },
2069 )
2070 .await
2071 .expect("intercept rule should be created");
2072
2073 assert_eq!(rule_id, "intercept-1");
2074 let rules = state.get_rules().await;
2075 assert_eq!(rules.len(), 1);
2076 assert_eq!(rules[0].id, "intercept-1");
2077 assert_eq!(rules[0].name, "api-intercept:example.com");
2078 let event = state
2079 .recent_audit_events()
2080 .last()
2081 .cloned()
2082 .expect("audit event");
2083 assert_eq!(event.details["operation"], "rule.intercept_create");
2084 assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2085 }
2086
2087 #[tokio::test]
2088 async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2089 let state = CoreState::new(None).await;
2090
2091 state
2092 .upsert_legacy_intercept_rule_from(
2093 AuditActor::Tauri,
2094 "legacy-1".to_string(),
2095 json!({ "command": "set_intercept_rule" }),
2096 InterceptRule {
2097 id: "legacy-1".to_string(),
2098 active: true,
2099 url_pattern: "example.com".to_string(),
2100 method: None,
2101 phase: "both".to_string(),
2102 },
2103 )
2104 .await
2105 .expect("initial family upsert should succeed");
2106 assert_eq!(state.get_rules().await.len(), 2);
2107
2108 state
2109 .upsert_legacy_intercept_rule_from(
2110 AuditActor::Tauri,
2111 "legacy-1".to_string(),
2112 json!({ "command": "set_intercept_rule" }),
2113 InterceptRule {
2114 id: "legacy-1".to_string(),
2115 active: true,
2116 url_pattern: "example.org".to_string(),
2117 method: Some("POST".to_string()),
2118 phase: "request".to_string(),
2119 },
2120 )
2121 .await
2122 .expect("replacement family upsert should succeed");
2123
2124 let rules = state.get_rules().await;
2125 assert_eq!(rules.len(), 1);
2126 assert_eq!(rules[0].id, "legacy-1");
2127 let event = state
2128 .recent_audit_events()
2129 .last()
2130 .cloned()
2131 .expect("audit event");
2132 assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2133 assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2134 }
2135
2136 #[tokio::test]
2137 async fn resolve_intercept_failure_records_failed_audit_event() {
2138 let state = CoreState::new(None).await;
2139
2140 let result = state
2141 .resolve_intercept_with_modifications_from(
2142 AuditActor::Probe,
2143 "missing-flow:request".to_string(),
2144 "drop",
2145 None,
2146 )
2147 .await;
2148
2149 assert!(result.is_err());
2150 let events = state.recent_audit_events();
2151 let event = events.last().expect("audit event should exist");
2152 assert_eq!(event.actor, AuditActor::Probe);
2153 assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2154 assert_eq!(event.outcome, AuditOutcome::Failed);
2155 assert_eq!(event.details["action"], "drop");
2156 assert!(
2157 event.details["error"]
2158 .as_str()
2159 .unwrap_or_default()
2160 .contains("Interception not found")
2161 );
2162 }
2163
2164 #[tokio::test]
2165 async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2166 let state = CoreState::new(None).await;
2167 let (shutdown_tx, shutdown_rx) = oneshot::channel();
2168
2169 state
2170 .prepare_start(8080, shutdown_tx)
2171 .expect("prepare start should succeed");
2172 let lifecycle = state.lifecycle();
2173 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2174 assert_eq!(lifecycle.port, Some(8080));
2175 assert!(lifecycle.started_at_ms.is_none());
2176 assert!(lifecycle.last_error.is_none());
2177
2178 assert_eq!(
2179 state.stop_proxy().expect("stop should succeed"),
2180 ProxyStopResult::Stopping
2181 );
2182 let lifecycle = state.lifecycle();
2183 assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2184 assert_eq!(lifecycle.port, Some(8080));
2185 assert!(shutdown_rx.await.is_ok());
2186 }
2187
2188 #[tokio::test]
2189 async fn status_snapshot_derives_runtime_facing_fields() {
2190 let state = CoreState::new(None).await;
2191 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2192 state
2193 .prepare_start(8080, shutdown_tx)
2194 .expect("prepare start should succeed");
2195
2196 let status = state.status_snapshot();
2197 assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2198 assert!(status.running);
2199 assert_eq!(status.port, Some(8080));
2200 assert!(status.uptime.is_none());
2201 assert!(status.last_error.is_none());
2202 }
2203
2204 #[tokio::test]
2205 async fn status_report_combines_status_and_metrics() {
2206 let state = CoreState::new(None).await;
2207 let report = state.status_report().await;
2208
2209 assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2210 assert!(!report.status.running);
2211 assert_eq!(report.metrics.intercepts_pending, 0);
2212 assert_eq!(report.metrics.ws_pending_messages, 0);
2213 assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2214 assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2215 assert_eq!(report.metrics.audit_events_total, 0);
2216 assert_eq!(report.metrics.audit_events_failed, 0);
2217 assert_eq!(report.metrics.flow_events_lagged_total, 0);
2218 assert_eq!(report.metrics.audit_events_lagged_total, 0);
2219 }
2220
/// Values passed to `ProxyConfig::new` and the `with_*` setters should be
/// readable back unchanged from the resulting config.
#[test]
fn proxy_config_new_and_transport_setters_preserve_values() {
    let cert_path = std::path::PathBuf::from("/tmp/ca_cert.pem");
    let key_path = std::path::PathBuf::from("/tmp/ca_key.pem");

    let config = ProxyConfig::new(8080, cert_path.clone(), key_path.clone())
        .with_transparent(true)
        .with_udp_tproxy_port(Some(15000));

    // Constructor arguments.
    assert_eq!(config.port, 8080);
    assert_eq!(config.ca_cert_path, cert_path);
    assert_eq!(config.ca_key_path, key_path);

    // Builder-style setters.
    assert!(config.transparent);
    assert_eq!(config.udp_tproxy_port, Some(15000));
}
2243
/// `ProxyConfig::from_app_data_dir` should leave the data directory present
/// on disk and point the CA certificate/key paths at `ca_cert.pem` and
/// `ca_key.pem` inside it, with transparent mode off and no UDP TPROXY port.
///
/// The scratch directory lives under the system temp dir and is removed
/// again so repeated test runs do not accumulate directories.
#[test]
fn proxy_config_from_app_data_dir_creates_default_paths() {
    // Nanosecond timestamp keeps the directory name unique per run.
    let unique = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock drift")
        .as_nanos();
    let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));

    let config =
        ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");

    // Capture the observation first, then clean up *before* asserting so a
    // failing assertion cannot leave the scratch directory behind.
    let dir_created = dir.exists();
    // Best-effort cleanup: a removal failure must not mask the real result.
    let _ = std::fs::remove_dir_all(&dir);

    assert!(dir_created);
    assert_eq!(config.port, 8899);
    assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
    assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
    assert!(!config.transparent);
    assert!(config.udp_tproxy_port.is_none());
}
2262
2263 #[tokio::test]
2264 async fn intercept_snapshot_maps_pending_counts() {
2265 let state = CoreState::new(None).await;
2266 let snapshot = state.intercept_snapshot().await;
2267
2268 assert_eq!(snapshot.pending_count, 0);
2269 assert_eq!(snapshot.ws_pending_count, 0);
2270 }
2271
2272 #[tokio::test]
2273 async fn audit_snapshot_returns_latest_events_in_order() {
2274 let state = CoreState::new(None).await;
2275 state.record_audit_event(AuditEvent::new(
2276 AuditActor::Runtime,
2277 AuditEventKind::RuleChanged,
2278 "first",
2279 AuditOutcome::Success,
2280 json!({ "index": 1 }),
2281 ));
2282 state.record_audit_event(AuditEvent::new(
2283 AuditActor::Http,
2284 AuditEventKind::PolicyUpdated,
2285 "second",
2286 AuditOutcome::Success,
2287 json!({ "index": 2 }),
2288 ));
2289
2290 let snapshot = state.audit_snapshot(1);
2291
2292 assert_eq!(snapshot.events.len(), 1);
2293 assert_eq!(snapshot.events[0].target, "second");
2294 assert_eq!(snapshot.events[0].details["index"], 2);
2295 }
2296
2297 #[tokio::test]
2298 async fn query_audit_snapshot_filters_in_memory_events() {
2299 let state = CoreState::new(None).await;
2300 state.record_audit_event(AuditEvent::new(
2301 AuditActor::Http,
2302 AuditEventKind::RuleChanged,
2303 "rule-1",
2304 AuditOutcome::Success,
2305 json!({ "idx": 1 }),
2306 ));
2307 state.record_audit_event(AuditEvent::new(
2308 AuditActor::Probe,
2309 AuditEventKind::PolicyUpdated,
2310 "policy",
2311 AuditOutcome::Failed,
2312 json!({ "idx": 2 }),
2313 ));
2314
2315 let snapshot = state
2316 .query_audit_snapshot(CoreAuditQuery {
2317 actor: Some(AuditActor::Probe),
2318 kind: Some(AuditEventKind::PolicyUpdated),
2319 outcome: Some(AuditOutcome::Failed),
2320 limit: 10,
2321 ..Default::default()
2322 })
2323 .await;
2324
2325 assert_eq!(snapshot.events.len(), 1);
2326 assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2327 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2328 assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2329 }
2330
2331 #[tokio::test]
2332 async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2333 let state = CoreState::new(Some(sqlite_url())).await;
2334 state.update_policy_from(
2335 AuditActor::Http,
2336 "policy".to_string(),
2337 ProxyPolicy {
2338 transparent_enabled: true,
2339 ..Default::default()
2340 },
2341 );
2342
2343 let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2344 for _ in 0..10 {
2345 snapshot = state
2346 .query_audit_snapshot(CoreAuditQuery {
2347 actor: Some(AuditActor::Http),
2348 kind: Some(AuditEventKind::PolicyUpdated),
2349 limit: 10,
2350 ..Default::default()
2351 })
2352 .await;
2353 if !snapshot.events.is_empty() {
2354 break;
2355 }
2356 sleep(Duration::from_millis(20)).await;
2357 }
2358
2359 assert!(!snapshot.events.is_empty());
2360 assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2361 assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2362 }
2363
2364 #[tokio::test]
2365 async fn prepare_start_rejects_second_active_start() {
2366 let state = CoreState::new(None).await;
2367 let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2368 state
2369 .prepare_start(8080, shutdown_tx)
2370 .expect("first start should succeed");
2371
2372 let (second_tx, _second_rx) = oneshot::channel();
2373 let error = state
2374 .prepare_start(8081, second_tx)
2375 .expect_err("second active start should be rejected");
2376 assert!(error.contains("already"));
2377 }
2378
/// Replacing the policy should append a successful `PolicyUpdated` audit
/// event whose details carry the new `transparent_enabled` value.
#[test]
fn update_policy_records_audit_event() {
    // The state is built on a manually created runtime; the policy update
    // itself is invoked synchronously from the test thread.
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));

    let transparent_policy = ProxyPolicy {
        transparent_enabled: true,
        ..Default::default()
    };
    core.update_policy_from(AuditActor::Runtime, "policy".to_string(), transparent_policy);

    let audit_log = core.recent_audit_events();
    let newest = audit_log.last().expect("audit event should exist");

    assert_eq!(newest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(newest.outcome, AuditOutcome::Success);
    assert_eq!(newest.details["transparent_enabled"], true);
}
2399
/// Patching only the redaction settings should enable them while leaving the
/// request timeout untouched, and should record a `PolicyUpdated` audit event
/// with `redaction_enabled` set.
#[test]
fn patch_policy_updates_redaction_without_replacing_other_fields() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));

    // Baseline captured before the patch so it can be compared afterwards.
    let timeout_before = core.policy_snapshot().request_timeout_ms;

    let redaction_patch = relay_core_api::policy::RedactionPolicyPatch {
        enabled: Some(true),
        redact_bodies: Some(true),
        ..Default::default()
    };
    let patch = relay_core_api::policy::ProxyPolicyPatch {
        redaction: Some(redaction_patch),
    };
    core.patch_policy_from(AuditActor::Runtime, "policy.patch".to_string(), patch);

    let patched = core.policy_snapshot();
    assert_eq!(patched.request_timeout_ms, timeout_before);
    assert!(patched.redaction.enabled);
    assert!(patched.redaction.redact_bodies);

    let audit_log = core.recent_audit_events();
    let newest = audit_log.last().expect("audit event should exist");
    assert_eq!(newest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(newest.details["redaction_enabled"], true);
}
2428
2429 #[tokio::test]
2430 async fn metrics_include_audit_and_lagged_event_counters() {
2431 let state = CoreState::new(None).await;
2432
2433 state.update_policy_from(
2434 AuditActor::Runtime,
2435 "policy".to_string(),
2436 ProxyPolicy::default(),
2437 );
2438 let _ = state
2439 .resolve_intercept_with_modifications_from(
2440 AuditActor::Probe,
2441 "missing-flow:request".to_string(),
2442 "drop",
2443 None,
2444 )
2445 .await;
2446
2447 state.record_flow_events_lagged(3);
2448 state.record_audit_events_lagged(5);
2449
2450 let metrics = state.get_metrics().await;
2451 assert_eq!(metrics.audit_events_total, 2);
2452 assert_eq!(metrics.audit_events_failed, 1);
2453 assert_eq!(metrics.flow_events_lagged_total, 3);
2454 assert_eq!(metrics.audit_events_lagged_total, 5);
2455 }
2456
2457 #[tokio::test]
2458 async fn prometheus_metrics_text_contains_observability_fields() {
2459 let state = CoreState::new(None).await;
2460 state.record_flow_events_lagged(2);
2461 state.record_audit_events_lagged(4);
2462
2463 let text = state.get_metrics_prometheus_text().await;
2464 assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2465 assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2466 assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2467 assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2468 }
2469
2470 #[tokio::test]
2471 async fn search_flows_uses_store_with_offset_pagination() {
2472 let state = CoreState::new(Some(sqlite_url())).await;
2473 let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2474 let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2475 let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2476
2477 state.upsert_flow(Box::new(flow_a));
2478 state.upsert_flow(Box::new(flow_b));
2479 state.upsert_flow(Box::new(flow_c));
2480
2481 let mut baseline = Vec::new();
2482 for _ in 0..20 {
2483 baseline = state
2484 .search_flows(FlowQuery {
2485 host: Some("api.example.com".to_string()),
2486 path_contains: None,
2487 method: None,
2488 status_min: None,
2489 status_max: None,
2490 has_error: None,
2491 is_websocket: None,
2492 limit: Some(3),
2493 offset: Some(0),
2494 })
2495 .await;
2496 if baseline.len() == 3 {
2497 break;
2498 }
2499 sleep(Duration::from_millis(20)).await;
2500 }
2501
2502 assert_eq!(baseline.len(), 3);
2503 let page = state
2504 .search_flows(FlowQuery {
2505 host: Some("api.example.com".to_string()),
2506 path_contains: None,
2507 method: None,
2508 status_min: None,
2509 status_max: None,
2510 has_error: None,
2511 is_websocket: None,
2512 limit: Some(1),
2513 offset: Some(1),
2514 })
2515 .await;
2516 assert_eq!(page.len(), 1);
2517 assert_eq!(page[0].id, baseline[1].id);
2518 }
2519
2520 #[tokio::test]
2521 async fn get_flow_falls_back_to_store_after_lru_eviction() {
2522 let state = CoreState::new(Some(sqlite_url())).await;
2523 let first_flow = sample_http_flow(
2524 "persist.example.com",
2525 "/first",
2526 "GET",
2527 200,
2528 1_700_000_010_000,
2529 );
2530 let first_id = first_flow.id.to_string();
2531 state.upsert_flow(Box::new(first_flow));
2532 for i in 0..240 {
2533 state.upsert_flow(Box::new(sample_http_flow(
2534 "persist.example.com",
2535 &format!("/{}", i),
2536 "GET",
2537 200,
2538 1_700_000_020_000 + i,
2539 )));
2540 }
2541
2542 sleep(Duration::from_millis(200)).await;
2543
2544 let loaded = state.get_flow(first_id).await;
2545 assert!(loaded.is_some());
2546 }
2547
2548 #[tokio::test]
2549 async fn search_flows_redacts_summary_url_when_enabled() {
2550 let state = CoreState::new(None).await;
2551 state.update_policy_from(
2552 AuditActor::Runtime,
2553 "policy.redaction".to_string(),
2554 ProxyPolicy {
2555 redaction: RedactionPolicy {
2556 enabled: true,
2557 sensitive_query_keys: vec!["token".to_string()],
2558 redact_bodies: false,
2559 ..Default::default()
2560 },
2561 ..Default::default()
2562 },
2563 );
2564 state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2565
2566 let mut items = Vec::new();
2567 for _ in 0..20 {
2568 items = state
2569 .search_flows(FlowQuery {
2570 host: Some("api.example.com".to_string()),
2571 path_contains: Some("/private".to_string()),
2572 ..Default::default()
2573 })
2574 .await;
2575 if !items.is_empty() {
2576 break;
2577 }
2578 sleep(Duration::from_millis(20)).await;
2579 }
2580
2581 assert!(!items.is_empty());
2582 let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2583 let token = redacted
2584 .query_pairs()
2585 .find(|(k, _)| k == "token")
2586 .map(|(_, v)| v.to_string());
2587 assert_eq!(token.as_deref(), Some("[REDACTED]"));
2588 }
2589
2590 #[tokio::test]
2591 async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2592 let state = CoreState::new(Some(sqlite_url())).await;
2593 state.update_policy_from(
2594 AuditActor::Runtime,
2595 "policy.redaction".to_string(),
2596 ProxyPolicy {
2597 redaction: RedactionPolicy {
2598 enabled: true,
2599 sensitive_header_names: vec![
2600 "authorization".to_string(),
2601 "set-cookie".to_string(),
2602 ],
2603 sensitive_query_keys: vec!["token".to_string()],
2604 redact_bodies: true,
2605 },
2606 ..Default::default()
2607 },
2608 );
2609
2610 let flow = sample_sensitive_http_flow(1_700_000_200_000);
2611 let flow_id = flow.id.to_string();
2612 state.upsert_flow(Box::new(flow));
2613 sleep(Duration::from_millis(80)).await;
2614
2615 let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2616 let Layer::Http(http) = loaded.layer else {
2617 panic!("expected http layer");
2618 };
2619
2620 let auth = http
2621 .request
2622 .headers
2623 .iter()
2624 .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2625 .map(|(_, v)| v.as_str());
2626 assert_eq!(auth, Some("[REDACTED]"));
2627
2628 let req_query_token = http
2629 .request
2630 .query
2631 .iter()
2632 .find(|(k, _)| k == "token")
2633 .map(|(_, v)| v.as_str());
2634 assert_eq!(req_query_token, Some("[REDACTED]"));
2635
2636 let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2637 assert_eq!(req_body, Some("[REDACTED]"));
2638
2639 let response = http.response.expect("response should exist");
2640 let set_cookie = response
2641 .headers
2642 .iter()
2643 .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2644 .map(|(_, v)| v.as_str());
2645 assert_eq!(set_cookie, Some("[REDACTED]"));
2646 let res_body = response.body.as_ref().map(|b| b.content.as_str());
2647 assert_eq!(res_body, Some("[REDACTED]"));
2648 }
2649
/// With body redaction enabled, passing an `HttpBody` flow update through
/// `redact_flow_update_for_output` should replace its content with
/// `[REDACTED]`.
#[test]
fn redact_flow_update_masks_http_body_when_enabled() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));

    let body_redaction = RedactionPolicy {
        enabled: true,
        redact_bodies: true,
        ..Default::default()
    };
    core.update_policy_from(
        AuditActor::Runtime,
        "policy.redaction".to_string(),
        ProxyPolicy {
            redaction: body_redaction,
            ..Default::default()
        },
    );

    let secret_body = BodyData {
        encoding: "utf-8".to_string(),
        content: "super-secret".to_string(),
        size: 12,
    };
    let outgoing = FlowUpdate::HttpBody {
        flow_id: "f-1".to_string(),
        direction: relay_core_api::flow::Direction::ClientToServer,
        body: secret_body,
    };

    // The update must keep its variant; only the body content is checked.
    let FlowUpdate::HttpBody { body, .. } = core.redact_flow_update_for_output(outgoing) else {
        panic!("expected http body update");
    };
    assert_eq!(body.content, "[REDACTED]");
}
2682
/// Loading a script should append a successful `ScriptReloaded` audit event
/// attributed to the calling actor and target. Only built with the `script`
/// feature.
#[cfg(feature = "script")]
#[tokio::test]
async fn load_script_from_records_audit_event() {
    let core = CoreState::new(None).await;

    let load_result = core
        .load_script_from(
            AuditActor::Tauri,
            "tauri.load_script".to_string(),
            "globalThis.onRequestHeaders = (_flow) => {};",
        )
        .await;
    load_result.expect("script should load");

    let audit_log = core.recent_audit_events();
    let newest = audit_log.last().expect("audit event should exist");

    assert_eq!(newest.actor, AuditActor::Tauri);
    assert_eq!(newest.kind, AuditEventKind::ScriptReloaded);
    assert_eq!(newest.outcome, AuditOutcome::Success);
    assert_eq!(newest.target, "tauri.load_script");
}
2704}