Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66// ── Re-exports for user convenience ──
67// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
68pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
/// Re-exports every public item of `relay_core_lib::rule_engine` so callers can reach
/// them as `relay_core_runtime::rule_engine::*`.
pub mod rule_engine {
    pub use relay_core_lib::rule_engine::*;
}
79
/// Point-in-time snapshot of runtime counters, assembled by `CoreState::get_metrics`
/// and rendered as text by `CoreMetrics::to_prometheus_text`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreMetrics {
    /// Reported by the flow store actor (falls back to 0 when the actor does not answer).
    pub flows_total: usize,
    /// Reported by the flow store actor (falls back to 0 when the actor does not answer).
    pub flows_in_memory: usize,
    /// Number of flow-store messages that could not be enqueued with `try_send`.
    pub flows_dropped: usize,
    /// Reported by the intercept broker actor.
    pub intercepts_pending: usize,
    /// Reported by the intercept broker actor.
    pub ws_pending_messages: usize,
    /// Reported by the intercept broker actor; `None` is rendered as `0` in Prometheus text.
    pub oldest_intercept_age_ms: Option<u64>,
    /// Reported by the intercept broker actor; `None` is rendered as `0` in Prometheus text.
    pub oldest_ws_message_age_ms: Option<u64>,
    /// Reported by the rule store actor.
    pub rule_exec_errors: usize,
    pub audit_events_total: usize,
    pub audit_events_failed: usize,
    pub flow_events_lagged_total: usize,
    pub audit_events_lagged_total: usize,
    /// O4: Total bodies degraded (budget exceeded)
    pub proxy_body_degraded_total: usize,
    /// O4: Total HTTP requests processed through streaming pipeline
    pub proxy_http_request_total: usize,
    /// O4: Total sandbox rejections
    pub proxy_sandbox_reject_total: usize,
    /// O4: Total invalid method rejections (strict_http_semantics)
    pub proxy_invalid_method_total: usize,
    /// O4: Total invalid status code rejections (strict_http_semantics)
    pub proxy_invalid_status_total: usize,
    /// O4: Total idempotent request retries
    pub proxy_retry_total: usize,
    /// O4: Total bodies processed in tap (streaming) mode
    pub proxy_stream_mode_tap_total: usize,
    /// O4: Total bodies degraded from tap to pass-through
    pub proxy_stream_mode_degrade_total: usize,
}
111
112impl CoreMetrics {
113    pub fn to_prometheus_text(&self) -> String {
114        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
115        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
116        format!(
117            "relay_core_flows_total {}\n\
118relay_core_flows_in_memory {}\n\
119relay_core_flows_dropped_total {}\n\
120relay_core_intercepts_pending {}\n\
121relay_core_ws_pending_messages {}\n\
122relay_core_oldest_intercept_age_ms {}\n\
123relay_core_oldest_ws_message_age_ms {}\n\
124relay_core_rule_exec_errors_total {}\n\
125relay_core_audit_events_total {}\n\
126relay_core_audit_events_failed_total {}\n\
127relay_core_flow_events_lagged_total {}\n\
128relay_core_audit_events_lagged_total {}\n\
129relay_core_proxy_body_degraded_total {}\n\
130relay_core_proxy_http_request_total {}\n\
131relay_core_proxy_sandbox_reject_total {}\n\
132relay_core_proxy_invalid_method_total {}\n\
133relay_core_proxy_invalid_status_total {}\n\
134relay_core_proxy_retry_total {}\n\
135relay_core_proxy_stream_mode_tap_total {}\n\
136relay_core_proxy_stream_mode_degrade_total {}\n",
137            self.flows_total,
138            self.flows_in_memory,
139            self.flows_dropped,
140            self.intercepts_pending,
141            self.ws_pending_messages,
142            oldest_intercept_age_ms,
143            oldest_ws_message_age_ms,
144            self.rule_exec_errors,
145            self.audit_events_total,
146            self.audit_events_failed,
147            self.flow_events_lagged_total,
148            self.audit_events_lagged_total,
149            self.proxy_body_degraded_total,
150            self.proxy_http_request_total,
151            self.proxy_sandbox_reject_total,
152            self.proxy_invalid_method_total,
153            self.proxy_invalid_status_total,
154            self.proxy_retry_total,
155            self.proxy_stream_mode_tap_total,
156            self.proxy_stream_mode_degrade_total,
157        )
158    }
159}
160
/// Serializable status view derived from a [`RuntimeLifecycle`] through its `From` impl.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusSnapshot {
    /// Lifecycle phase copied from the source lifecycle.
    pub phase: RuntimeLifecyclePhase,
    /// `true` while the lifecycle reports itself as active.
    pub running: bool,
    pub port: Option<u16>,
    /// Uptime in whole seconds, when a start timestamp is known.
    pub uptime: Option<u64>,
    pub last_error: Option<String>,
}
169
/// Status snapshot paired with a metrics snapshot, as returned by `CoreState::status_report`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusReport {
    pub status: CoreStatusSnapshot,
    pub metrics: CoreMetrics,
}
175
/// Pending-intercept counters, filled from [`CoreMetrics`] by `CoreState::intercept_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreInterceptSnapshot {
    /// Mirrors `CoreMetrics::intercepts_pending`.
    pub pending_count: usize,
    /// Mirrors `CoreMetrics::ws_pending_messages`.
    pub ws_pending_count: usize,
}
181
/// A list of audit events returned by the audit snapshot/query methods of `CoreState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CoreAuditSnapshot {
    pub events: Vec<AuditEvent>,
}
186
/// Filter parameters for `CoreState::query_audit_snapshot`. Every `None` filter matches all
/// events.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreAuditQuery {
    /// Lower timestamp bound in milliseconds (inclusive in the in-memory filter).
    pub since_ms: Option<u64>,
    /// Upper timestamp bound in milliseconds (inclusive in the in-memory filter).
    pub until_ms: Option<u64>,
    pub actor: Option<AuditActor>,
    pub kind: Option<AuditEventKind>,
    pub outcome: Option<AuditOutcome>,
    /// Maximum number of events; `query_audit_snapshot` clamps it to `1..=500`.
    pub limit: usize,
}
196
197impl Default for CoreAuditQuery {
198    fn default() -> Self {
199        Self {
200            since_ms: None,
201            until_ms: None,
202            actor: None,
203            kind: None,
204            outcome: None,
205            limit: 50,
206        }
207    }
208}
209
/// Phase of the runtime lifecycle. Serialized in `snake_case`, matching the labels
/// returned by `RuntimeLifecyclePhase::as_str`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeLifecyclePhase {
    /// Initial phase used by `RuntimeLifecycle::created`.
    Created,
    Starting,
    Running,
    Stopping,
    Stopped,
    Failed,
}
220
221impl RuntimeLifecyclePhase {
222    pub fn as_str(&self) -> &'static str {
223        match self {
224            Self::Created => "created",
225            Self::Starting => "starting",
226            Self::Running => "running",
227            Self::Stopping => "stopping",
228            Self::Stopped => "stopped",
229            Self::Failed => "failed",
230        }
231    }
232
233    pub fn is_active(&self) -> bool {
234        matches!(self, Self::Starting | Self::Running | Self::Stopping)
235    }
236}
237
/// Lifecycle state of the runtime: current phase plus optional port, start timestamp
/// and last error message.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RuntimeLifecycle {
    pub phase: RuntimeLifecyclePhase,
    pub port: Option<u16>,
    /// Start time in milliseconds; `uptime_seconds` subtracts it from `now_unix_ms()`.
    pub started_at_ms: Option<u64>,
    pub last_error: Option<String>,
}
245
246impl RuntimeLifecycle {
247    pub fn created() -> Self {
248        Self {
249            phase: RuntimeLifecyclePhase::Created,
250            port: None,
251            started_at_ms: None,
252            last_error: None,
253        }
254    }
255
256    pub fn is_active(&self) -> bool {
257        self.phase.is_active()
258    }
259
260    pub fn uptime_seconds(&self) -> Option<u64> {
261        let started_at_ms = self.started_at_ms?;
262        let now_ms = now_unix_ms();
263        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
264    }
265}
266
/// Result of a request to spawn the proxy task.
// NOTE(review): the function that returns this type is not visible in this part of the
// file; variant meanings below are read from their names and payloads — confirm.
pub enum ProxySpawnResult {
    /// A new task was spawned; carries its Tokio join handle.
    Started(tokio::task::JoinHandle<()>),
    /// No task was spawned because one is already running.
    AlreadyRunning,
}
271
/// Result of a request to stop the proxy.
// NOTE(review): the function that returns this type is not visible in this part of the
// file; variant meanings are read from their names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    Stopping,
    NotRunning,
}
277
278impl From<RuntimeLifecycle> for CoreStatusSnapshot {
279    fn from(lifecycle: RuntimeLifecycle) -> Self {
280        Self {
281            running: lifecycle.is_active(),
282            uptime: lifecycle.uptime_seconds(),
283            port: lifecycle.port,
284            last_error: lifecycle.last_error,
285            phase: lifecycle.phase,
286        }
287    }
288}
289
/// Central runtime handle. Holds the senders to the flow-store, intercept-broker and
/// rule-store actors, the optional persistent store, the policy and lifecycle watch
/// channels, the flow/audit broadcast channels and the counters surfaced in [`CoreMetrics`].
pub struct CoreState {
    /// Sender to the `FlowStoreActor` spawned in `CoreState::new`.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    /// Sender to the `InterceptBrokerActor` spawned in `CoreState::new`.
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    /// Sender to the `RuleStoreActor` spawned in `CoreState::new`.
    rule_store: mpsc::Sender<RuleStoreMessage>,
    /// Persistent store; `None` when no URL was given or the connection failed.
    store: Option<Store>,
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    /// Watch channel holding the current proxy policy.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    /// Count of flow-store messages rejected by `try_send`.
    pub flows_dropped: Arc<AtomicUsize>,
    audit_events_total: Arc<AtomicUsize>,
    audit_events_failed: Arc<AtomicUsize>,
    flow_events_lagged_total: Arc<AtomicUsize>,
    audit_events_lagged_total: Arc<AtomicUsize>,
    /// Internal broadcast channel: multiple consumers such as Tauri and Probe can subscribe.
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    /// In-memory audit history, pre-allocated for 200 entries in `CoreState::new`.
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    /// Slot for a one-shot shutdown sender; starts out empty.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
310
311impl CoreState {
312    pub async fn new(db_url: Option<String>) -> Self {
313        const AUDIT_HISTORY_LIMIT: usize = 200;
314
315        let store = if let Some(url) = db_url {
316            match Store::connect(&url).await {
317                Ok(s) => {
318                    if let Err(e) = s.init().await {
319                        tracing::error!("Failed to init store: {}", e);
320                    }
321                    Some(s)
322                }
323                Err(e) => {
324                    tracing::error!("Failed to connect to store: {}", e);
325                    None
326                }
327            }
328        } else {
329            None
330        };
331
332        let (flow_tx, flow_rx) = mpsc::channel(10000);
333        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
334        tokio::spawn(flow_actor.run());
335
336        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
337        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
338        tokio::spawn(intercept_actor.run());
339
340        let (rule_tx, rule_rx) = mpsc::channel(100);
341        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
342        tokio::spawn(rule_actor.run());
343
344        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
345        let (flow_broadcast_tx, _) = broadcast::channel(1000);
346        let (audit_broadcast_tx, _) = broadcast::channel(256);
347        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
348
349        #[cfg(feature = "script")]
350        let script_interceptor = ScriptInterceptor::new()
351            .await
352            .expect("Failed to initialize ScriptInterceptor");
353        Self {
354            flow_store: flow_tx,
355            intercept_broker: intercept_tx,
356            rule_store: rule_tx,
357            store,
358            #[cfg(feature = "script")]
359            script_interceptor: Arc::new(script_interceptor),
360            policy_tx,
361            flows_dropped: Arc::new(AtomicUsize::new(0)),
362            audit_events_total: Arc::new(AtomicUsize::new(0)),
363            audit_events_failed: Arc::new(AtomicUsize::new(0)),
364            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
365            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
366            flow_broadcast_tx,
367            audit_broadcast_tx,
368            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
369            lifecycle_tx,
370            shutdown_tx: Mutex::new(None),
371        }
372    }
373
374    pub async fn get_metrics(&self) -> CoreMetrics {
375        let (flow_tx, flow_rx) = oneshot::channel();
376        let _ = self
377            .flow_store
378            .send(FlowStoreMessage::GetMetrics(flow_tx))
379            .await;
380        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
381
382        let (int_tx, int_rx) = oneshot::channel();
383        let _ = self
384            .intercept_broker
385            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
386            .await;
387        let (
388            intercepts_pending,
389            ws_pending_messages,
390            oldest_intercept_age_ms,
391            oldest_ws_message_age_ms,
392        ) = int_rx.await.unwrap_or((0, 0, None, None));
393
394        let (rule_tx, rule_rx) = oneshot::channel();
395        let _ = self
396            .rule_store
397            .send(RuleStoreMessage::GetMetrics(rule_tx))
398            .await;
399        let rule_exec_errors = rule_rx.await.unwrap_or(0);
400
401        CoreMetrics {
402            flows_total,
403            flows_in_memory,
404            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
405            intercepts_pending,
406            ws_pending_messages,
407            oldest_intercept_age_ms,
408            oldest_ws_message_age_ms,
409            rule_exec_errors,
410            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
411            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
412            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
413            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
414            proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
415            proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
416            proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
417            proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
418            proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
419            proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
420            proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
421            proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
422            ),
423        }
424    }
425
426    pub async fn get_metrics_prometheus_text(&self) -> String {
427        let mut text = self.get_metrics().await.to_prometheus_text();
428        #[cfg(feature = "script")]
429        {
430            text.push_str(&self.script_interceptor.metrics.prometheus_lines());
431        }
432        text
433    }
434
435    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
436        self.lifecycle().into()
437    }
438
439    pub async fn status_report(&self) -> CoreStatusReport {
440        CoreStatusReport {
441            status: self.status_snapshot(),
442            metrics: self.get_metrics().await,
443        }
444    }
445
446    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
447        let metrics = self.get_metrics().await;
448        CoreInterceptSnapshot {
449            pending_count: metrics.intercepts_pending,
450            ws_pending_count: metrics.ws_pending_messages,
451        }
452    }
453
454    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
455        let events = self.recent_audit_events();
456        let start = events.len().saturating_sub(limit);
457        CoreAuditSnapshot {
458            events: events.into_iter().skip(start).collect(),
459        }
460    }
461
462    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
463        let limit = query.limit.clamp(1, 500);
464        if let Some(store) = &self.store {
465            let rows = store
466                .query_audit_events(
467                    query.since_ms,
468                    query.until_ms,
469                    query.actor.as_ref().map(AuditActor::as_str),
470                    query.kind.as_ref().map(AuditEventKind::as_str),
471                    query.outcome.as_ref().map(AuditOutcome::as_str),
472                    limit,
473                )
474                .await
475                .unwrap_or_default();
476
477            let mut events = Vec::with_capacity(rows.len());
478            for row in rows {
479                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
480                    events.push(event);
481                }
482            }
483            return CoreAuditSnapshot { events };
484        }
485
486        let mut events = self.recent_audit_events();
487        events.reverse();
488        let filtered = events
489            .into_iter()
490            .filter(|event| {
491                query
492                    .since_ms
493                    .map(|v| event.timestamp_ms >= v)
494                    .unwrap_or(true)
495            })
496            .filter(|event| {
497                query
498                    .until_ms
499                    .map(|v| event.timestamp_ms <= v)
500                    .unwrap_or(true)
501            })
502            .filter(|event| {
503                query
504                    .actor
505                    .as_ref()
506                    .map(|v| &event.actor == v)
507                    .unwrap_or(true)
508            })
509            .filter(|event| {
510                query
511                    .kind
512                    .as_ref()
513                    .map(|v| &event.kind == v)
514                    .unwrap_or(true)
515            })
516            .filter(|event| {
517                query
518                    .outcome
519                    .as_ref()
520                    .map(|v| &event.outcome == v)
521                    .unwrap_or(true)
522            })
523            .take(limit)
524            .collect();
525        CoreAuditSnapshot { events: filtered }
526    }
527
528    pub async fn get_flow(&self, id: String) -> Option<Flow> {
529        let (tx, rx) = oneshot::channel();
530        if let Err(e) = self
531            .flow_store
532            .send(FlowStoreMessage::GetFlow {
533                id: id.clone(),
534                respond_to: tx,
535            })
536            .await
537        {
538            error!("Failed to send GetFlow request: {}", e);
539            if let Some(store) = &self.store {
540                return store
541                    .load_flow(&id)
542                    .await
543                    .ok()
544                    .flatten()
545                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
546            }
547            return None;
548        }
549        let flow = rx
550            .await
551            .map_err(|e| {
552                error!("Failed to receive Flow response: {}", e);
553                e
554            })
555            .unwrap_or(None);
556        if let Some(flow) = flow {
557            return Some(redact_flow(flow, &self.current_redaction_policy()));
558        }
559        if let Some(store) = &self.store {
560            return store
561                .load_flow(&id)
562                .await
563                .ok()
564                .flatten()
565                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
566                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
567        }
568        None
569    }
570
571    pub async fn get_rules(&self) -> Vec<Rule> {
572        let (tx, rx) = oneshot::channel();
573        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
574            error!("Failed to send GetRules request: {}", e);
575            return Vec::new();
576        }
577        rx.await
578            .map_err(|e| {
579                error!("Failed to receive Rules response: {}", e);
580                e
581            })
582            .unwrap_or_default()
583    }
584
585    pub async fn set_rules(&self, rules: Vec<Rule>) {
586        let _ = self
587            .set_rules_from(
588                AuditActor::Runtime,
589                "rules.replace",
590                "rule_set".to_string(),
591                json!({}),
592                rules,
593            )
594            .await;
595    }
596
597    pub async fn upsert_rule_from(
598        &self,
599        actor: AuditActor,
600        operation: &str,
601        target: String,
602        details: serde_json::Value,
603        rule: Rule,
604    ) -> Result<(), String> {
605        let rule_id = rule.id.clone();
606        let mut rules = self.get_rules().await;
607        rules.retain(|existing| existing.id != rule_id);
608        rules.push(rule);
609        self.set_rules_from(actor, operation, target, details, rules)
610            .await
611    }
612
613    pub async fn delete_rule_from(
614        &self,
615        actor: AuditActor,
616        operation: &str,
617        target: String,
618        details: serde_json::Value,
619        rule_id: &str,
620    ) -> Result<bool, String> {
621        let mut rules = self.get_rules().await;
622        let before = rules.len();
623        rules.retain(|rule| rule.id != rule_id);
624        if rules.len() == before {
625            return Ok(false);
626        }
627        self.set_rules_from(actor, operation, target, details, rules)
628            .await
629            .map(|_| true)
630    }
631
632    pub async fn create_mock_response_rule_from(
633        &self,
634        actor: AuditActor,
635        target: String,
636        details: serde_json::Value,
637        config: MockResponseRuleConfig,
638    ) -> Result<String, String> {
639        let rule_id = config.rule_id.clone();
640        let rule = build_mock_response_rule(config);
641        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
642            .await
643            .map(|_| rule_id)
644    }
645
646    pub async fn create_intercept_rule_from(
647        &self,
648        actor: AuditActor,
649        target: String,
650        details: serde_json::Value,
651        config: InterceptRuleConfig,
652    ) -> Result<String, String> {
653        let rule_id = config.rule_id.clone();
654        let mut rules = self.get_rules().await;
655        rules.extend(build_intercept_rules(config));
656        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
657            .await
658            .map(|_| rule_id)
659    }
660
661    pub async fn upsert_legacy_intercept_rule_from(
662        &self,
663        actor: AuditActor,
664        target: String,
665        details: serde_json::Value,
666        rule: InterceptRule,
667    ) -> Result<(), String> {
668        let rule_id = rule.id.clone();
669        let mut rules = self.get_rules().await;
670        rules.retain(|existing| {
671            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
672        });
673        rules.extend(rule.to_rules());
674        self.set_rules_from(
675            actor,
676            "rule.intercept_legacy_upsert",
677            target,
678            details,
679            rules,
680        )
681        .await
682    }
683
684    pub async fn set_rules_from(
685        &self,
686        actor: AuditActor,
687        operation: &str,
688        target: String,
689        details: serde_json::Value,
690        rules: Vec<Rule>,
691    ) -> Result<(), String> {
692        let rule_count = rules.len();
693        if let Err(e) = self
694            .rule_store
695            .send(RuleStoreMessage::SetRules(rules))
696            .await
697        {
698            error!("Failed to send SetRules request: {}", e);
699            self.record_audit_event(AuditEvent::new(
700                actor,
701                AuditEventKind::RuleChanged,
702                target,
703                AuditOutcome::Failed,
704                json!({
705                    "operation": operation,
706                    "rule_count": rule_count,
707                    "details": details,
708                    "error": e.to_string()
709                }),
710            ));
711            return Err(e.to_string());
712        }
713
714        self.record_audit_event(AuditEvent::new(
715            actor,
716            AuditEventKind::RuleChanged,
717            target,
718            AuditOutcome::Success,
719            json!({
720                "operation": operation,
721                "rule_count": rule_count,
722                "details": details
723            }),
724        ));
725        Ok(())
726    }
727
728    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
729        let mut new_rules = Vec::new();
730        for rule in rules {
731            new_rules.extend(rule.to_rules());
732        }
733        self.set_rules(new_rules).await;
734    }
735
736    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
737        let (tx, rx) = oneshot::channel();
738        if let Err(e) = self
739            .rule_store
740            .send(RuleStoreMessage::GetRuleEngine(tx))
741            .await
742        {
743            error!("Failed to send GetRuleEngine request: {}", e);
744            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
745        }
746        rx.await
747            .map_err(|e| {
748                error!("Failed to receive RuleEngine response: {}", e);
749                e
750            })
751            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
752    }
753
754    pub fn update_policy(&self, policy: ProxyPolicy) {
755        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
756    }
757
758    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
759        let mut policy = self.policy_snapshot();
760        policy.apply_patch(patch);
761        self.update_policy_from(actor, target, policy);
762    }
763
764    pub fn policy_snapshot(&self) -> ProxyPolicy {
765        self.policy_tx.borrow().clone()
766    }
767
768    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
769        let details = json!({
770            "strict_http_semantics": policy.strict_http_semantics,
771            "request_timeout_ms": policy.request_timeout_ms,
772            "max_body_size": policy.max_body_size,
773            "transparent_enabled": policy.transparent_enabled,
774            "redaction_enabled": policy.redaction.enabled,
775            "redact_bodies": policy.redaction.redact_bodies
776        });
777
778        self.policy_tx.send_replace(policy);
779        self.record_audit_event(AuditEvent::new(
780            actor,
781            AuditEventKind::PolicyUpdated,
782            target,
783            AuditOutcome::Success,
784            details,
785        ));
786    }
787
788    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
789        if let Err(e) = self
790            .intercept_broker
791            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
792            .await
793        {
794            error!("Failed to send RegisterIntercept request: {}", e);
795        }
796    }
797
798    pub async fn resolve_intercept(
799        &self,
800        key: String,
801        result: InterceptionResult,
802    ) -> Result<(), String> {
803        let (tx, rx) = oneshot::channel();
804        if let Err(e) = self
805            .intercept_broker
806            .send(InterceptBrokerMessage::ResolveIntercept {
807                key,
808                result,
809                respond_to: tx,
810            })
811            .await
812        {
813            error!("Failed to send ResolveIntercept request: {}", e);
814            return Err(e.to_string());
815        }
816        rx.await.map_err(|_| "Actor dropped".to_string())?
817    }
818
819    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
820        let (tx, rx) = oneshot::channel();
821        if let Err(e) = self
822            .intercept_broker
823            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
824                key,
825                respond_to: tx,
826            })
827            .await
828        {
829            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
830            return None;
831        }
832        rx.await
833            .map_err(|e| {
834                error!("Failed to receive WebSocketMessage response: {}", e);
835                e
836            })
837            .unwrap_or(None)
838    }
839
840    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
841        if let Err(e) = self
842            .intercept_broker
843            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
844            .await
845        {
846            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
847        }
848    }
849
850    pub async fn is_intercept_pending(&self, key: String) -> bool {
851        let (tx, rx) = oneshot::channel();
852        if let Err(e) = self
853            .intercept_broker
854            .send(InterceptBrokerMessage::GetPendingIntercept {
855                key,
856                respond_to: tx,
857            })
858            .await
859        {
860            error!("Failed to send GetPendingIntercept request: {}", e);
861            return false;
862        }
863        rx.await
864            .map_err(|e| {
865                error!("Failed to receive PendingIntercept response: {}", e);
866                e
867            })
868            .unwrap_or(false)
869    }
870
871    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
872        let (tx, rx) = oneshot::channel();
873        if let Err(e) = self
874            .intercept_broker
875            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
876                flow_id,
877                respond_to: tx,
878            })
879            .await
880        {
881            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
882            return false;
883        }
884        rx.await
885            .map_err(|e| {
886                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
887                e
888            })
889            .unwrap_or(false)
890    }
891
892    pub fn upsert_flow(&self, flow: Box<Flow>) {
893        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
894            // Drop flow if channel is full
895            error!("FlowStore dropped flow: {}", e);
896            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
897        }
898    }
899
900    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
901        if let Err(e) = self
902            .flow_store
903            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
904        {
905            error!("FlowStore dropped WS message: {}", e);
906            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
907        }
908    }
909
910    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
911        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
912            flow_id,
913            body,
914            direction,
915        }) {
916            error!("FlowStore dropped HTTP body: {}", e);
917            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
918        }
919    }
920
921    /// P1: Tag a flow as budget-exceeded (body too large for full rule inspection)
922    pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
923        if let Err(e) = self
924            .flow_store
925            .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
926        {
927            error!("FlowStore dropped budget-exceeded tag: {}", e);
928            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
929        }
930    }
931
932    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
933    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
934        self.flow_broadcast_tx.subscribe()
935    }
936
937    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
938        self.audit_broadcast_tx.subscribe()
939    }
940
941    fn current_redaction_policy(&self) -> RedactionPolicy {
942        self.policy_tx.borrow().redaction.clone()
943    }
944
945    pub fn record_flow_events_lagged(&self, skipped: u64) {
946        self.flow_events_lagged_total
947            .fetch_add(skipped as usize, Ordering::Relaxed);
948    }
949
950    pub fn record_audit_events_lagged(&self, skipped: u64) {
951        self.audit_events_lagged_total
952            .fetch_add(skipped as usize, Ordering::Relaxed);
953    }
954
955    pub fn lifecycle(&self) -> RuntimeLifecycle {
956        self.lifecycle_tx.borrow().clone()
957    }
958
959    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
960        self.lifecycle_tx.subscribe()
961    }
962
963    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
964        let current = self.lifecycle();
965        if current.is_active() {
966            return Err(format!(
967                "Proxy is already {} on port {}",
968                current.phase.as_str(),
969                current.port.unwrap_or(port)
970            ));
971        }
972
973        let mut guard = self
974            .shutdown_tx
975            .lock()
976            .map_err(|_| "shutdown state poisoned".to_string())?;
977        *guard = Some(shutdown_tx);
978        drop(guard);
979
980        self.update_lifecycle(RuntimeLifecycle {
981            phase: RuntimeLifecyclePhase::Starting,
982            port: Some(port),
983            started_at_ms: None,
984            last_error: None,
985        });
986        Ok(())
987    }
988
989    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
990        let mut guard = self
991            .shutdown_tx
992            .lock()
993            .map_err(|_| "shutdown state poisoned".to_string())?;
994        let Some(tx) = guard.take() else {
995            return Ok(ProxyStopResult::NotRunning);
996        };
997        drop(guard);
998
999        let current = self.lifecycle();
1000        self.update_lifecycle(RuntimeLifecycle {
1001            phase: RuntimeLifecyclePhase::Stopping,
1002            port: current.port,
1003            started_at_ms: current.started_at_ms,
1004            last_error: current.last_error,
1005        });
1006        let _ = tx.send(());
1007        Ok(ProxyStopResult::Stopping)
1008    }
1009
1010    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1011        self.audit_history
1012            .lock()
1013            .map(|events| events.iter().cloned().collect())
1014            .unwrap_or_default()
1015    }
1016
1017    /// 搜索内存中的 Flow 列表,返回轻量摘要。
1018    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1019        let redaction = self.current_redaction_policy();
1020        if let Some(store) = &self.store {
1021            return store
1022                .query_flow_summaries(&query)
1023                .await
1024                .unwrap_or_default()
1025                .into_iter()
1026                .map(|summary| redact_flow_summary(summary, &redaction))
1027                .collect();
1028        }
1029        let (tx, rx) = oneshot::channel();
1030        if let Err(e) = self
1031            .flow_store
1032            .send(FlowStoreMessage::SearchFlows {
1033                query,
1034                respond_to: tx,
1035            })
1036            .await
1037        {
1038            error!("Failed to send SearchFlows request: {}", e);
1039            return Vec::new();
1040        }
1041        rx.await
1042            .unwrap_or_default()
1043            .into_iter()
1044            .map(|summary| redact_flow_summary(summary, &redaction))
1045            .collect()
1046    }
1047
1048    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1049        let redaction = self.current_redaction_policy();
1050        redact_flow_update(update, &redaction)
1051    }
1052
1053    /// 解除截获并可选地应用修改。
1054    ///
1055    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
1056    /// - 若 `mods` 为 None,直接 Continue;
1057    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1058    ///
1059    /// `key` 格式约定:
1060    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1061    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1062    pub async fn resolve_intercept_with_modifications(
1063        &self,
1064        key: String,
1065        action: &str,
1066        mods: Option<relay_core_api::modification::FlowModification>,
1067    ) -> Result<(), String> {
1068        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1069            .await
1070    }
1071
1072    pub async fn resolve_intercept_with_modifications_from(
1073        &self,
1074        actor: AuditActor,
1075        key: String,
1076        action: &str,
1077        mods: Option<relay_core_api::modification::FlowModification>,
1078    ) -> Result<(), String> {
1079        let modified_fields = modification_field_names(mods.as_ref());
1080        let result = match action {
1081            "drop" => InterceptionResult::Drop,
1082            _ => match mods {
1083                None => InterceptionResult::Continue,
1084                Some(m) => {
1085                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1086                    match parts.as_slice() {
1087                        [flow_id, phase] => {
1088                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1089                                modification::apply_flow_modification(&flow, phase, m)
1090                            } else {
1091                                InterceptionResult::Continue
1092                            }
1093                        }
1094                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1095                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1096                        [_, "ws_msg", _] => {
1097                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1098                                modification::apply_ws_modification(&msg, m)
1099                            } else {
1100                                InterceptionResult::Continue
1101                            }
1102                        }
1103                        _ => InterceptionResult::Continue,
1104                    }
1105                }
1106            },
1107        };
1108        let outcome = self.resolve_intercept(key.clone(), result).await;
1109        let audit_outcome = if outcome.is_ok() {
1110            AuditOutcome::Success
1111        } else {
1112            AuditOutcome::Failed
1113        };
1114        let error_message = outcome.as_ref().err().cloned();
1115
1116        self.record_audit_event(AuditEvent::new(
1117            actor,
1118            AuditEventKind::InterceptResolved,
1119            key,
1120            audit_outcome,
1121            json!({
1122                "action": action,
1123                "has_modifications": !modified_fields.is_empty(),
1124                "modified_fields": modified_fields,
1125                "error": error_message
1126            }),
1127        ));
1128        outcome
1129    }
1130
1131    #[cfg(feature = "script")]
1132    pub async fn load_script_from(
1133        &self,
1134        actor: AuditActor,
1135        target: String,
1136        script: &str,
1137    ) -> Result<(), String> {
1138        let result = self
1139            .script_interceptor
1140            .load_script(script)
1141            .await
1142            .map_err(|e| e.to_string());
1143
1144        let outcome = if result.is_ok() {
1145            AuditOutcome::Success
1146        } else {
1147            AuditOutcome::Failed
1148        };
1149        let error_message = result.as_ref().err().cloned();
1150
1151        self.record_audit_event(AuditEvent::new(
1152            actor,
1153            AuditEventKind::ScriptReloaded,
1154            target,
1155            outcome,
1156            json!({
1157                "script_bytes": script.len(),
1158                "error": error_message
1159            }),
1160        ));
1161
1162        result
1163    }
1164
1165    /// S5: Set the env var whitelist for relay.env() in user scripts.
1166    #[cfg(feature = "script")]
1167    pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1168        self.script_interceptor.set_env_allow(env_allow).await;
1169    }
1170
1171    fn record_audit_event(&self, event: AuditEvent) {
1172        const AUDIT_HISTORY_LIMIT: usize = 200;
1173        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1174        if event.outcome == AuditOutcome::Failed {
1175            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1176        }
1177
1178        let details = event.details.to_string();
1179        tracing::info!(
1180            target: "relay_core_audit",
1181            event_id = %event.id,
1182            actor = %event.actor.as_str(),
1183            kind = %event.kind.as_str(),
1184            target = %event.target,
1185            outcome = %event.outcome.as_str(),
1186            details = %details
1187        );
1188
1189        if let Ok(mut history) = self.audit_history.lock() {
1190            if history.len() >= AUDIT_HISTORY_LIMIT {
1191                history.pop_front();
1192            }
1193            history.push_back(event.clone());
1194        }
1195
1196        if let Some(store) = self.store.clone() {
1197            let event_json = serde_json::to_value(&event).unwrap_or_default();
1198            let event_id = event.id.clone();
1199            let timestamp_ms = event.timestamp_ms;
1200            let actor = event.actor.as_str().to_string();
1201            let kind = event.kind.as_str().to_string();
1202            let target = event.target.clone();
1203            let outcome = event.outcome.as_str().to_string();
1204            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1205                handle.spawn(async move {
1206                    if let Err(e) = store
1207                        .save_audit_event(AuditEventRecord {
1208                            id: &event_id,
1209                            timestamp_ms,
1210                            actor: &actor,
1211                            kind: &kind,
1212                            target: &target,
1213                            outcome: &outcome,
1214                            content: &event_json,
1215                        })
1216                        .await
1217                    {
1218                        tracing::error!("Failed to persist audit event: {}", e);
1219                    }
1220                });
1221            }
1222        }
1223
1224        let _ = self.audit_broadcast_tx.send(event);
1225    }
1226
1227    fn transition_to_running(&self, port: u16) {
1228        self.update_lifecycle(RuntimeLifecycle {
1229            phase: RuntimeLifecyclePhase::Running,
1230            port: Some(port),
1231            started_at_ms: Some(now_unix_ms()),
1232            last_error: None,
1233        });
1234    }
1235
1236    fn transition_to_stopped(&self) {
1237        if let Ok(mut guard) = self.shutdown_tx.lock() {
1238            *guard = None;
1239        }
1240
1241        self.update_lifecycle(RuntimeLifecycle {
1242            phase: RuntimeLifecyclePhase::Stopped,
1243            port: None,
1244            started_at_ms: None,
1245            last_error: None,
1246        });
1247    }
1248
1249    fn transition_to_failed(&self, port: u16, error: String) {
1250        if let Ok(mut guard) = self.shutdown_tx.lock() {
1251            *guard = None;
1252        }
1253
1254        self.update_lifecycle(RuntimeLifecycle {
1255            phase: RuntimeLifecyclePhase::Failed,
1256            port: Some(port),
1257            started_at_ms: None,
1258            last_error: Some(error),
1259        });
1260    }
1261
1262    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1263        tracing::info!(
1264            target: "relay_core_lifecycle",
1265            phase = %lifecycle.phase.as_str(),
1266            port = ?lifecycle.port,
1267            started_at_ms = ?lifecycle.started_at_ms,
1268            last_error = ?lifecycle.last_error
1269        );
1270        self.lifecycle_tx.send_replace(lifecycle);
1271    }
1272
1273    pub fn spawn_proxy(
1274        self: &Arc<Self>,
1275        config: ProxyConfig,
1276        sink: mpsc::Sender<FlowUpdate>,
1277        extra_interceptor: Option<Arc<dyn Interceptor>>,
1278    ) -> Result<ProxySpawnResult, String> {
1279        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1280        match self.prepare_start(config.port, shutdown_tx) {
1281            Ok(()) => {}
1282            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1283            Err(error) => return Err(error),
1284        }
1285        let state = self.clone();
1286        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1287            if let Err(error) = state
1288                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1289                .await
1290            {
1291                error!("Proxy failed: {}", error);
1292            }
1293        })))
1294    }
1295
1296    pub async fn start_proxy(
1297        self: &Arc<Self>,
1298        config: ProxyConfig,
1299        sink: mpsc::Sender<FlowUpdate>,
1300        extra_interceptor: Option<Arc<dyn Interceptor>>,
1301    ) -> Result<(), String> {
1302        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1303        self.prepare_start(config.port, shutdown_tx)?;
1304        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1305            .await
1306    }
1307
1308    async fn run_proxy(
1309        self: &Arc<Self>,
1310        config: ProxyConfig,
1311        sink: mpsc::Sender<FlowUpdate>,
1312        extra_interceptor: Option<Arc<dyn Interceptor>>,
1313        shutdown_rx: oneshot::Receiver<()>,
1314    ) -> Result<(), String> {
1315        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1316        let state = self.clone();
1317
1318        if let Some(parent) = config.ca_cert_path.parent()
1319            && !parent.exists()
1320        {
1321            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1322        }
1323
1324        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1325            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1326        let ca = Arc::new(ca);
1327
1328        #[cfg(feature = "script")]
1329        let script_interceptor = self.script_interceptor.clone();
1330
1331        #[cfg(feature = "script")]
1332        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1333
1334        #[cfg(not(feature = "script"))]
1335        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1336
1337        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1338            self.clone(),
1339        )));
1340
1341        if let Some(interceptor) = extra_interceptor {
1342            interceptors.push(interceptor);
1343        }
1344
1345        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1346        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1347
1348        tokio::spawn(async move {
1349            while let Some(update) = proxy_rx.recv().await {
1350                match update.clone() {
1351                    FlowUpdate::Full(flow) => {
1352                        state.upsert_flow(flow);
1353                    }
1354                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1355                        state.append_ws_message(flow_id, message);
1356                    }
1357                    FlowUpdate::HttpBody {
1358                        flow_id,
1359                        direction,
1360                        body,
1361                    } => {
1362                        state.update_http_body(flow_id, body, direction);
1363                    }
1364                    FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1365                        // P1: Tag the flow as budget-exceeded and update resilience trace
1366                        state.tag_flow_budget_exceeded(flow_id, direction);
1367                    }
1368                }
1369
1370                let _ = state.flow_broadcast_tx.send(update.clone());
1371
1372                if sink.try_send(update).is_err() {
1373                    relay_core_lib::metrics::inc_flows_dropped();
1374                }
1375            }
1376        });
1377
1378        if let Some(udp_port) = config.udp_tproxy_port {
1379            let udp_proxy_tx = proxy_tx.clone();
1380            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1381            tokio::spawn(async move {
1382                match tokio::net::UdpSocket::bind(udp_addr).await {
1383                    Ok(socket) => {
1384                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1385                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1386                            error!("UDP TPROXY failed: {}", e);
1387                        }
1388                    }
1389                    Err(e) => {
1390                        error!("Failed to bind UDP TPROXY socket: {}", e);
1391                    }
1392                }
1393            });
1394        }
1395
1396        let listener = match TcpListener::bind(addr).await {
1397            Ok(listener) => listener,
1398            Err(e) => {
1399                if let Ok(mut guard) = self.shutdown_tx.lock() {
1400                    *guard = None;
1401                }
1402                let message = format!("Failed to bind to address {}: {}", addr, e);
1403                self.transition_to_failed(config.port, message.clone());
1404                return Err(message);
1405            }
1406        };
1407
1408        self.transition_to_running(config.port);
1409        let shutdown_rx = Some(shutdown_rx);
1410
1411        if config.transparent {
1412            let policy = ProxyPolicy {
1413                transparent_enabled: true,
1414                ..Default::default()
1415            };
1416            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1417            let policy_rx = self.policy_tx.subscribe();
1418
1419            let provider: Arc<dyn OriginalDstProvider> = {
1420                let mut addrs = BTreeSet::new();
1421                if let Ok(local) = listener.local_addr() {
1422                    addrs.insert(local);
1423                }
1424
1425                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1426                {
1427                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1428                }
1429                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1430                {
1431                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1432                        Ok(provider) => Arc::new(provider),
1433                        Err(e) => {
1434                            error!("Failed to initialize macOS PF provider: {}", e);
1435                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1436                        }
1437                    }
1438                }
1439                #[cfg(target_os = "windows")]
1440                {
1441                    let filter =
1442                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1443                            .to_string();
1444                    let port = config.port;
1445                    tokio::spawn(async move {
1446                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1447                            .await;
1448                    });
1449
1450                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1451                }
1452                #[cfg(not(any(
1453                    all(target_os = "linux", feature = "transparent-linux"),
1454                    all(target_os = "macos", feature = "transparent-macos"),
1455                    target_os = "windows"
1456                )))]
1457                {
1458                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1459                }
1460            };
1461
1462            let source = TransparentTcpCaptureSource::new(listener, provider);
1463            let result = relay_core_lib::start_proxy(
1464                source,
1465                proxy_tx,
1466                interceptor,
1467                ca,
1468                policy_rx,
1469                None,
1470                shutdown_rx,
1471                None,
1472            )
1473            .await
1474            .map_err(|e| e.to_string());
1475            if let Err(error) = &result {
1476                self.transition_to_failed(config.port, error.clone());
1477            } else {
1478                self.transition_to_stopped();
1479            }
1480            result
1481        } else {
1482            let source = TcpCaptureSource::new(listener);
1483            let policy = ProxyPolicy::default();
1484            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1485            let policy_rx = self.policy_tx.subscribe();
1486
1487            let result = relay_core_lib::start_proxy(
1488                source,
1489                proxy_tx,
1490                interceptor,
1491                ca,
1492                policy_rx,
1493                None,
1494                shutdown_rx,
1495                None,
1496            )
1497            .await
1498            .map_err(|e| e.to_string());
1499            if let Err(error) = &result {
1500                self.transition_to_failed(config.port, error.clone());
1501            } else {
1502                self.transition_to_stopped();
1503            }
1504            result
1505        }
1506    }
1507}
1508
1509fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1510    match update {
1511        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1512        FlowUpdate::WebSocketMessage {
1513            flow_id,
1514            mut message,
1515        } => {
1516            message.content = redact_body(message.content, redaction);
1517            FlowUpdate::WebSocketMessage { flow_id, message }
1518        }
1519        FlowUpdate::HttpBody {
1520            flow_id,
1521            direction,
1522            body,
1523        } => FlowUpdate::HttpBody {
1524            flow_id,
1525            direction,
1526            body: redact_body(body, redaction),
1527        },
1528        FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1529            FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1530        }
1531    }
1532}
1533
1534fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1535    if !redaction.enabled {
1536        return flow;
1537    }
1538    match &mut flow.layer {
1539        Layer::Http(http) => {
1540            redact_http_request(&mut http.request, redaction);
1541            if let Some(response) = &mut http.response {
1542                redact_headers(&mut response.headers, redaction);
1543                response.body = response
1544                    .body
1545                    .take()
1546                    .map(|body| redact_body(body, redaction));
1547            }
1548        }
1549        Layer::WebSocket(ws) => {
1550            redact_http_request(&mut ws.handshake_request, redaction);
1551            redact_headers(&mut ws.handshake_response.headers, redaction);
1552            ws.handshake_response.body = ws
1553                .handshake_response
1554                .body
1555                .take()
1556                .map(|body| redact_body(body, redaction));
1557            for message in &mut ws.messages {
1558                message.content = redact_body(message.content.clone(), redaction);
1559            }
1560        }
1561        _ => {}
1562    }
1563    flow
1564}
1565
1566fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1567    if !redaction.enabled {
1568        return summary;
1569    }
1570    summary.url = redact_url_string(&summary.url, redaction);
1571    summary
1572}
1573
1574fn redact_http_request(
1575    request: &mut relay_core_api::flow::HttpRequest,
1576    redaction: &RedactionPolicy,
1577) {
1578    redact_headers(&mut request.headers, redaction);
1579    redact_query_pairs(&mut request.query, redaction);
1580    request.url = redact_url(&request.url, redaction);
1581    request.body = request.body.take().map(|body| redact_body(body, redaction));
1582}
1583
1584fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1585    if !redaction.enabled {
1586        return;
1587    }
1588    let sensitive = redaction_set(&redaction.sensitive_header_names);
1589    for (name, value) in headers.iter_mut() {
1590        if sensitive.contains(&name.to_ascii_lowercase()) {
1591            *value = "[REDACTED]".to_string();
1592        }
1593    }
1594}
1595
1596fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1597    if !redaction.enabled {
1598        return;
1599    }
1600    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1601    for (name, value) in query.iter_mut() {
1602        if sensitive.contains(&name.to_ascii_lowercase()) {
1603            *value = "[REDACTED]".to_string();
1604        }
1605    }
1606}
1607
1608fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1609    if !redaction.enabled {
1610        return url.clone();
1611    }
1612    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1613    let pairs: Vec<(String, String)> = url
1614        .query_pairs()
1615        .map(|(k, v)| {
1616            let key = k.to_string();
1617            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1618                "[REDACTED]".to_string()
1619            } else {
1620                v.to_string()
1621            };
1622            (key, value)
1623        })
1624        .collect();
1625    let mut next = url.clone();
1626    if pairs.is_empty() {
1627        return next;
1628    }
1629    next.query_pairs_mut().clear();
1630    for (k, v) in pairs {
1631        next.query_pairs_mut().append_pair(&k, &v);
1632    }
1633    next
1634}
1635
1636fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1637    match url::Url::parse(input) {
1638        Ok(url) => redact_url(&url, redaction).to_string(),
1639        Err(_) => input.to_string(),
1640    }
1641}
1642
1643fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1644    if redaction.enabled && redaction.redact_bodies {
1645        body.content = "[REDACTED]".to_string();
1646    }
1647    body
1648}
1649
/// Builds a lookup set from `values`, ASCII-lowercasing each entry so that
/// membership tests can be done case-insensitively.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::new();
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1656
/// Settings for one proxy run.
#[derive(Clone)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Location of the CA certificate file (loaded, or created if missing).
    pub ca_cert_path: std::path::PathBuf,
    /// Location of the CA private key file (loaded, or created if missing).
    pub ca_key_path: std::path::PathBuf,
    /// Whether to run with the transparent capture source instead of the
    /// standard one.
    pub transparent: bool,
    /// Port for the UDP proxy; `None` disables it.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a configuration for `port` with the given CA file locations.
    ///
    /// Transparent mode is off and no UDP proxy port is set.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        let transparent = false;
        let udp_tproxy_port = None;
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent,
            udp_tproxy_port,
        }
    }

    /// Creates a configuration whose CA files live in `app_data_dir` as
    /// `ca_cert.pem` and `ca_key.pem`.
    ///
    /// The directory (and any missing parents) is created if it does not
    /// exist yet.
    ///
    /// # Errors
    /// Returns the I/O error text if the directory cannot be created.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let dir: std::path::PathBuf = app_data_dir.into();
        if !dir.exists() {
            if let Err(io_err) = std::fs::create_dir_all(&dir) {
                return Err(io_err.to_string());
            }
        }

        let cert_path = dir.join("ca_cert.pem");
        let key_path = dir.join("ca_key.pem");
        Ok(Self::new(port, cert_path, key_path))
    }

    /// Returns the configuration with transparent mode set to `transparent`.
    pub fn with_transparent(self, transparent: bool) -> Self {
        Self {
            transparent,
            ..self
        }
    }

    /// Returns the configuration with the UDP proxy port set to
    /// `udp_tproxy_port` (`None` disables the UDP proxy).
    pub fn with_udp_tproxy_port(self, udp_tproxy_port: Option<u16>) -> Self {
        Self {
            udp_tproxy_port,
            ..self
        }
    }
}
1707
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1714
1715fn modification_field_names(
1716    mods: Option<&relay_core_api::modification::FlowModification>,
1717) -> Vec<&'static str> {
1718    let Some(mods) = mods else {
1719        return Vec::new();
1720    };
1721
1722    let mut fields = Vec::new();
1723    if mods.method.is_some() {
1724        fields.push("method");
1725    }
1726    if mods.url.is_some() {
1727        fields.push("url");
1728    }
1729    if mods.request_headers.is_some() {
1730        fields.push("request_headers");
1731    }
1732    if mods.request_body.is_some() {
1733        fields.push("request_body");
1734    }
1735    if mods.status_code.is_some() {
1736        fields.push("status_code");
1737    }
1738    if mods.response_headers.is_some() {
1739        fields.push("response_headers");
1740    }
1741    if mods.response_body.is_some() {
1742        fields.push("response_body");
1743    }
1744    if mods.message_content.is_some() {
1745        fields.push("message_content");
1746    }
1747    fields
1748}
1749
1750#[cfg(test)]
1751mod tests {
1752    use super::*;
1753    use chrono::Utc;
1754    use relay_core_api::flow::{
1755        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1756        ResponseTiming, TransportProtocol,
1757    };
1758    use std::sync::atomic::{AtomicU64, Ordering};
1759    use tokio::time::{Duration, sleep};
1760    use url::Url;
1761    use uuid::Uuid;
1762
1763    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1764
1765    fn sqlite_url() -> String {
1766        let nanos = SystemTime::now()
1767            .duration_since(UNIX_EPOCH)
1768            .expect("clock drift")
1769            .as_nanos();
1770        let pid = std::process::id();
1771        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1772        let db_dir = std::env::current_dir()
1773            .expect("cwd")
1774            .join("target")
1775            .join("test-dbs");
1776        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1777        let db_path = db_dir.join(format!(
1778            "relay-core-runtime-test-{}-{}-{}.db",
1779            pid, nanos, seq
1780        ));
1781        format!("sqlite://{}?mode=rwc", db_path.display())
1782    }
1783
1784    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1785        let start_time =
1786            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1787        let request_url =
1788            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1789        Flow {
1790            id: Uuid::new_v4(),
1791            start_time,
1792            end_time: Some(start_time),
1793            network: NetworkInfo {
1794                client_ip: "127.0.0.1".to_string(),
1795                client_port: 12000,
1796                server_ip: "127.0.0.1".to_string(),
1797                server_port: 8080,
1798                protocol: TransportProtocol::TCP,
1799                tls: false,
1800                tls_version: None,
1801                sni: None,
1802            },
1803            layer: Layer::Http(HttpLayer {
1804                request: HttpRequest {
1805                    method: method.to_string(),
1806                    url: request_url,
1807                    version: "HTTP/1.1".to_string(),
1808                    headers: vec![],
1809                    cookies: vec![],
1810                    query: vec![],
1811                    body: None,
1812                },
1813                response: Some(HttpResponse {
1814                    status,
1815                    status_text: "OK".to_string(),
1816                    version: "HTTP/1.1".to_string(),
1817                    headers: vec![],
1818                    cookies: vec![],
1819                    body: None,
1820                    timing: ResponseTiming {
1821                        time_to_first_byte: None,
1822                        time_to_last_byte: None,
1823                        connect_time_ms: None,
1824                        ssl_time_ms: None,
1825                    },
1826                }),
1827                error: None,
1828            }),
1829            tags: vec![],
1830            meta: std::collections::HashMap::new(),
1831            resilience_trace: None,
1832            rule_variables: std::collections::HashMap::new(),
1833            matched_rules: vec![],
1834        }
1835    }
1836
1837    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1838        let start_time =
1839            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1840        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1841            .expect("url should parse");
1842        Flow {
1843            id: Uuid::new_v4(),
1844            start_time,
1845            end_time: Some(start_time),
1846            network: NetworkInfo {
1847                client_ip: "127.0.0.1".to_string(),
1848                client_port: 12000,
1849                server_ip: "127.0.0.1".to_string(),
1850                server_port: 8080,
1851                protocol: TransportProtocol::TCP,
1852                tls: false,
1853                tls_version: None,
1854                sni: None,
1855            },
1856            layer: Layer::Http(HttpLayer {
1857                request: HttpRequest {
1858                    method: "GET".to_string(),
1859                    url: request_url,
1860                    version: "HTTP/1.1".to_string(),
1861                    headers: vec![
1862                        (
1863                            "Authorization".to_string(),
1864                            "Bearer secret-token".to_string(),
1865                        ),
1866                        ("X-Normal".to_string(), "visible".to_string()),
1867                    ],
1868                    cookies: vec![],
1869                    query: vec![
1870                        ("token".to_string(), "abc123".to_string()),
1871                        ("ok".to_string(), "1".to_string()),
1872                    ],
1873                    body: Some(BodyData {
1874                        encoding: "utf-8".to_string(),
1875                        content: "secret request body".to_string(),
1876                        size: 19,
1877                    }),
1878                },
1879                response: Some(HttpResponse {
1880                    status: 200,
1881                    status_text: "OK".to_string(),
1882                    version: "HTTP/1.1".to_string(),
1883                    headers: vec![
1884                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1885                        ("X-Response".to_string(), "visible".to_string()),
1886                    ],
1887                    cookies: vec![],
1888                    body: Some(BodyData {
1889                        encoding: "utf-8".to_string(),
1890                        content: "secret response body".to_string(),
1891                        size: 20,
1892                    }),
1893                    timing: ResponseTiming {
1894                        time_to_first_byte: None,
1895                        time_to_last_byte: None,
1896                        connect_time_ms: None,
1897                        ssl_time_ms: None,
1898                    },
1899                }),
1900                error: None,
1901            }),
1902            tags: vec![],
1903            meta: std::collections::HashMap::new(),
1904            resilience_trace: None,
1905            rule_variables: std::collections::HashMap::new(),
1906            matched_rules: vec![],
1907        }
1908    }
1909
1910    #[tokio::test]
1911    async fn set_rules_from_records_audit_event() {
1912        let state = CoreState::new(None).await;
1913
1914        state
1915            .set_rules_from(
1916                AuditActor::Http,
1917                "rule.upsert",
1918                "rule-1".to_string(),
1919                json!({ "route": "/api/v1/rules" }),
1920                Vec::new(),
1921            )
1922            .await
1923            .expect("set rules should succeed");
1924
1925        let events = state.recent_audit_events();
1926        let event = events.last().expect("audit event should exist");
1927        assert_eq!(event.actor, AuditActor::Http);
1928        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1929        assert_eq!(event.outcome, AuditOutcome::Success);
1930        assert_eq!(event.target, "rule-1");
1931        assert_eq!(event.details["operation"], "rule.upsert");
1932        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1933    }
1934
1935    #[tokio::test]
1936    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1937        let state = CoreState::new(None).await;
1938
1939        state
1940            .upsert_rule_from(
1941                AuditActor::Probe,
1942                "rule.upsert",
1943                "rule-1".to_string(),
1944                json!({ "tool": "set_rule" }),
1945                Rule {
1946                    id: "rule-1".to_string(),
1947                    name: "first".to_string(),
1948                    active: true,
1949                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1950                    priority: 1,
1951                    termination: relay_core_lib::rule::RuleTermination::Continue,
1952                    filter: relay_core_lib::rule::Filter::Url(
1953                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1954                    ),
1955                    actions: vec![],
1956                    constraints: None,
1957                },
1958            )
1959            .await
1960            .expect("initial upsert should succeed");
1961
1962        state
1963            .upsert_rule_from(
1964                AuditActor::Probe,
1965                "rule.upsert",
1966                "rule-1".to_string(),
1967                json!({ "tool": "set_rule" }),
1968                Rule {
1969                    id: "rule-1".to_string(),
1970                    name: "second".to_string(),
1971                    active: true,
1972                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1973                    priority: 2,
1974                    termination: relay_core_lib::rule::RuleTermination::Continue,
1975                    filter: relay_core_lib::rule::Filter::Url(
1976                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1977                    ),
1978                    actions: vec![],
1979                    constraints: None,
1980                },
1981            )
1982            .await
1983            .expect("replacement upsert should succeed");
1984
1985        let rules = state.get_rules().await;
1986        assert_eq!(rules.len(), 1);
1987        assert_eq!(rules[0].name, "second");
1988        let event = state
1989            .recent_audit_events()
1990            .last()
1991            .cloned()
1992            .expect("audit event");
1993        assert_eq!(event.details["operation"], "rule.upsert");
1994        assert_eq!(event.details["details"]["tool"], "set_rule");
1995    }
1996
1997    #[tokio::test]
1998    async fn delete_rule_from_returns_false_when_rule_missing() {
1999        let state = CoreState::new(None).await;
2000
2001        let deleted = state
2002            .delete_rule_from(
2003                AuditActor::Http,
2004                "rule.delete",
2005                "missing".to_string(),
2006                json!({ "route": "/api/v1/rules/{id}" }),
2007                "missing",
2008            )
2009            .await
2010            .expect("delete should not fail");
2011
2012        assert!(!deleted);
2013        assert!(state.recent_audit_events().is_empty());
2014    }
2015
2016    #[tokio::test]
2017    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2018        let state = CoreState::new(None).await;
2019
2020        let rule_id = state
2021            .create_mock_response_rule_from(
2022                AuditActor::Http,
2023                "api-mock-1".to_string(),
2024                json!({ "route": "/api/v1/mock", "status": 201 }),
2025                MockResponseRuleConfig {
2026                    rule_id: "api-mock-1".to_string(),
2027                    url_pattern: "example.com".to_string(),
2028                    name: "api-mock:example.com".to_string(),
2029                    status: 201,
2030                    content_type: "application/json".to_string(),
2031                    body: "{\"ok\":true}".to_string(),
2032                },
2033            )
2034            .await
2035            .expect("mock rule should be created");
2036
2037        assert_eq!(rule_id, "api-mock-1");
2038        let rules = state.get_rules().await;
2039        assert_eq!(rules.len(), 1);
2040        assert_eq!(rules[0].id, "api-mock-1");
2041        let event = state
2042            .recent_audit_events()
2043            .last()
2044            .cloned()
2045            .expect("audit event");
2046        assert_eq!(event.details["operation"], "rule.mock_create");
2047        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2048    }
2049
2050    #[tokio::test]
2051    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2052        let state = CoreState::new(None).await;
2053
2054        let rule_id = state
2055            .create_intercept_rule_from(
2056                AuditActor::Http,
2057                "intercept-1".to_string(),
2058                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2059                InterceptRuleConfig {
2060                    rule_id: "intercept-1".to_string(),
2061                    active: true,
2062                    url_pattern: "example.com".to_string(),
2063                    method: None,
2064                    phase: "request".to_string(),
2065                    name: "api-intercept:example.com".to_string(),
2066                    priority: 100,
2067                    termination: relay_core_lib::rule::RuleTermination::Stop,
2068                },
2069            )
2070            .await
2071            .expect("intercept rule should be created");
2072
2073        assert_eq!(rule_id, "intercept-1");
2074        let rules = state.get_rules().await;
2075        assert_eq!(rules.len(), 1);
2076        assert_eq!(rules[0].id, "intercept-1");
2077        assert_eq!(rules[0].name, "api-intercept:example.com");
2078        let event = state
2079            .recent_audit_events()
2080            .last()
2081            .cloned()
2082            .expect("audit event");
2083        assert_eq!(event.details["operation"], "rule.intercept_create");
2084        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2085    }
2086
2087    #[tokio::test]
2088    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2089        let state = CoreState::new(None).await;
2090
2091        state
2092            .upsert_legacy_intercept_rule_from(
2093                AuditActor::Tauri,
2094                "legacy-1".to_string(),
2095                json!({ "command": "set_intercept_rule" }),
2096                InterceptRule {
2097                    id: "legacy-1".to_string(),
2098                    active: true,
2099                    url_pattern: "example.com".to_string(),
2100                    method: None,
2101                    phase: "both".to_string(),
2102                },
2103            )
2104            .await
2105            .expect("initial family upsert should succeed");
2106        assert_eq!(state.get_rules().await.len(), 2);
2107
2108        state
2109            .upsert_legacy_intercept_rule_from(
2110                AuditActor::Tauri,
2111                "legacy-1".to_string(),
2112                json!({ "command": "set_intercept_rule" }),
2113                InterceptRule {
2114                    id: "legacy-1".to_string(),
2115                    active: true,
2116                    url_pattern: "example.org".to_string(),
2117                    method: Some("POST".to_string()),
2118                    phase: "request".to_string(),
2119                },
2120            )
2121            .await
2122            .expect("replacement family upsert should succeed");
2123
2124        let rules = state.get_rules().await;
2125        assert_eq!(rules.len(), 1);
2126        assert_eq!(rules[0].id, "legacy-1");
2127        let event = state
2128            .recent_audit_events()
2129            .last()
2130            .cloned()
2131            .expect("audit event");
2132        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2133        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2134    }
2135
2136    #[tokio::test]
2137    async fn resolve_intercept_failure_records_failed_audit_event() {
2138        let state = CoreState::new(None).await;
2139
2140        let result = state
2141            .resolve_intercept_with_modifications_from(
2142                AuditActor::Probe,
2143                "missing-flow:request".to_string(),
2144                "drop",
2145                None,
2146            )
2147            .await;
2148
2149        assert!(result.is_err());
2150        let events = state.recent_audit_events();
2151        let event = events.last().expect("audit event should exist");
2152        assert_eq!(event.actor, AuditActor::Probe);
2153        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2154        assert_eq!(event.outcome, AuditOutcome::Failed);
2155        assert_eq!(event.details["action"], "drop");
2156        assert!(
2157            event.details["error"]
2158                .as_str()
2159                .unwrap_or_default()
2160                .contains("Interception not found")
2161        );
2162    }
2163
2164    #[tokio::test]
2165    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2166        let state = CoreState::new(None).await;
2167        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2168
2169        state
2170            .prepare_start(8080, shutdown_tx)
2171            .expect("prepare start should succeed");
2172        let lifecycle = state.lifecycle();
2173        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2174        assert_eq!(lifecycle.port, Some(8080));
2175        assert!(lifecycle.started_at_ms.is_none());
2176        assert!(lifecycle.last_error.is_none());
2177
2178        assert_eq!(
2179            state.stop_proxy().expect("stop should succeed"),
2180            ProxyStopResult::Stopping
2181        );
2182        let lifecycle = state.lifecycle();
2183        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2184        assert_eq!(lifecycle.port, Some(8080));
2185        assert!(shutdown_rx.await.is_ok());
2186    }
2187
2188    #[tokio::test]
2189    async fn status_snapshot_derives_runtime_facing_fields() {
2190        let state = CoreState::new(None).await;
2191        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2192        state
2193            .prepare_start(8080, shutdown_tx)
2194            .expect("prepare start should succeed");
2195
2196        let status = state.status_snapshot();
2197        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2198        assert!(status.running);
2199        assert_eq!(status.port, Some(8080));
2200        assert!(status.uptime.is_none());
2201        assert!(status.last_error.is_none());
2202    }
2203
2204    #[tokio::test]
2205    async fn status_report_combines_status_and_metrics() {
2206        let state = CoreState::new(None).await;
2207        let report = state.status_report().await;
2208
2209        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2210        assert!(!report.status.running);
2211        assert_eq!(report.metrics.intercepts_pending, 0);
2212        assert_eq!(report.metrics.ws_pending_messages, 0);
2213        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2214        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2215        assert_eq!(report.metrics.audit_events_total, 0);
2216        assert_eq!(report.metrics.audit_events_failed, 0);
2217        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2218        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2219    }
2220
2221    #[test]
2222    fn proxy_config_new_and_transport_setters_preserve_values() {
2223        let config = ProxyConfig::new(
2224            8080,
2225            std::path::PathBuf::from("/tmp/ca_cert.pem"),
2226            std::path::PathBuf::from("/tmp/ca_key.pem"),
2227        )
2228        .with_transparent(true)
2229        .with_udp_tproxy_port(Some(15000));
2230
2231        assert_eq!(config.port, 8080);
2232        assert_eq!(
2233            config.ca_cert_path,
2234            std::path::PathBuf::from("/tmp/ca_cert.pem")
2235        );
2236        assert_eq!(
2237            config.ca_key_path,
2238            std::path::PathBuf::from("/tmp/ca_key.pem")
2239        );
2240        assert!(config.transparent);
2241        assert_eq!(config.udp_tproxy_port, Some(15000));
2242    }
2243
2244    #[test]
2245    fn proxy_config_from_app_data_dir_creates_default_paths() {
2246        let unique = SystemTime::now()
2247            .duration_since(UNIX_EPOCH)
2248            .expect("clock drift")
2249            .as_nanos();
2250        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2251
2252        let config =
2253            ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2254
2255        assert!(dir.exists());
2256        assert_eq!(config.port, 8899);
2257        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2258        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2259        assert!(!config.transparent);
2260        assert!(config.udp_tproxy_port.is_none());
2261    }
2262
2263    #[tokio::test]
2264    async fn intercept_snapshot_maps_pending_counts() {
2265        let state = CoreState::new(None).await;
2266        let snapshot = state.intercept_snapshot().await;
2267
2268        assert_eq!(snapshot.pending_count, 0);
2269        assert_eq!(snapshot.ws_pending_count, 0);
2270    }
2271
2272    #[tokio::test]
2273    async fn audit_snapshot_returns_latest_events_in_order() {
2274        let state = CoreState::new(None).await;
2275        state.record_audit_event(AuditEvent::new(
2276            AuditActor::Runtime,
2277            AuditEventKind::RuleChanged,
2278            "first",
2279            AuditOutcome::Success,
2280            json!({ "index": 1 }),
2281        ));
2282        state.record_audit_event(AuditEvent::new(
2283            AuditActor::Http,
2284            AuditEventKind::PolicyUpdated,
2285            "second",
2286            AuditOutcome::Success,
2287            json!({ "index": 2 }),
2288        ));
2289
2290        let snapshot = state.audit_snapshot(1);
2291
2292        assert_eq!(snapshot.events.len(), 1);
2293        assert_eq!(snapshot.events[0].target, "second");
2294        assert_eq!(snapshot.events[0].details["index"], 2);
2295    }
2296
2297    #[tokio::test]
2298    async fn query_audit_snapshot_filters_in_memory_events() {
2299        let state = CoreState::new(None).await;
2300        state.record_audit_event(AuditEvent::new(
2301            AuditActor::Http,
2302            AuditEventKind::RuleChanged,
2303            "rule-1",
2304            AuditOutcome::Success,
2305            json!({ "idx": 1 }),
2306        ));
2307        state.record_audit_event(AuditEvent::new(
2308            AuditActor::Probe,
2309            AuditEventKind::PolicyUpdated,
2310            "policy",
2311            AuditOutcome::Failed,
2312            json!({ "idx": 2 }),
2313        ));
2314
2315        let snapshot = state
2316            .query_audit_snapshot(CoreAuditQuery {
2317                actor: Some(AuditActor::Probe),
2318                kind: Some(AuditEventKind::PolicyUpdated),
2319                outcome: Some(AuditOutcome::Failed),
2320                limit: 10,
2321                ..Default::default()
2322            })
2323            .await;
2324
2325        assert_eq!(snapshot.events.len(), 1);
2326        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2327        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2328        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2329    }
2330
2331    #[tokio::test]
2332    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2333        let state = CoreState::new(Some(sqlite_url())).await;
2334        state.update_policy_from(
2335            AuditActor::Http,
2336            "policy".to_string(),
2337            ProxyPolicy {
2338                transparent_enabled: true,
2339                ..Default::default()
2340            },
2341        );
2342
2343        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2344        for _ in 0..10 {
2345            snapshot = state
2346                .query_audit_snapshot(CoreAuditQuery {
2347                    actor: Some(AuditActor::Http),
2348                    kind: Some(AuditEventKind::PolicyUpdated),
2349                    limit: 10,
2350                    ..Default::default()
2351                })
2352                .await;
2353            if !snapshot.events.is_empty() {
2354                break;
2355            }
2356            sleep(Duration::from_millis(20)).await;
2357        }
2358
2359        assert!(!snapshot.events.is_empty());
2360        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2361        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2362    }
2363
2364    #[tokio::test]
2365    async fn prepare_start_rejects_second_active_start() {
2366        let state = CoreState::new(None).await;
2367        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2368        state
2369            .prepare_start(8080, shutdown_tx)
2370            .expect("first start should succeed");
2371
2372        let (second_tx, _second_rx) = oneshot::channel();
2373        let error = state
2374            .prepare_start(8081, second_tx)
2375            .expect_err("second active start should be rejected");
2376        assert!(error.contains("already"));
2377    }
2378
2379    #[test]
2380    fn update_policy_records_audit_event() {
2381        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2382        let state = runtime.block_on(CoreState::new(None));
2383
2384        state.update_policy_from(
2385            AuditActor::Runtime,
2386            "policy".to_string(),
2387            ProxyPolicy {
2388                transparent_enabled: true,
2389                ..Default::default()
2390            },
2391        );
2392
2393        let events = state.recent_audit_events();
2394        let event = events.last().expect("audit event should exist");
2395        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2396        assert_eq!(event.outcome, AuditOutcome::Success);
2397        assert_eq!(event.details["transparent_enabled"], true);
2398    }
2399
2400    #[test]
2401    fn patch_policy_updates_redaction_without_replacing_other_fields() {
2402        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2403        let state = runtime.block_on(CoreState::new(None));
2404        let original_timeout = state.policy_snapshot().request_timeout_ms;
2405
2406        state.patch_policy_from(
2407            AuditActor::Runtime,
2408            "policy.patch".to_string(),
2409            relay_core_api::policy::ProxyPolicyPatch {
2410                redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2411                    enabled: Some(true),
2412                    redact_bodies: Some(true),
2413                    ..Default::default()
2414                }),
2415            },
2416        );
2417
2418        let policy = state.policy_snapshot();
2419        assert_eq!(policy.request_timeout_ms, original_timeout);
2420        assert!(policy.redaction.enabled);
2421        assert!(policy.redaction.redact_bodies);
2422
2423        let events = state.recent_audit_events();
2424        let event = events.last().expect("audit event should exist");
2425        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2426        assert_eq!(event.details["redaction_enabled"], true);
2427    }
2428
2429    #[tokio::test]
2430    async fn metrics_include_audit_and_lagged_event_counters() {
2431        let state = CoreState::new(None).await;
2432
2433        state.update_policy_from(
2434            AuditActor::Runtime,
2435            "policy".to_string(),
2436            ProxyPolicy::default(),
2437        );
2438        let _ = state
2439            .resolve_intercept_with_modifications_from(
2440                AuditActor::Probe,
2441                "missing-flow:request".to_string(),
2442                "drop",
2443                None,
2444            )
2445            .await;
2446
2447        state.record_flow_events_lagged(3);
2448        state.record_audit_events_lagged(5);
2449
2450        let metrics = state.get_metrics().await;
2451        assert_eq!(metrics.audit_events_total, 2);
2452        assert_eq!(metrics.audit_events_failed, 1);
2453        assert_eq!(metrics.flow_events_lagged_total, 3);
2454        assert_eq!(metrics.audit_events_lagged_total, 5);
2455    }
2456
2457    #[tokio::test]
2458    async fn prometheus_metrics_text_contains_observability_fields() {
2459        let state = CoreState::new(None).await;
2460        state.record_flow_events_lagged(2);
2461        state.record_audit_events_lagged(4);
2462
2463        let text = state.get_metrics_prometheus_text().await;
2464        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2465        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2466        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2467        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2468    }
2469
2470    #[tokio::test]
2471    async fn search_flows_uses_store_with_offset_pagination() {
2472        let state = CoreState::new(Some(sqlite_url())).await;
2473        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2474        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2475        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2476
2477        state.upsert_flow(Box::new(flow_a));
2478        state.upsert_flow(Box::new(flow_b));
2479        state.upsert_flow(Box::new(flow_c));
2480
2481        let mut baseline = Vec::new();
2482        for _ in 0..20 {
2483            baseline = state
2484                .search_flows(FlowQuery {
2485                    host: Some("api.example.com".to_string()),
2486                    path_contains: None,
2487                    method: None,
2488                    status_min: None,
2489                    status_max: None,
2490                    has_error: None,
2491                    is_websocket: None,
2492                    limit: Some(3),
2493                    offset: Some(0),
2494                })
2495                .await;
2496            if baseline.len() == 3 {
2497                break;
2498            }
2499            sleep(Duration::from_millis(20)).await;
2500        }
2501
2502        assert_eq!(baseline.len(), 3);
2503        let page = state
2504            .search_flows(FlowQuery {
2505                host: Some("api.example.com".to_string()),
2506                path_contains: None,
2507                method: None,
2508                status_min: None,
2509                status_max: None,
2510                has_error: None,
2511                is_websocket: None,
2512                limit: Some(1),
2513                offset: Some(1),
2514            })
2515            .await;
2516        assert_eq!(page.len(), 1);
2517        assert_eq!(page[0].id, baseline[1].id);
2518    }
2519
2520    #[tokio::test]
2521    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2522        let state = CoreState::new(Some(sqlite_url())).await;
2523        let first_flow = sample_http_flow(
2524            "persist.example.com",
2525            "/first",
2526            "GET",
2527            200,
2528            1_700_000_010_000,
2529        );
2530        let first_id = first_flow.id.to_string();
2531        state.upsert_flow(Box::new(first_flow));
2532        for i in 0..240 {
2533            state.upsert_flow(Box::new(sample_http_flow(
2534                "persist.example.com",
2535                &format!("/{}", i),
2536                "GET",
2537                200,
2538                1_700_000_020_000 + i,
2539            )));
2540        }
2541
2542        sleep(Duration::from_millis(200)).await;
2543
2544        let loaded = state.get_flow(first_id).await;
2545        assert!(loaded.is_some());
2546    }
2547
2548    #[tokio::test]
2549    async fn search_flows_redacts_summary_url_when_enabled() {
2550        let state = CoreState::new(None).await;
2551        state.update_policy_from(
2552            AuditActor::Runtime,
2553            "policy.redaction".to_string(),
2554            ProxyPolicy {
2555                redaction: RedactionPolicy {
2556                    enabled: true,
2557                    sensitive_query_keys: vec!["token".to_string()],
2558                    redact_bodies: false,
2559                    ..Default::default()
2560                },
2561                ..Default::default()
2562            },
2563        );
2564        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2565
2566        let mut items = Vec::new();
2567        for _ in 0..20 {
2568            items = state
2569                .search_flows(FlowQuery {
2570                    host: Some("api.example.com".to_string()),
2571                    path_contains: Some("/private".to_string()),
2572                    ..Default::default()
2573                })
2574                .await;
2575            if !items.is_empty() {
2576                break;
2577            }
2578            sleep(Duration::from_millis(20)).await;
2579        }
2580
2581        assert!(!items.is_empty());
2582        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2583        let token = redacted
2584            .query_pairs()
2585            .find(|(k, _)| k == "token")
2586            .map(|(_, v)| v.to_string());
2587        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2588    }
2589
2590    #[tokio::test]
2591    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2592        let state = CoreState::new(Some(sqlite_url())).await;
2593        state.update_policy_from(
2594            AuditActor::Runtime,
2595            "policy.redaction".to_string(),
2596            ProxyPolicy {
2597                redaction: RedactionPolicy {
2598                    enabled: true,
2599                    sensitive_header_names: vec![
2600                        "authorization".to_string(),
2601                        "set-cookie".to_string(),
2602                    ],
2603                    sensitive_query_keys: vec!["token".to_string()],
2604                    redact_bodies: true,
2605                },
2606                ..Default::default()
2607            },
2608        );
2609
2610        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2611        let flow_id = flow.id.to_string();
2612        state.upsert_flow(Box::new(flow));
2613        sleep(Duration::from_millis(80)).await;
2614
2615        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2616        let Layer::Http(http) = loaded.layer else {
2617            panic!("expected http layer");
2618        };
2619
2620        let auth = http
2621            .request
2622            .headers
2623            .iter()
2624            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2625            .map(|(_, v)| v.as_str());
2626        assert_eq!(auth, Some("[REDACTED]"));
2627
2628        let req_query_token = http
2629            .request
2630            .query
2631            .iter()
2632            .find(|(k, _)| k == "token")
2633            .map(|(_, v)| v.as_str());
2634        assert_eq!(req_query_token, Some("[REDACTED]"));
2635
2636        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2637        assert_eq!(req_body, Some("[REDACTED]"));
2638
2639        let response = http.response.expect("response should exist");
2640        let set_cookie = response
2641            .headers
2642            .iter()
2643            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2644            .map(|(_, v)| v.as_str());
2645        assert_eq!(set_cookie, Some("[REDACTED]"));
2646        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2647        assert_eq!(res_body, Some("[REDACTED]"));
2648    }
2649
2650    #[test]
2651    fn redact_flow_update_masks_http_body_when_enabled() {
2652        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2653        let state = runtime.block_on(CoreState::new(None));
2654        state.update_policy_from(
2655            AuditActor::Runtime,
2656            "policy.redaction".to_string(),
2657            ProxyPolicy {
2658                redaction: RedactionPolicy {
2659                    enabled: true,
2660                    redact_bodies: true,
2661                    ..Default::default()
2662                },
2663                ..Default::default()
2664            },
2665        );
2666
2667        let update = FlowUpdate::HttpBody {
2668            flow_id: "f-1".to_string(),
2669            direction: relay_core_api::flow::Direction::ClientToServer,
2670            body: BodyData {
2671                encoding: "utf-8".to_string(),
2672                content: "super-secret".to_string(),
2673                size: 12,
2674            },
2675        };
2676        let redacted = state.redact_flow_update_for_output(update);
2677        match redacted {
2678            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2679            _ => panic!("expected http body update"),
2680        }
2681    }
2682
2683    #[cfg(feature = "script")]
2684    #[tokio::test]
2685    async fn load_script_from_records_audit_event() {
2686        let state = CoreState::new(None).await;
2687
2688        state
2689            .load_script_from(
2690                AuditActor::Tauri,
2691                "tauri.load_script".to_string(),
2692                "globalThis.onRequestHeaders = (_flow) => {};",
2693            )
2694            .await
2695            .expect("script should load");
2696
2697        let events = state.recent_audit_events();
2698        let event = events.last().expect("audit event should exist");
2699        assert_eq!(event.actor, AuditActor::Tauri);
2700        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2701        assert_eq!(event.outcome, AuditOutcome::Success);
2702        assert_eq!(event.target, "tauri.load_script");
2703    }
2704}