Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62mod log_format;
63pub mod modification;
64pub mod paths;
65pub mod rule;
66pub mod services;
67
68// ── Re-exports for user convenience ──
69// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
70pub use paths::CaPaths;
71pub use relay_core_api::{flow, policy};
72pub use relay_core_lib::InterceptionResult;
73pub use relay_core_lib::rule as lib_rule;
74
75use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
76use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
77use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
78
79pub mod rule_engine {
80    pub use relay_core_lib::rule_engine::*;
81}
82
83#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
84pub struct CoreMetrics {
85    pub flows_total: usize,
86    pub flows_in_memory: usize,
87    pub flows_dropped: usize,
88    pub intercepts_pending: usize,
89    pub ws_pending_messages: usize,
90    pub oldest_intercept_age_ms: Option<u64>,
91    pub oldest_ws_message_age_ms: Option<u64>,
92    pub rule_exec_errors: usize,
93    pub audit_events_total: usize,
94    pub audit_events_failed: usize,
95    pub flow_events_lagged_total: usize,
96    pub audit_events_lagged_total: usize,
97    /// O4: Total bodies degraded (budget exceeded)
98    pub proxy_body_degraded_total: usize,
99    /// O4: Total HTTP requests processed through streaming pipeline
100    pub proxy_http_request_total: usize,
101    /// O4: Total sandbox rejections
102    pub proxy_sandbox_reject_total: usize,
103    /// O4: Total invalid method rejections (strict_http_semantics)
104    pub proxy_invalid_method_total: usize,
105    /// O4: Total invalid status code rejections (strict_http_semantics)
106    pub proxy_invalid_status_total: usize,
107    /// O4: Total idempotent request retries
108    pub proxy_retry_total: usize,
109    /// O4: Total bodies processed in tap (streaming) mode
110    pub proxy_stream_mode_tap_total: usize,
111    /// O4: Total bodies degraded from tap to pass-through
112    pub proxy_stream_mode_degrade_total: usize,
113}
114
115impl CoreMetrics {
116    pub fn to_prometheus_text(&self) -> String {
117        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
118        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
119        format!(
120            "relay_core_flows_total {}\n\
121relay_core_flows_in_memory {}\n\
122relay_core_flows_dropped_total {}\n\
123relay_core_intercepts_pending {}\n\
124relay_core_ws_pending_messages {}\n\
125relay_core_oldest_intercept_age_ms {}\n\
126relay_core_oldest_ws_message_age_ms {}\n\
127relay_core_rule_exec_errors_total {}\n\
128relay_core_audit_events_total {}\n\
129relay_core_audit_events_failed_total {}\n\
130relay_core_flow_events_lagged_total {}\n\
131relay_core_audit_events_lagged_total {}\n\
132relay_core_proxy_body_degraded_total {}\n\
133relay_core_proxy_http_request_total {}\n\
134relay_core_proxy_sandbox_reject_total {}\n\
135relay_core_proxy_invalid_method_total {}\n\
136relay_core_proxy_invalid_status_total {}\n\
137relay_core_proxy_retry_total {}\n\
138relay_core_proxy_stream_mode_tap_total {}\n\
139relay_core_proxy_stream_mode_degrade_total {}\n",
140            self.flows_total,
141            self.flows_in_memory,
142            self.flows_dropped,
143            self.intercepts_pending,
144            self.ws_pending_messages,
145            oldest_intercept_age_ms,
146            oldest_ws_message_age_ms,
147            self.rule_exec_errors,
148            self.audit_events_total,
149            self.audit_events_failed,
150            self.flow_events_lagged_total,
151            self.audit_events_lagged_total,
152            self.proxy_body_degraded_total,
153            self.proxy_http_request_total,
154            self.proxy_sandbox_reject_total,
155            self.proxy_invalid_method_total,
156            self.proxy_invalid_status_total,
157            self.proxy_retry_total,
158            self.proxy_stream_mode_tap_total,
159            self.proxy_stream_mode_degrade_total,
160        )
161    }
162}
163
164#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
165pub struct CoreStatusSnapshot {
166    pub phase: RuntimeLifecyclePhase,
167    pub running: bool,
168    pub port: Option<u16>,
169    pub uptime: Option<u64>,
170    pub last_error: Option<String>,
171}
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
174pub struct CoreStatusReport {
175    pub status: CoreStatusSnapshot,
176    pub metrics: CoreMetrics,
177}
178
179#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
180pub struct CoreInterceptSnapshot {
181    pub pending_count: usize,
182    pub ws_pending_count: usize,
183}
184
185#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
186pub struct CoreAuditSnapshot {
187    pub events: Vec<AuditEvent>,
188}
189
190#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
191pub struct CoreAuditQuery {
192    pub since_ms: Option<u64>,
193    pub until_ms: Option<u64>,
194    pub actor: Option<AuditActor>,
195    pub kind: Option<AuditEventKind>,
196    pub outcome: Option<AuditOutcome>,
197    pub limit: usize,
198}
199
200impl Default for CoreAuditQuery {
201    fn default() -> Self {
202        Self {
203            since_ms: None,
204            until_ms: None,
205            actor: None,
206            kind: None,
207            outcome: None,
208            limit: 50,
209        }
210    }
211}
212
213#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
214#[serde(rename_all = "snake_case")]
215pub enum RuntimeLifecyclePhase {
216    Created,
217    Starting,
218    Running,
219    Stopping,
220    Stopped,
221    Failed,
222}
223
224impl RuntimeLifecyclePhase {
225    pub fn as_str(&self) -> &'static str {
226        match self {
227            Self::Created => "created",
228            Self::Starting => "starting",
229            Self::Running => "running",
230            Self::Stopping => "stopping",
231            Self::Stopped => "stopped",
232            Self::Failed => "failed",
233        }
234    }
235
236    pub fn is_active(&self) -> bool {
237        matches!(self, Self::Starting | Self::Running | Self::Stopping)
238    }
239}
240
241#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
242pub struct RuntimeLifecycle {
243    pub phase: RuntimeLifecyclePhase,
244    pub port: Option<u16>,
245    pub started_at_ms: Option<u64>,
246    pub last_error: Option<String>,
247}
248
249impl RuntimeLifecycle {
250    pub fn created() -> Self {
251        Self {
252            phase: RuntimeLifecyclePhase::Created,
253            port: None,
254            started_at_ms: None,
255            last_error: None,
256        }
257    }
258
259    pub fn is_active(&self) -> bool {
260        self.phase.is_active()
261    }
262
263    pub fn uptime_seconds(&self) -> Option<u64> {
264        let started_at_ms = self.started_at_ms?;
265        let now_ms = now_unix_ms();
266        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
267    }
268}
269
270pub enum ProxySpawnResult {
271    Started(tokio::task::JoinHandle<()>),
272    AlreadyRunning,
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq)]
276pub enum ProxyStopResult {
277    Stopping,
278    NotRunning,
279}
280
281impl From<RuntimeLifecycle> for CoreStatusSnapshot {
282    fn from(lifecycle: RuntimeLifecycle) -> Self {
283        Self {
284            running: lifecycle.is_active(),
285            uptime: lifecycle.uptime_seconds(),
286            port: lifecycle.port,
287            last_error: lifecycle.last_error,
288            phase: lifecycle.phase,
289        }
290    }
291}
292
293pub struct CoreState {
294    flow_store: mpsc::Sender<FlowStoreMessage>,
295    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
296    rule_store: mpsc::Sender<RuleStoreMessage>,
297    store: Option<Store>,
298    #[cfg(feature = "script")]
299    pub script_interceptor: Arc<ScriptInterceptor>,
300    pub policy_tx: watch::Sender<ProxyPolicy>,
301    pub flows_dropped: Arc<AtomicUsize>,
302    audit_events_total: Arc<AtomicUsize>,
303    audit_events_failed: Arc<AtomicUsize>,
304    flow_events_lagged_total: Arc<AtomicUsize>,
305    audit_events_lagged_total: Arc<AtomicUsize>,
306    /// 内部广播 channel:Tauri、Probe 等多个消费者均可订阅
307    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
308    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
309    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
310    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
311    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
312}
313
314impl CoreState {
315    pub async fn new(db_url: Option<String>) -> Self {
316        const AUDIT_HISTORY_LIMIT: usize = 200;
317
318        let store = if let Some(url) = db_url {
319            match Store::connect(&url).await {
320                Ok(s) => {
321                    if let Err(e) = s.init().await {
322                        tracing::error!("Failed to init store: {}", e);
323                    }
324                    Some(s)
325                }
326                Err(e) => {
327                    tracing::error!("Failed to connect to store: {}", e);
328                    None
329                }
330            }
331        } else {
332            None
333        };
334
335        let (flow_tx, flow_rx) = mpsc::channel(10000);
336        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
337        tokio::spawn(flow_actor.run());
338
339        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
340        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
341        tokio::spawn(intercept_actor.run());
342
343        let (rule_tx, rule_rx) = mpsc::channel(100);
344        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
345        tokio::spawn(rule_actor.run());
346
347        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
348        let (flow_broadcast_tx, _) = broadcast::channel(1000);
349        let (audit_broadcast_tx, _) = broadcast::channel(256);
350        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
351
352        #[cfg(feature = "script")]
353        let script_interceptor = ScriptInterceptor::new()
354            .await
355            .expect("Failed to initialize ScriptInterceptor");
356        Self {
357            flow_store: flow_tx,
358            intercept_broker: intercept_tx,
359            rule_store: rule_tx,
360            store,
361            #[cfg(feature = "script")]
362            script_interceptor: Arc::new(script_interceptor),
363            policy_tx,
364            flows_dropped: Arc::new(AtomicUsize::new(0)),
365            audit_events_total: Arc::new(AtomicUsize::new(0)),
366            audit_events_failed: Arc::new(AtomicUsize::new(0)),
367            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
368            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
369            flow_broadcast_tx,
370            audit_broadcast_tx,
371            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
372            lifecycle_tx,
373            shutdown_tx: Mutex::new(None),
374        }
375    }
376
377    pub async fn get_metrics(&self) -> CoreMetrics {
378        let (flow_tx, flow_rx) = oneshot::channel();
379        let _ = self
380            .flow_store
381            .send(FlowStoreMessage::GetMetrics(flow_tx))
382            .await;
383        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
384
385        let (int_tx, int_rx) = oneshot::channel();
386        let _ = self
387            .intercept_broker
388            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
389            .await;
390        let (
391            intercepts_pending,
392            ws_pending_messages,
393            oldest_intercept_age_ms,
394            oldest_ws_message_age_ms,
395        ) = int_rx.await.unwrap_or((0, 0, None, None));
396
397        let (rule_tx, rule_rx) = oneshot::channel();
398        let _ = self
399            .rule_store
400            .send(RuleStoreMessage::GetMetrics(rule_tx))
401            .await;
402        let rule_exec_errors = rule_rx.await.unwrap_or(0);
403
404        CoreMetrics {
405            flows_total,
406            flows_in_memory,
407            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
408            intercepts_pending,
409            ws_pending_messages,
410            oldest_intercept_age_ms,
411            oldest_ws_message_age_ms,
412            rule_exec_errors,
413            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
414            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
415            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
416            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
417            proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
418            proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
419            proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
420            proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
421            proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
422            proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
423            proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
424            proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
425            ),
426        }
427    }
428
429    pub async fn get_metrics_prometheus_text(&self) -> String {
430        let mut text = self.get_metrics().await.to_prometheus_text();
431        #[cfg(feature = "script")]
432        {
433            text.push_str(&self.script_interceptor.metrics.prometheus_lines());
434        }
435        text
436    }
437
438    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
439        self.lifecycle().into()
440    }
441
442    pub async fn status_report(&self) -> CoreStatusReport {
443        CoreStatusReport {
444            status: self.status_snapshot(),
445            metrics: self.get_metrics().await,
446        }
447    }
448
449    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
450        let metrics = self.get_metrics().await;
451        CoreInterceptSnapshot {
452            pending_count: metrics.intercepts_pending,
453            ws_pending_count: metrics.ws_pending_messages,
454        }
455    }
456
457    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
458        let events = self.recent_audit_events();
459        let start = events.len().saturating_sub(limit);
460        CoreAuditSnapshot {
461            events: events.into_iter().skip(start).collect(),
462        }
463    }
464
465    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
466        let limit = query.limit.clamp(1, 500);
467        if let Some(store) = &self.store {
468            let rows = store
469                .query_audit_events(
470                    query.since_ms,
471                    query.until_ms,
472                    query.actor.as_ref().map(AuditActor::as_str),
473                    query.kind.as_ref().map(AuditEventKind::as_str),
474                    query.outcome.as_ref().map(AuditOutcome::as_str),
475                    limit,
476                )
477                .await
478                .unwrap_or_default();
479
480            let mut events = Vec::with_capacity(rows.len());
481            for row in rows {
482                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
483                    events.push(event);
484                }
485            }
486            return CoreAuditSnapshot { events };
487        }
488
489        let mut events = self.recent_audit_events();
490        events.reverse();
491        let filtered = events
492            .into_iter()
493            .filter(|event| {
494                query
495                    .since_ms
496                    .map(|v| event.timestamp_ms >= v)
497                    .unwrap_or(true)
498            })
499            .filter(|event| {
500                query
501                    .until_ms
502                    .map(|v| event.timestamp_ms <= v)
503                    .unwrap_or(true)
504            })
505            .filter(|event| {
506                query
507                    .actor
508                    .as_ref()
509                    .map(|v| &event.actor == v)
510                    .unwrap_or(true)
511            })
512            .filter(|event| {
513                query
514                    .kind
515                    .as_ref()
516                    .map(|v| &event.kind == v)
517                    .unwrap_or(true)
518            })
519            .filter(|event| {
520                query
521                    .outcome
522                    .as_ref()
523                    .map(|v| &event.outcome == v)
524                    .unwrap_or(true)
525            })
526            .take(limit)
527            .collect();
528        CoreAuditSnapshot { events: filtered }
529    }
530
531    pub async fn get_flow(&self, id: String) -> Option<Flow> {
532        let (tx, rx) = oneshot::channel();
533        if let Err(e) = self
534            .flow_store
535            .send(FlowStoreMessage::GetFlow {
536                id: id.clone(),
537                respond_to: tx,
538            })
539            .await
540        {
541            error!("Failed to send GetFlow request: {}", e);
542            if let Some(store) = &self.store {
543                return store
544                    .load_flow(&id)
545                    .await
546                    .ok()
547                    .flatten()
548                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
549            }
550            return None;
551        }
552        let flow = rx
553            .await
554            .map_err(|e| {
555                error!("Failed to receive Flow response: {}", e);
556                e
557            })
558            .unwrap_or(None);
559        if let Some(flow) = flow {
560            return Some(redact_flow(flow, &self.current_redaction_policy()));
561        }
562        if let Some(store) = &self.store {
563            return store
564                .load_flow(&id)
565                .await
566                .ok()
567                .flatten()
568                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
569                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
570        }
571        None
572    }
573
574    pub async fn get_rules(&self) -> Vec<Rule> {
575        let (tx, rx) = oneshot::channel();
576        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
577            error!("Failed to send GetRules request: {}", e);
578            return Vec::new();
579        }
580        rx.await
581            .map_err(|e| {
582                error!("Failed to receive Rules response: {}", e);
583                e
584            })
585            .unwrap_or_default()
586    }
587
588    pub async fn set_rules(&self, rules: Vec<Rule>) {
589        let _ = self
590            .set_rules_from(
591                AuditActor::Runtime,
592                "rules.replace",
593                "rule_set".to_string(),
594                json!({}),
595                rules,
596            )
597            .await;
598    }
599
600    pub async fn upsert_rule_from(
601        &self,
602        actor: AuditActor,
603        operation: &str,
604        target: String,
605        details: serde_json::Value,
606        rule: Rule,
607    ) -> Result<(), String> {
608        let rule_id = rule.id.clone();
609        let mut rules = self.get_rules().await;
610        rules.retain(|existing| existing.id != rule_id);
611        rules.push(rule);
612        self.set_rules_from(actor, operation, target, details, rules)
613            .await
614    }
615
616    pub async fn delete_rule_from(
617        &self,
618        actor: AuditActor,
619        operation: &str,
620        target: String,
621        details: serde_json::Value,
622        rule_id: &str,
623    ) -> Result<bool, String> {
624        let mut rules = self.get_rules().await;
625        let before = rules.len();
626        rules.retain(|rule| rule.id != rule_id);
627        if rules.len() == before {
628            return Ok(false);
629        }
630        self.set_rules_from(actor, operation, target, details, rules)
631            .await
632            .map(|_| true)
633    }
634
635    pub async fn create_mock_response_rule_from(
636        &self,
637        actor: AuditActor,
638        target: String,
639        details: serde_json::Value,
640        config: MockResponseRuleConfig,
641    ) -> Result<String, String> {
642        let rule_id = config.rule_id.clone();
643        let rule = build_mock_response_rule(config);
644        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
645            .await
646            .map(|_| rule_id)
647    }
648
649    pub async fn create_intercept_rule_from(
650        &self,
651        actor: AuditActor,
652        target: String,
653        details: serde_json::Value,
654        config: InterceptRuleConfig,
655    ) -> Result<String, String> {
656        let rule_id = config.rule_id.clone();
657        let mut rules = self.get_rules().await;
658        rules.extend(build_intercept_rules(config));
659        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
660            .await
661            .map(|_| rule_id)
662    }
663
664    pub async fn upsert_legacy_intercept_rule_from(
665        &self,
666        actor: AuditActor,
667        target: String,
668        details: serde_json::Value,
669        rule: InterceptRule,
670    ) -> Result<(), String> {
671        let rule_id = rule.id.clone();
672        let mut rules = self.get_rules().await;
673        rules.retain(|existing| {
674            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
675        });
676        rules.extend(rule.to_rules());
677        self.set_rules_from(
678            actor,
679            "rule.intercept_legacy_upsert",
680            target,
681            details,
682            rules,
683        )
684        .await
685    }
686
687    pub async fn set_rules_from(
688        &self,
689        actor: AuditActor,
690        operation: &str,
691        target: String,
692        details: serde_json::Value,
693        rules: Vec<Rule>,
694    ) -> Result<(), String> {
695        let rule_count = rules.len();
696        if let Err(e) = self
697            .rule_store
698            .send(RuleStoreMessage::SetRules(rules))
699            .await
700        {
701            error!("Failed to send SetRules request: {}", e);
702            self.record_audit_event(AuditEvent::new(
703                actor,
704                AuditEventKind::RuleChanged,
705                target,
706                AuditOutcome::Failed,
707                json!({
708                    "operation": operation,
709                    "rule_count": rule_count,
710                    "details": details,
711                    "error": e.to_string()
712                }),
713            ));
714            return Err(e.to_string());
715        }
716
717        self.record_audit_event(AuditEvent::new(
718            actor,
719            AuditEventKind::RuleChanged,
720            target,
721            AuditOutcome::Success,
722            json!({
723                "operation": operation,
724                "rule_count": rule_count,
725                "details": details
726            }),
727        ));
728        Ok(())
729    }
730
731    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
732        let mut new_rules = Vec::new();
733        for rule in rules {
734            new_rules.extend(rule.to_rules());
735        }
736        self.set_rules(new_rules).await;
737    }
738
739    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
740        let (tx, rx) = oneshot::channel();
741        if let Err(e) = self
742            .rule_store
743            .send(RuleStoreMessage::GetRuleEngine(tx))
744            .await
745        {
746            error!("Failed to send GetRuleEngine request: {}", e);
747            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
748        }
749        rx.await
750            .map_err(|e| {
751                error!("Failed to receive RuleEngine response: {}", e);
752                e
753            })
754            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
755    }
756
757    pub fn update_policy(&self, policy: ProxyPolicy) {
758        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
759    }
760
761    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
762        let mut policy = self.policy_snapshot();
763        policy.apply_patch(patch);
764        self.update_policy_from(actor, target, policy);
765    }
766
767    pub fn policy_snapshot(&self) -> ProxyPolicy {
768        self.policy_tx.borrow().clone()
769    }
770
771    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
772        let details = json!({
773            "strict_http_semantics": policy.strict_http_semantics,
774            "request_timeout_ms": policy.request_timeout_ms,
775            "max_body_size": policy.max_body_size,
776            "transparent_enabled": policy.transparent_enabled,
777            "redaction_enabled": policy.redaction.enabled,
778            "redact_bodies": policy.redaction.redact_bodies
779        });
780
781        self.policy_tx.send_replace(policy);
782        self.record_audit_event(AuditEvent::new(
783            actor,
784            AuditEventKind::PolicyUpdated,
785            target,
786            AuditOutcome::Success,
787            details,
788        ));
789    }
790
791    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
792        if let Err(e) = self
793            .intercept_broker
794            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
795            .await
796        {
797            error!("Failed to send RegisterIntercept request: {}", e);
798        }
799    }
800
801    pub async fn resolve_intercept(
802        &self,
803        key: String,
804        result: InterceptionResult,
805    ) -> Result<(), String> {
806        let (tx, rx) = oneshot::channel();
807        if let Err(e) = self
808            .intercept_broker
809            .send(InterceptBrokerMessage::ResolveIntercept {
810                key,
811                result,
812                respond_to: tx,
813            })
814            .await
815        {
816            error!("Failed to send ResolveIntercept request: {}", e);
817            return Err(e.to_string());
818        }
819        rx.await.map_err(|_| "Actor dropped".to_string())?
820    }
821
822    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
823        let (tx, rx) = oneshot::channel();
824        if let Err(e) = self
825            .intercept_broker
826            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
827                key,
828                respond_to: tx,
829            })
830            .await
831        {
832            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
833            return None;
834        }
835        rx.await
836            .map_err(|e| {
837                error!("Failed to receive WebSocketMessage response: {}", e);
838                e
839            })
840            .unwrap_or(None)
841    }
842
843    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
844        if let Err(e) = self
845            .intercept_broker
846            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
847            .await
848        {
849            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
850        }
851    }
852
853    pub async fn is_intercept_pending(&self, key: String) -> bool {
854        let (tx, rx) = oneshot::channel();
855        if let Err(e) = self
856            .intercept_broker
857            .send(InterceptBrokerMessage::GetPendingIntercept {
858                key,
859                respond_to: tx,
860            })
861            .await
862        {
863            error!("Failed to send GetPendingIntercept request: {}", e);
864            return false;
865        }
866        rx.await
867            .map_err(|e| {
868                error!("Failed to receive PendingIntercept response: {}", e);
869                e
870            })
871            .unwrap_or(false)
872    }
873
874    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
875        let (tx, rx) = oneshot::channel();
876        if let Err(e) = self
877            .intercept_broker
878            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
879                flow_id,
880                respond_to: tx,
881            })
882            .await
883        {
884            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
885            return false;
886        }
887        rx.await
888            .map_err(|e| {
889                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
890                e
891            })
892            .unwrap_or(false)
893    }
894
895    pub fn upsert_flow(&self, flow: Box<Flow>) {
896        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
897            // Drop flow if channel is full
898            error!("FlowStore dropped flow: {}", e);
899            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
900        }
901    }
902
903    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
904        if let Err(e) = self
905            .flow_store
906            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
907        {
908            error!("FlowStore dropped WS message: {}", e);
909            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
910        }
911    }
912
913    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
914        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
915            flow_id,
916            body,
917            direction,
918        }) {
919            error!("FlowStore dropped HTTP body: {}", e);
920            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
921        }
922    }
923
924    /// P1: Tag a flow as budget-exceeded (body too large for full rule inspection)
925    pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
926        if let Err(e) = self
927            .flow_store
928            .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
929        {
930            error!("FlowStore dropped budget-exceeded tag: {}", e);
931            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
932        }
933    }
934
935    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
936    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
937        self.flow_broadcast_tx.subscribe()
938    }
939
940    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
941        self.audit_broadcast_tx.subscribe()
942    }
943
944    fn current_redaction_policy(&self) -> RedactionPolicy {
945        self.policy_tx.borrow().redaction.clone()
946    }
947
948    pub fn record_flow_events_lagged(&self, skipped: u64) {
949        self.flow_events_lagged_total
950            .fetch_add(skipped as usize, Ordering::Relaxed);
951    }
952
953    pub fn record_audit_events_lagged(&self, skipped: u64) {
954        self.audit_events_lagged_total
955            .fetch_add(skipped as usize, Ordering::Relaxed);
956    }
957
958    pub fn lifecycle(&self) -> RuntimeLifecycle {
959        self.lifecycle_tx.borrow().clone()
960    }
961
962    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
963        self.lifecycle_tx.subscribe()
964    }
965
966    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
967        let current = self.lifecycle();
968        if current.is_active() {
969            return Err(format!(
970                "Proxy is already {} on port {}",
971                current.phase.as_str(),
972                current.port.unwrap_or(port)
973            ));
974        }
975
976        let mut guard = self
977            .shutdown_tx
978            .lock()
979            .map_err(|_| "shutdown state poisoned".to_string())?;
980        *guard = Some(shutdown_tx);
981        drop(guard);
982
983        self.update_lifecycle(RuntimeLifecycle {
984            phase: RuntimeLifecyclePhase::Starting,
985            port: Some(port),
986            started_at_ms: None,
987            last_error: None,
988        });
989        Ok(())
990    }
991
992    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
993        let mut guard = self
994            .shutdown_tx
995            .lock()
996            .map_err(|_| "shutdown state poisoned".to_string())?;
997        let Some(tx) = guard.take() else {
998            return Ok(ProxyStopResult::NotRunning);
999        };
1000        drop(guard);
1001
1002        let current = self.lifecycle();
1003        self.update_lifecycle(RuntimeLifecycle {
1004            phase: RuntimeLifecyclePhase::Stopping,
1005            port: current.port,
1006            started_at_ms: current.started_at_ms,
1007            last_error: current.last_error,
1008        });
1009        let _ = tx.send(());
1010        Ok(ProxyStopResult::Stopping)
1011    }
1012
1013    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1014        self.audit_history
1015            .lock()
1016            .map(|events| events.iter().cloned().collect())
1017            .unwrap_or_default()
1018    }
1019
1020    /// 搜索内存中的 Flow 列表,返回轻量摘要。
1021    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1022        let redaction = self.current_redaction_policy();
1023        if let Some(store) = &self.store {
1024            return store
1025                .query_flow_summaries(&query)
1026                .await
1027                .unwrap_or_default()
1028                .into_iter()
1029                .map(|summary| redact_flow_summary(summary, &redaction))
1030                .collect();
1031        }
1032        let (tx, rx) = oneshot::channel();
1033        if let Err(e) = self
1034            .flow_store
1035            .send(FlowStoreMessage::SearchFlows {
1036                query,
1037                respond_to: tx,
1038            })
1039            .await
1040        {
1041            error!("Failed to send SearchFlows request: {}", e);
1042            return Vec::new();
1043        }
1044        rx.await
1045            .unwrap_or_default()
1046            .into_iter()
1047            .map(|summary| redact_flow_summary(summary, &redaction))
1048            .collect()
1049    }
1050
1051    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1052        let redaction = self.current_redaction_policy();
1053        redact_flow_update(update, &redaction)
1054    }
1055
1056    /// 解除截获并可选地应用修改。
1057    ///
1058    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
1059    /// - 若 `mods` 为 None,直接 Continue;
1060    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1061    ///
1062    /// `key` 格式约定:
1063    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1064    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1065    pub async fn resolve_intercept_with_modifications(
1066        &self,
1067        key: String,
1068        action: &str,
1069        mods: Option<relay_core_api::modification::FlowModification>,
1070    ) -> Result<(), String> {
1071        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1072            .await
1073    }
1074
1075    pub async fn resolve_intercept_with_modifications_from(
1076        &self,
1077        actor: AuditActor,
1078        key: String,
1079        action: &str,
1080        mods: Option<relay_core_api::modification::FlowModification>,
1081    ) -> Result<(), String> {
1082        let modified_fields = modification_field_names(mods.as_ref());
1083        let result = match action {
1084            "drop" => InterceptionResult::Drop,
1085            _ => match mods {
1086                None => InterceptionResult::Continue,
1087                Some(m) => {
1088                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1089                    match parts.as_slice() {
1090                        [flow_id, phase] => {
1091                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1092                                modification::apply_flow_modification(&flow, phase, m)
1093                            } else {
1094                                InterceptionResult::Continue
1095                            }
1096                        }
1097                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1098                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1099                        [_, "ws_msg", _] => {
1100                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1101                                modification::apply_ws_modification(&msg, m)
1102                            } else {
1103                                InterceptionResult::Continue
1104                            }
1105                        }
1106                        _ => InterceptionResult::Continue,
1107                    }
1108                }
1109            },
1110        };
1111        let outcome = self.resolve_intercept(key.clone(), result).await;
1112        let audit_outcome = if outcome.is_ok() {
1113            AuditOutcome::Success
1114        } else {
1115            AuditOutcome::Failed
1116        };
1117        let error_message = outcome.as_ref().err().cloned();
1118
1119        self.record_audit_event(AuditEvent::new(
1120            actor,
1121            AuditEventKind::InterceptResolved,
1122            key,
1123            audit_outcome,
1124            json!({
1125                "action": action,
1126                "has_modifications": !modified_fields.is_empty(),
1127                "modified_fields": modified_fields,
1128                "error": error_message
1129            }),
1130        ));
1131        outcome
1132    }
1133
1134    #[cfg(feature = "script")]
1135    pub async fn load_script_from(
1136        &self,
1137        actor: AuditActor,
1138        target: String,
1139        script: &str,
1140    ) -> Result<(), String> {
1141        let result = self
1142            .script_interceptor
1143            .load_script(script)
1144            .await
1145            .map_err(|e| e.to_string());
1146
1147        let outcome = if result.is_ok() {
1148            AuditOutcome::Success
1149        } else {
1150            AuditOutcome::Failed
1151        };
1152        let error_message = result.as_ref().err().cloned();
1153
1154        self.record_audit_event(AuditEvent::new(
1155            actor,
1156            AuditEventKind::ScriptReloaded,
1157            target,
1158            outcome,
1159            json!({
1160                "script_bytes": script.len(),
1161                "error": error_message
1162            }),
1163        ));
1164
1165        result
1166    }
1167
1168    /// S5: Set the env var whitelist for relay.env() in user scripts.
1169    #[cfg(feature = "script")]
1170    pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1171        self.script_interceptor.set_env_allow(env_allow).await;
1172    }
1173
1174    fn record_audit_event(&self, event: AuditEvent) {
1175        const AUDIT_HISTORY_LIMIT: usize = 200;
1176        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1177        if event.outcome == AuditOutcome::Failed {
1178            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1179        }
1180
1181        let details = log_format::audit_details_log(&event.kind, &event.details);
1182        tracing::info!(
1183            target: "relay_core_audit",
1184            event_id = %event.id,
1185            actor = %event.actor.as_str(),
1186            kind = %event.kind.as_str(),
1187            target = %event.target,
1188            outcome = %event.outcome.as_str(),
1189            details = %details
1190        );
1191
1192        if let Ok(mut history) = self.audit_history.lock() {
1193            if history.len() >= AUDIT_HISTORY_LIMIT {
1194                history.pop_front();
1195            }
1196            history.push_back(event.clone());
1197        }
1198
1199        if let Some(store) = self.store.clone() {
1200            let event_json = serde_json::to_value(&event).unwrap_or_default();
1201            let event_id = event.id.clone();
1202            let timestamp_ms = event.timestamp_ms;
1203            let actor = event.actor.as_str().to_string();
1204            let kind = event.kind.as_str().to_string();
1205            let target = event.target.clone();
1206            let outcome = event.outcome.as_str().to_string();
1207            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1208                handle.spawn(async move {
1209                    if let Err(e) = store
1210                        .save_audit_event(AuditEventRecord {
1211                            id: &event_id,
1212                            timestamp_ms,
1213                            actor: &actor,
1214                            kind: &kind,
1215                            target: &target,
1216                            outcome: &outcome,
1217                            content: &event_json,
1218                        })
1219                        .await
1220                    {
1221                        tracing::error!("Failed to persist audit event: {}", e);
1222                    }
1223                });
1224            }
1225        }
1226
1227        let _ = self.audit_broadcast_tx.send(event);
1228    }
1229
1230    fn transition_to_running(&self, port: u16) {
1231        self.update_lifecycle(RuntimeLifecycle {
1232            phase: RuntimeLifecyclePhase::Running,
1233            port: Some(port),
1234            started_at_ms: Some(now_unix_ms()),
1235            last_error: None,
1236        });
1237    }
1238
1239    fn transition_to_stopped(&self) {
1240        if let Ok(mut guard) = self.shutdown_tx.lock() {
1241            *guard = None;
1242        }
1243
1244        self.update_lifecycle(RuntimeLifecycle {
1245            phase: RuntimeLifecyclePhase::Stopped,
1246            port: None,
1247            started_at_ms: None,
1248            last_error: None,
1249        });
1250    }
1251
1252    fn transition_to_failed(&self, port: u16, error: String) {
1253        if let Ok(mut guard) = self.shutdown_tx.lock() {
1254            *guard = None;
1255        }
1256
1257        self.update_lifecycle(RuntimeLifecycle {
1258            phase: RuntimeLifecyclePhase::Failed,
1259            port: Some(port),
1260            started_at_ms: None,
1261            last_error: Some(error),
1262        });
1263    }
1264
1265    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1266        tracing::info!(
1267            target: "relay_core_lifecycle",
1268            phase = %lifecycle.phase.as_str(),
1269            port = ?lifecycle.port,
1270            started_at_ms = ?lifecycle.started_at_ms,
1271            last_error = ?lifecycle.last_error,
1272            "{}",
1273            log_format::lifecycle_log_line(&lifecycle),
1274        );
1275        self.lifecycle_tx.send_replace(lifecycle);
1276    }
1277
1278    pub fn spawn_proxy(
1279        self: &Arc<Self>,
1280        config: ProxyConfig,
1281        sink: mpsc::Sender<FlowUpdate>,
1282        extra_interceptor: Option<Arc<dyn Interceptor>>,
1283    ) -> Result<ProxySpawnResult, String> {
1284        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1285        match self.prepare_start(config.port, shutdown_tx) {
1286            Ok(()) => {}
1287            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1288            Err(error) => return Err(error),
1289        }
1290        let state = self.clone();
1291        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1292            if let Err(error) = state
1293                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1294                .await
1295            {
1296                error!("Proxy failed: {}", error);
1297            }
1298        })))
1299    }
1300
1301    pub async fn start_proxy(
1302        self: &Arc<Self>,
1303        config: ProxyConfig,
1304        sink: mpsc::Sender<FlowUpdate>,
1305        extra_interceptor: Option<Arc<dyn Interceptor>>,
1306    ) -> Result<(), String> {
1307        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1308        self.prepare_start(config.port, shutdown_tx)?;
1309        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1310            .await
1311    }
1312
1313    async fn run_proxy(
1314        self: &Arc<Self>,
1315        config: ProxyConfig,
1316        sink: mpsc::Sender<FlowUpdate>,
1317        extra_interceptor: Option<Arc<dyn Interceptor>>,
1318        shutdown_rx: oneshot::Receiver<()>,
1319    ) -> Result<(), String> {
1320        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1321        let state = self.clone();
1322
1323        if let Some(parent) = config.ca_cert_path.parent()
1324            && !parent.exists()
1325        {
1326            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1327        }
1328
1329        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1330            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1331        let ca = Arc::new(ca);
1332
1333        #[cfg(feature = "script")]
1334        let script_interceptor = self.script_interceptor.clone();
1335
1336        #[cfg(feature = "script")]
1337        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1338
1339        #[cfg(not(feature = "script"))]
1340        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1341
1342        interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1343            self.clone(),
1344        )));
1345
1346        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1347            self.clone(),
1348        )));
1349
1350        if let Some(interceptor) = extra_interceptor {
1351            interceptors.push(interceptor);
1352        }
1353
1354        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1355        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1356
1357        tokio::spawn(async move {
1358            while let Some(update) = proxy_rx.recv().await {
1359                match update.clone() {
1360                    FlowUpdate::Full(flow) => {
1361                        state.upsert_flow(flow);
1362                    }
1363                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1364                        state.append_ws_message(flow_id, message);
1365                    }
1366                    FlowUpdate::HttpBody {
1367                        flow_id,
1368                        direction,
1369                        body,
1370                    } => {
1371                        state.update_http_body(flow_id, body, direction);
1372                    }
1373                    FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1374                        // P1: Tag the flow as budget-exceeded and update resilience trace
1375                        state.tag_flow_budget_exceeded(flow_id, direction);
1376                    }
1377                }
1378
1379                let _ = state.flow_broadcast_tx.send(update.clone());
1380
1381                if sink.try_send(update).is_err() {
1382                    relay_core_lib::metrics::inc_flows_dropped();
1383                }
1384            }
1385        });
1386
1387        if let Some(udp_port) = config.udp_tproxy_port {
1388            let udp_proxy_tx = proxy_tx.clone();
1389            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1390            tokio::spawn(async move {
1391                match tokio::net::UdpSocket::bind(udp_addr).await {
1392                    Ok(socket) => {
1393                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1394                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1395                            error!("UDP TPROXY failed: {}", e);
1396                        }
1397                    }
1398                    Err(e) => {
1399                        error!("Failed to bind UDP TPROXY socket: {}", e);
1400                    }
1401                }
1402            });
1403        }
1404
1405        let listener = match TcpListener::bind(addr).await {
1406            Ok(listener) => listener,
1407            Err(e) => {
1408                if let Ok(mut guard) = self.shutdown_tx.lock() {
1409                    *guard = None;
1410                }
1411                let message = format!("Failed to bind to address {}: {}", addr, e);
1412                self.transition_to_failed(config.port, message.clone());
1413                return Err(message);
1414            }
1415        };
1416
1417        self.transition_to_running(config.port);
1418        let shutdown_rx = Some(shutdown_rx);
1419
1420        if config.transparent {
1421            let policy = ProxyPolicy {
1422                transparent_enabled: true,
1423                ..Default::default()
1424            };
1425            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1426            let policy_rx = self.policy_tx.subscribe();
1427
1428            let provider: Arc<dyn OriginalDstProvider> = {
1429                let mut addrs = BTreeSet::new();
1430                if let Ok(local) = listener.local_addr() {
1431                    addrs.insert(local);
1432                }
1433
1434                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1435                {
1436                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1437                }
1438                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1439                {
1440                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1441                        Ok(provider) => Arc::new(provider),
1442                        Err(e) => {
1443                            error!("Failed to initialize macOS PF provider: {}", e);
1444                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1445                        }
1446                    }
1447                }
1448                #[cfg(target_os = "windows")]
1449                {
1450                    let filter =
1451                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1452                            .to_string();
1453                    let port = config.port;
1454                    tokio::spawn(async move {
1455                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1456                            .await;
1457                    });
1458
1459                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1460                }
1461                #[cfg(not(any(
1462                    all(target_os = "linux", feature = "transparent-linux"),
1463                    all(target_os = "macos", feature = "transparent-macos"),
1464                    target_os = "windows"
1465                )))]
1466                {
1467                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1468                }
1469            };
1470
1471            let source = TransparentTcpCaptureSource::new(listener, provider);
1472            let result = relay_core_lib::start_proxy(
1473                source,
1474                proxy_tx,
1475                interceptor,
1476                ca,
1477                policy_rx,
1478                None,
1479                shutdown_rx,
1480                Some(self.get_rule_engine().await),
1481            )
1482            .await
1483            .map_err(|e| e.to_string());
1484            if let Err(error) = &result {
1485                self.transition_to_failed(config.port, error.clone());
1486            } else {
1487                self.transition_to_stopped();
1488            }
1489            result
1490        } else {
1491            let source = TcpCaptureSource::new(listener);
1492            let policy = ProxyPolicy::default();
1493            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1494            let policy_rx = self.policy_tx.subscribe();
1495
1496            let result = relay_core_lib::start_proxy(
1497                source,
1498                proxy_tx,
1499                interceptor,
1500                ca,
1501                policy_rx,
1502                None,
1503                shutdown_rx,
1504                Some(self.get_rule_engine().await),
1505            )
1506            .await
1507            .map_err(|e| e.to_string());
1508            if let Err(error) = &result {
1509                self.transition_to_failed(config.port, error.clone());
1510            } else {
1511                self.transition_to_stopped();
1512            }
1513            result
1514        }
1515    }
1516}
1517
1518fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1519    match update {
1520        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1521        FlowUpdate::WebSocketMessage {
1522            flow_id,
1523            mut message,
1524        } => {
1525            message.content = redact_body(message.content, redaction);
1526            FlowUpdate::WebSocketMessage { flow_id, message }
1527        }
1528        FlowUpdate::HttpBody {
1529            flow_id,
1530            direction,
1531            body,
1532        } => FlowUpdate::HttpBody {
1533            flow_id,
1534            direction,
1535            body: redact_body(body, redaction),
1536        },
1537        FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1538            FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1539        }
1540    }
1541}
1542
1543fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1544    if !redaction.enabled {
1545        return flow;
1546    }
1547    match &mut flow.layer {
1548        Layer::Http(http) => {
1549            redact_http_request(&mut http.request, redaction);
1550            if let Some(response) = &mut http.response {
1551                redact_headers(&mut response.headers, redaction);
1552                response.body = response
1553                    .body
1554                    .take()
1555                    .map(|body| redact_body(body, redaction));
1556            }
1557        }
1558        Layer::WebSocket(ws) => {
1559            redact_http_request(&mut ws.handshake_request, redaction);
1560            redact_headers(&mut ws.handshake_response.headers, redaction);
1561            ws.handshake_response.body = ws
1562                .handshake_response
1563                .body
1564                .take()
1565                .map(|body| redact_body(body, redaction));
1566            for message in &mut ws.messages {
1567                message.content = redact_body(message.content.clone(), redaction);
1568            }
1569        }
1570        _ => {}
1571    }
1572    flow
1573}
1574
1575fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1576    if !redaction.enabled {
1577        return summary;
1578    }
1579    summary.url = redact_url_string(&summary.url, redaction);
1580    summary
1581}
1582
1583fn redact_http_request(
1584    request: &mut relay_core_api::flow::HttpRequest,
1585    redaction: &RedactionPolicy,
1586) {
1587    redact_headers(&mut request.headers, redaction);
1588    redact_query_pairs(&mut request.query, redaction);
1589    request.url = redact_url(&request.url, redaction);
1590    request.body = request.body.take().map(|body| redact_body(body, redaction));
1591}
1592
1593fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1594    if !redaction.enabled {
1595        return;
1596    }
1597    let sensitive = redaction_set(&redaction.sensitive_header_names);
1598    for (name, value) in headers.iter_mut() {
1599        if sensitive.contains(&name.to_ascii_lowercase()) {
1600            *value = "[REDACTED]".to_string();
1601        }
1602    }
1603}
1604
1605fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1606    if !redaction.enabled {
1607        return;
1608    }
1609    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1610    for (name, value) in query.iter_mut() {
1611        if sensitive.contains(&name.to_ascii_lowercase()) {
1612            *value = "[REDACTED]".to_string();
1613        }
1614    }
1615}
1616
1617fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1618    if !redaction.enabled {
1619        return url.clone();
1620    }
1621    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1622    let pairs: Vec<(String, String)> = url
1623        .query_pairs()
1624        .map(|(k, v)| {
1625            let key = k.to_string();
1626            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1627                "[REDACTED]".to_string()
1628            } else {
1629                v.to_string()
1630            };
1631            (key, value)
1632        })
1633        .collect();
1634    let mut next = url.clone();
1635    if pairs.is_empty() {
1636        return next;
1637    }
1638    next.query_pairs_mut().clear();
1639    for (k, v) in pairs {
1640        next.query_pairs_mut().append_pair(&k, &v);
1641    }
1642    next
1643}
1644
1645fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1646    match url::Url::parse(input) {
1647        Ok(url) => redact_url(&url, redaction).to_string(),
1648        Err(_) => input.to_string(),
1649    }
1650}
1651
1652fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1653    if redaction.enabled && redaction.redact_bodies {
1654        body.content = "[REDACTED]".to_string();
1655    }
1656    body
1657}
1658
1659fn redaction_set(values: &[String]) -> HashSet<String> {
1660    values
1661        .iter()
1662        .map(|value| value.to_ascii_lowercase())
1663        .collect()
1664}
1665
1666#[derive(Clone)]
1667pub struct ProxyConfig {
1668    pub port: u16,
1669    pub ca_cert_path: std::path::PathBuf,
1670    pub ca_key_path: std::path::PathBuf,
1671    pub transparent: bool,
1672    pub udp_tproxy_port: Option<u16>,
1673}
1674
1675impl ProxyConfig {
1676    pub fn new(
1677        port: u16,
1678        ca_cert_path: std::path::PathBuf,
1679        ca_key_path: std::path::PathBuf,
1680    ) -> Self {
1681        Self {
1682            port,
1683            ca_cert_path,
1684            ca_key_path,
1685            transparent: false,
1686            udp_tproxy_port: None,
1687        }
1688    }
1689
1690    pub fn from_app_data_dir(
1691        app_data_dir: impl Into<std::path::PathBuf>,
1692        port: u16,
1693    ) -> Result<Self, String> {
1694        let app_data_dir = app_data_dir.into();
1695        if !app_data_dir.exists() {
1696            std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1697        }
1698
1699        Ok(Self::new(
1700            port,
1701            app_data_dir.join("ca_cert.pem"),
1702            app_data_dir.join("ca_key.pem"),
1703        ))
1704    }
1705
1706    pub fn with_transparent(mut self, transparent: bool) -> Self {
1707        self.transparent = transparent;
1708        self
1709    }
1710
1711    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1712        self.udp_tproxy_port = udp_tproxy_port;
1713        self
1714    }
1715}
1716
1717fn now_unix_ms() -> u64 {
1718    SystemTime::now()
1719        .duration_since(UNIX_EPOCH)
1720        .unwrap_or_default()
1721        .as_millis() as u64
1722}
1723
1724fn modification_field_names(
1725    mods: Option<&relay_core_api::modification::FlowModification>,
1726) -> Vec<&'static str> {
1727    let Some(mods) = mods else {
1728        return Vec::new();
1729    };
1730
1731    let mut fields = Vec::new();
1732    if mods.method.is_some() {
1733        fields.push("method");
1734    }
1735    if mods.url.is_some() {
1736        fields.push("url");
1737    }
1738    if mods.request_headers.is_some() {
1739        fields.push("request_headers");
1740    }
1741    if mods.request_body.is_some() {
1742        fields.push("request_body");
1743    }
1744    if mods.status_code.is_some() {
1745        fields.push("status_code");
1746    }
1747    if mods.response_headers.is_some() {
1748        fields.push("response_headers");
1749    }
1750    if mods.response_body.is_some() {
1751        fields.push("response_body");
1752    }
1753    if mods.message_content.is_some() {
1754        fields.push("message_content");
1755    }
1756    fields
1757}
1758
1759#[cfg(test)]
1760mod tests {
1761    use super::*;
1762    use chrono::Utc;
1763    use relay_core_api::flow::{
1764        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1765        ResponseTiming, TransportProtocol,
1766    };
1767    use std::sync::atomic::{AtomicU64, Ordering};
1768    use tokio::time::{Duration, sleep};
1769    use url::Url;
1770    use uuid::Uuid;
1771
1772    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1773
1774    fn sqlite_url() -> String {
1775        let nanos = SystemTime::now()
1776            .duration_since(UNIX_EPOCH)
1777            .expect("clock drift")
1778            .as_nanos();
1779        let pid = std::process::id();
1780        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1781        let db_dir = std::env::current_dir()
1782            .expect("cwd")
1783            .join("target")
1784            .join("test-dbs");
1785        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1786        let db_path = db_dir.join(format!(
1787            "relay-core-runtime-test-{}-{}-{}.db",
1788            pid, nanos, seq
1789        ));
1790        format!("sqlite://{}?mode=rwc", db_path.display())
1791    }
1792
1793    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1794        let start_time =
1795            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1796        let request_url =
1797            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1798        Flow {
1799            id: Uuid::new_v4(),
1800            start_time,
1801            end_time: Some(start_time),
1802            network: NetworkInfo {
1803                client_ip: "127.0.0.1".to_string(),
1804                client_port: 12000,
1805                server_ip: "127.0.0.1".to_string(),
1806                server_port: 8080,
1807                protocol: TransportProtocol::TCP,
1808                tls: false,
1809                tls_version: None,
1810                sni: None,
1811            },
1812            layer: Layer::Http(HttpLayer {
1813                request: HttpRequest {
1814                    method: method.to_string(),
1815                    url: request_url,
1816                    version: "HTTP/1.1".to_string(),
1817                    headers: vec![],
1818                    cookies: vec![],
1819                    query: vec![],
1820                    body: None,
1821                },
1822                response: Some(HttpResponse {
1823                    status,
1824                    status_text: "OK".to_string(),
1825                    version: "HTTP/1.1".to_string(),
1826                    headers: vec![],
1827                    cookies: vec![],
1828                    body: None,
1829                    timing: ResponseTiming {
1830                        time_to_first_byte: None,
1831                        time_to_last_byte: None,
1832                        connect_time_ms: None,
1833                        ssl_time_ms: None,
1834                    },
1835                }),
1836                error: None,
1837            }),
1838            tags: vec![],
1839            meta: std::collections::HashMap::new(),
1840            resilience_trace: None,
1841            rule_variables: std::collections::HashMap::new(),
1842            matched_rules: vec![],
1843        }
1844    }
1845
1846    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1847        let start_time =
1848            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1849        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1850            .expect("url should parse");
1851        Flow {
1852            id: Uuid::new_v4(),
1853            start_time,
1854            end_time: Some(start_time),
1855            network: NetworkInfo {
1856                client_ip: "127.0.0.1".to_string(),
1857                client_port: 12000,
1858                server_ip: "127.0.0.1".to_string(),
1859                server_port: 8080,
1860                protocol: TransportProtocol::TCP,
1861                tls: false,
1862                tls_version: None,
1863                sni: None,
1864            },
1865            layer: Layer::Http(HttpLayer {
1866                request: HttpRequest {
1867                    method: "GET".to_string(),
1868                    url: request_url,
1869                    version: "HTTP/1.1".to_string(),
1870                    headers: vec![
1871                        (
1872                            "Authorization".to_string(),
1873                            "Bearer secret-token".to_string(),
1874                        ),
1875                        ("X-Normal".to_string(), "visible".to_string()),
1876                    ],
1877                    cookies: vec![],
1878                    query: vec![
1879                        ("token".to_string(), "abc123".to_string()),
1880                        ("ok".to_string(), "1".to_string()),
1881                    ],
1882                    body: Some(BodyData {
1883                        encoding: "utf-8".to_string(),
1884                        content: "secret request body".to_string(),
1885                        size: 19,
1886                    }),
1887                },
1888                response: Some(HttpResponse {
1889                    status: 200,
1890                    status_text: "OK".to_string(),
1891                    version: "HTTP/1.1".to_string(),
1892                    headers: vec![
1893                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1894                        ("X-Response".to_string(), "visible".to_string()),
1895                    ],
1896                    cookies: vec![],
1897                    body: Some(BodyData {
1898                        encoding: "utf-8".to_string(),
1899                        content: "secret response body".to_string(),
1900                        size: 20,
1901                    }),
1902                    timing: ResponseTiming {
1903                        time_to_first_byte: None,
1904                        time_to_last_byte: None,
1905                        connect_time_ms: None,
1906                        ssl_time_ms: None,
1907                    },
1908                }),
1909                error: None,
1910            }),
1911            tags: vec![],
1912            meta: std::collections::HashMap::new(),
1913            resilience_trace: None,
1914            rule_variables: std::collections::HashMap::new(),
1915            matched_rules: vec![],
1916        }
1917    }
1918
1919    #[tokio::test]
1920    async fn set_rules_from_records_audit_event() {
1921        let state = CoreState::new(None).await;
1922
1923        state
1924            .set_rules_from(
1925                AuditActor::Http,
1926                "rule.upsert",
1927                "rule-1".to_string(),
1928                json!({ "route": "/api/v1/rules" }),
1929                Vec::new(),
1930            )
1931            .await
1932            .expect("set rules should succeed");
1933
1934        let events = state.recent_audit_events();
1935        let event = events.last().expect("audit event should exist");
1936        assert_eq!(event.actor, AuditActor::Http);
1937        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1938        assert_eq!(event.outcome, AuditOutcome::Success);
1939        assert_eq!(event.target, "rule-1");
1940        assert_eq!(event.details["operation"], "rule.upsert");
1941        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1942    }
1943
1944    #[tokio::test]
1945    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1946        let state = CoreState::new(None).await;
1947
1948        state
1949            .upsert_rule_from(
1950                AuditActor::Probe,
1951                "rule.upsert",
1952                "rule-1".to_string(),
1953                json!({ "tool": "set_rule" }),
1954                Rule {
1955                    id: "rule-1".to_string(),
1956                    name: "first".to_string(),
1957                    active: true,
1958                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1959                    priority: 1,
1960                    termination: relay_core_lib::rule::RuleTermination::Continue,
1961                    filter: relay_core_lib::rule::Filter::Url(
1962                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1963                    ),
1964                    actions: vec![],
1965                    constraints: None,
1966                },
1967            )
1968            .await
1969            .expect("initial upsert should succeed");
1970
1971        state
1972            .upsert_rule_from(
1973                AuditActor::Probe,
1974                "rule.upsert",
1975                "rule-1".to_string(),
1976                json!({ "tool": "set_rule" }),
1977                Rule {
1978                    id: "rule-1".to_string(),
1979                    name: "second".to_string(),
1980                    active: true,
1981                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1982                    priority: 2,
1983                    termination: relay_core_lib::rule::RuleTermination::Continue,
1984                    filter: relay_core_lib::rule::Filter::Url(
1985                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1986                    ),
1987                    actions: vec![],
1988                    constraints: None,
1989                },
1990            )
1991            .await
1992            .expect("replacement upsert should succeed");
1993
1994        let rules = state.get_rules().await;
1995        assert_eq!(rules.len(), 1);
1996        assert_eq!(rules[0].name, "second");
1997        let event = state
1998            .recent_audit_events()
1999            .last()
2000            .cloned()
2001            .expect("audit event");
2002        assert_eq!(event.details["operation"], "rule.upsert");
2003        assert_eq!(event.details["details"]["tool"], "set_rule");
2004    }
2005
2006    #[tokio::test]
2007    async fn delete_rule_from_returns_false_when_rule_missing() {
2008        let state = CoreState::new(None).await;
2009
2010        let deleted = state
2011            .delete_rule_from(
2012                AuditActor::Http,
2013                "rule.delete",
2014                "missing".to_string(),
2015                json!({ "route": "/api/v1/rules/{id}" }),
2016                "missing",
2017            )
2018            .await
2019            .expect("delete should not fail");
2020
2021        assert!(!deleted);
2022        assert!(state.recent_audit_events().is_empty());
2023    }
2024
2025    #[tokio::test]
2026    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2027        let state = CoreState::new(None).await;
2028
2029        let rule_id = state
2030            .create_mock_response_rule_from(
2031                AuditActor::Http,
2032                "api-mock-1".to_string(),
2033                json!({ "route": "/api/v1/mock", "status": 201 }),
2034                MockResponseRuleConfig {
2035                    rule_id: "api-mock-1".to_string(),
2036                    url_pattern: "example.com".to_string(),
2037                    name: "api-mock:example.com".to_string(),
2038                    status: 201,
2039                    content_type: "application/json".to_string(),
2040                    body: "{\"ok\":true}".to_string(),
2041                },
2042            )
2043            .await
2044            .expect("mock rule should be created");
2045
2046        assert_eq!(rule_id, "api-mock-1");
2047        let rules = state.get_rules().await;
2048        assert_eq!(rules.len(), 1);
2049        assert_eq!(rules[0].id, "api-mock-1");
2050        let event = state
2051            .recent_audit_events()
2052            .last()
2053            .cloned()
2054            .expect("audit event");
2055        assert_eq!(event.details["operation"], "rule.mock_create");
2056        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2057    }
2058
2059    #[tokio::test]
2060    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2061        let state = CoreState::new(None).await;
2062
2063        let rule_id = state
2064            .create_intercept_rule_from(
2065                AuditActor::Http,
2066                "intercept-1".to_string(),
2067                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2068                InterceptRuleConfig {
2069                    rule_id: "intercept-1".to_string(),
2070                    active: true,
2071                    url_pattern: "example.com".to_string(),
2072                    method: None,
2073                    phase: "request".to_string(),
2074                    name: "api-intercept:example.com".to_string(),
2075                    priority: 100,
2076                    termination: relay_core_lib::rule::RuleTermination::Stop,
2077                },
2078            )
2079            .await
2080            .expect("intercept rule should be created");
2081
2082        assert_eq!(rule_id, "intercept-1");
2083        let rules = state.get_rules().await;
2084        assert_eq!(rules.len(), 1);
2085        assert_eq!(rules[0].id, "intercept-1");
2086        assert_eq!(rules[0].name, "api-intercept:example.com");
2087        let event = state
2088            .recent_audit_events()
2089            .last()
2090            .cloned()
2091            .expect("audit event");
2092        assert_eq!(event.details["operation"], "rule.intercept_create");
2093        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2094    }
2095
2096    #[tokio::test]
2097    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2098        let state = CoreState::new(None).await;
2099
2100        state
2101            .upsert_legacy_intercept_rule_from(
2102                AuditActor::Tauri,
2103                "legacy-1".to_string(),
2104                json!({ "command": "set_intercept_rule" }),
2105                InterceptRule {
2106                    id: "legacy-1".to_string(),
2107                    active: true,
2108                    url_pattern: "example.com".to_string(),
2109                    method: None,
2110                    phase: "both".to_string(),
2111                },
2112            )
2113            .await
2114            .expect("initial family upsert should succeed");
2115        assert_eq!(state.get_rules().await.len(), 2);
2116
2117        state
2118            .upsert_legacy_intercept_rule_from(
2119                AuditActor::Tauri,
2120                "legacy-1".to_string(),
2121                json!({ "command": "set_intercept_rule" }),
2122                InterceptRule {
2123                    id: "legacy-1".to_string(),
2124                    active: true,
2125                    url_pattern: "example.org".to_string(),
2126                    method: Some("POST".to_string()),
2127                    phase: "request".to_string(),
2128                },
2129            )
2130            .await
2131            .expect("replacement family upsert should succeed");
2132
2133        let rules = state.get_rules().await;
2134        assert_eq!(rules.len(), 1);
2135        assert_eq!(rules[0].id, "legacy-1");
2136        let event = state
2137            .recent_audit_events()
2138            .last()
2139            .cloned()
2140            .expect("audit event");
2141        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2142        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2143    }
2144
2145    #[tokio::test]
2146    async fn resolve_intercept_failure_records_failed_audit_event() {
2147        let state = CoreState::new(None).await;
2148
2149        let result = state
2150            .resolve_intercept_with_modifications_from(
2151                AuditActor::Probe,
2152                "missing-flow:request".to_string(),
2153                "drop",
2154                None,
2155            )
2156            .await;
2157
2158        assert!(result.is_err());
2159        let events = state.recent_audit_events();
2160        let event = events.last().expect("audit event should exist");
2161        assert_eq!(event.actor, AuditActor::Probe);
2162        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2163        assert_eq!(event.outcome, AuditOutcome::Failed);
2164        assert_eq!(event.details["action"], "drop");
2165        assert!(
2166            event.details["error"]
2167                .as_str()
2168                .unwrap_or_default()
2169                .contains("Interception not found")
2170        );
2171    }
2172
2173    #[tokio::test]
2174    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2175        let state = CoreState::new(None).await;
2176        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2177
2178        state
2179            .prepare_start(8080, shutdown_tx)
2180            .expect("prepare start should succeed");
2181        let lifecycle = state.lifecycle();
2182        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2183        assert_eq!(lifecycle.port, Some(8080));
2184        assert!(lifecycle.started_at_ms.is_none());
2185        assert!(lifecycle.last_error.is_none());
2186
2187        assert_eq!(
2188            state.stop_proxy().expect("stop should succeed"),
2189            ProxyStopResult::Stopping
2190        );
2191        let lifecycle = state.lifecycle();
2192        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2193        assert_eq!(lifecycle.port, Some(8080));
2194        assert!(shutdown_rx.await.is_ok());
2195    }
2196
2197    #[tokio::test]
2198    async fn status_snapshot_derives_runtime_facing_fields() {
2199        let state = CoreState::new(None).await;
2200        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2201        state
2202            .prepare_start(8080, shutdown_tx)
2203            .expect("prepare start should succeed");
2204
2205        let status = state.status_snapshot();
2206        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2207        assert!(status.running);
2208        assert_eq!(status.port, Some(8080));
2209        assert!(status.uptime.is_none());
2210        assert!(status.last_error.is_none());
2211    }
2212
2213    #[tokio::test]
2214    async fn status_report_combines_status_and_metrics() {
2215        let state = CoreState::new(None).await;
2216        let report = state.status_report().await;
2217
2218        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2219        assert!(!report.status.running);
2220        assert_eq!(report.metrics.intercepts_pending, 0);
2221        assert_eq!(report.metrics.ws_pending_messages, 0);
2222        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2223        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2224        assert_eq!(report.metrics.audit_events_total, 0);
2225        assert_eq!(report.metrics.audit_events_failed, 0);
2226        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2227        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2228    }
2229
2230    #[test]
2231    fn proxy_config_new_and_transport_setters_preserve_values() {
2232        let config = ProxyConfig::new(
2233            8080,
2234            std::path::PathBuf::from("/tmp/ca_cert.pem"),
2235            std::path::PathBuf::from("/tmp/ca_key.pem"),
2236        )
2237        .with_transparent(true)
2238        .with_udp_tproxy_port(Some(15000));
2239
2240        assert_eq!(config.port, 8080);
2241        assert_eq!(
2242            config.ca_cert_path,
2243            std::path::PathBuf::from("/tmp/ca_cert.pem")
2244        );
2245        assert_eq!(
2246            config.ca_key_path,
2247            std::path::PathBuf::from("/tmp/ca_key.pem")
2248        );
2249        assert!(config.transparent);
2250        assert_eq!(config.udp_tproxy_port, Some(15000));
2251    }
2252
2253    #[test]
2254    fn proxy_config_from_app_data_dir_creates_default_paths() {
2255        let unique = SystemTime::now()
2256            .duration_since(UNIX_EPOCH)
2257            .expect("clock drift")
2258            .as_nanos();
2259        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2260
2261        let config =
2262            ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2263
2264        assert!(dir.exists());
2265        assert_eq!(config.port, 8899);
2266        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2267        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2268        assert!(!config.transparent);
2269        assert!(config.udp_tproxy_port.is_none());
2270    }
2271
2272    #[tokio::test]
2273    async fn intercept_snapshot_maps_pending_counts() {
2274        let state = CoreState::new(None).await;
2275        let snapshot = state.intercept_snapshot().await;
2276
2277        assert_eq!(snapshot.pending_count, 0);
2278        assert_eq!(snapshot.ws_pending_count, 0);
2279    }
2280
2281    #[tokio::test]
2282    async fn audit_snapshot_returns_latest_events_in_order() {
2283        let state = CoreState::new(None).await;
2284        state.record_audit_event(AuditEvent::new(
2285            AuditActor::Runtime,
2286            AuditEventKind::RuleChanged,
2287            "first",
2288            AuditOutcome::Success,
2289            json!({ "index": 1 }),
2290        ));
2291        state.record_audit_event(AuditEvent::new(
2292            AuditActor::Http,
2293            AuditEventKind::PolicyUpdated,
2294            "second",
2295            AuditOutcome::Success,
2296            json!({ "index": 2 }),
2297        ));
2298
2299        let snapshot = state.audit_snapshot(1);
2300
2301        assert_eq!(snapshot.events.len(), 1);
2302        assert_eq!(snapshot.events[0].target, "second");
2303        assert_eq!(snapshot.events[0].details["index"], 2);
2304    }
2305
2306    #[tokio::test]
2307    async fn query_audit_snapshot_filters_in_memory_events() {
2308        let state = CoreState::new(None).await;
2309        state.record_audit_event(AuditEvent::new(
2310            AuditActor::Http,
2311            AuditEventKind::RuleChanged,
2312            "rule-1",
2313            AuditOutcome::Success,
2314            json!({ "idx": 1 }),
2315        ));
2316        state.record_audit_event(AuditEvent::new(
2317            AuditActor::Probe,
2318            AuditEventKind::PolicyUpdated,
2319            "policy",
2320            AuditOutcome::Failed,
2321            json!({ "idx": 2 }),
2322        ));
2323
2324        let snapshot = state
2325            .query_audit_snapshot(CoreAuditQuery {
2326                actor: Some(AuditActor::Probe),
2327                kind: Some(AuditEventKind::PolicyUpdated),
2328                outcome: Some(AuditOutcome::Failed),
2329                limit: 10,
2330                ..Default::default()
2331            })
2332            .await;
2333
2334        assert_eq!(snapshot.events.len(), 1);
2335        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2336        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2337        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2338    }
2339
2340    #[tokio::test]
2341    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2342        let state = CoreState::new(Some(sqlite_url())).await;
2343        state.update_policy_from(
2344            AuditActor::Http,
2345            "policy".to_string(),
2346            ProxyPolicy {
2347                transparent_enabled: true,
2348                ..Default::default()
2349            },
2350        );
2351
2352        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2353        for _ in 0..10 {
2354            snapshot = state
2355                .query_audit_snapshot(CoreAuditQuery {
2356                    actor: Some(AuditActor::Http),
2357                    kind: Some(AuditEventKind::PolicyUpdated),
2358                    limit: 10,
2359                    ..Default::default()
2360                })
2361                .await;
2362            if !snapshot.events.is_empty() {
2363                break;
2364            }
2365            sleep(Duration::from_millis(20)).await;
2366        }
2367
2368        assert!(!snapshot.events.is_empty());
2369        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2370        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2371    }
2372
2373    #[tokio::test]
2374    async fn prepare_start_rejects_second_active_start() {
2375        let state = CoreState::new(None).await;
2376        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2377        state
2378            .prepare_start(8080, shutdown_tx)
2379            .expect("first start should succeed");
2380
2381        let (second_tx, _second_rx) = oneshot::channel();
2382        let error = state
2383            .prepare_start(8081, second_tx)
2384            .expect_err("second active start should be rejected");
2385        assert!(error.contains("already"));
2386    }
2387
2388    #[test]
2389    fn update_policy_records_audit_event() {
2390        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2391        let state = runtime.block_on(CoreState::new(None));
2392
2393        state.update_policy_from(
2394            AuditActor::Runtime,
2395            "policy".to_string(),
2396            ProxyPolicy {
2397                transparent_enabled: true,
2398                ..Default::default()
2399            },
2400        );
2401
2402        let events = state.recent_audit_events();
2403        let event = events.last().expect("audit event should exist");
2404        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2405        assert_eq!(event.outcome, AuditOutcome::Success);
2406        assert_eq!(event.details["transparent_enabled"], true);
2407    }
2408
2409    #[test]
2410    fn patch_policy_updates_redaction_without_replacing_other_fields() {
2411        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2412        let state = runtime.block_on(CoreState::new(None));
2413        let original_timeout = state.policy_snapshot().request_timeout_ms;
2414
2415        state.patch_policy_from(
2416            AuditActor::Runtime,
2417            "policy.patch".to_string(),
2418            relay_core_api::policy::ProxyPolicyPatch {
2419                redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2420                    enabled: Some(true),
2421                    redact_bodies: Some(true),
2422                    ..Default::default()
2423                }),
2424                upstream: None,
2425            },
2426        );
2427
2428        let policy = state.policy_snapshot();
2429        assert_eq!(policy.request_timeout_ms, original_timeout);
2430        assert!(policy.redaction.enabled);
2431        assert!(policy.redaction.redact_bodies);
2432
2433        let events = state.recent_audit_events();
2434        let event = events.last().expect("audit event should exist");
2435        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2436        assert_eq!(event.details["redaction_enabled"], true);
2437    }
2438
2439    #[tokio::test]
2440    async fn metrics_include_audit_and_lagged_event_counters() {
2441        let state = CoreState::new(None).await;
2442
2443        state.update_policy_from(
2444            AuditActor::Runtime,
2445            "policy".to_string(),
2446            ProxyPolicy::default(),
2447        );
2448        let _ = state
2449            .resolve_intercept_with_modifications_from(
2450                AuditActor::Probe,
2451                "missing-flow:request".to_string(),
2452                "drop",
2453                None,
2454            )
2455            .await;
2456
2457        state.record_flow_events_lagged(3);
2458        state.record_audit_events_lagged(5);
2459
2460        let metrics = state.get_metrics().await;
2461        assert_eq!(metrics.audit_events_total, 2);
2462        assert_eq!(metrics.audit_events_failed, 1);
2463        assert_eq!(metrics.flow_events_lagged_total, 3);
2464        assert_eq!(metrics.audit_events_lagged_total, 5);
2465    }
2466
2467    #[tokio::test]
2468    async fn prometheus_metrics_text_contains_observability_fields() {
2469        let state = CoreState::new(None).await;
2470        state.record_flow_events_lagged(2);
2471        state.record_audit_events_lagged(4);
2472
2473        let text = state.get_metrics_prometheus_text().await;
2474        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2475        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2476        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2477        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2478    }
2479
2480    #[tokio::test]
2481    async fn search_flows_uses_store_with_offset_pagination() {
2482        let state = CoreState::new(Some(sqlite_url())).await;
2483        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2484        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2485        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2486
2487        state.upsert_flow(Box::new(flow_a));
2488        state.upsert_flow(Box::new(flow_b));
2489        state.upsert_flow(Box::new(flow_c));
2490
2491        let mut baseline = Vec::new();
2492        for _ in 0..20 {
2493            baseline = state
2494                .search_flows(FlowQuery {
2495                    host: Some("api.example.com".to_string()),
2496                    path_contains: None,
2497                    method: None,
2498                    status_min: None,
2499                    status_max: None,
2500                    has_error: None,
2501                    is_websocket: None,
2502                    limit: Some(3),
2503                    offset: Some(0),
2504                })
2505                .await;
2506            if baseline.len() == 3 {
2507                break;
2508            }
2509            sleep(Duration::from_millis(20)).await;
2510        }
2511
2512        assert_eq!(baseline.len(), 3);
2513        let page = state
2514            .search_flows(FlowQuery {
2515                host: Some("api.example.com".to_string()),
2516                path_contains: None,
2517                method: None,
2518                status_min: None,
2519                status_max: None,
2520                has_error: None,
2521                is_websocket: None,
2522                limit: Some(1),
2523                offset: Some(1),
2524            })
2525            .await;
2526        assert_eq!(page.len(), 1);
2527        assert_eq!(page[0].id, baseline[1].id);
2528    }
2529
2530    #[tokio::test]
2531    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2532        let state = CoreState::new(Some(sqlite_url())).await;
2533        let first_flow = sample_http_flow(
2534            "persist.example.com",
2535            "/first",
2536            "GET",
2537            200,
2538            1_700_000_010_000,
2539        );
2540        let first_id = first_flow.id.to_string();
2541        state.upsert_flow(Box::new(first_flow));
2542        for i in 0..240 {
2543            state.upsert_flow(Box::new(sample_http_flow(
2544                "persist.example.com",
2545                &format!("/{}", i),
2546                "GET",
2547                200,
2548                1_700_000_020_000 + i,
2549            )));
2550        }
2551
2552        sleep(Duration::from_millis(200)).await;
2553
2554        let loaded = state.get_flow(first_id).await;
2555        assert!(loaded.is_some());
2556    }
2557
2558    #[tokio::test]
2559    async fn search_flows_redacts_summary_url_when_enabled() {
2560        let state = CoreState::new(None).await;
2561        state.update_policy_from(
2562            AuditActor::Runtime,
2563            "policy.redaction".to_string(),
2564            ProxyPolicy {
2565                redaction: RedactionPolicy {
2566                    enabled: true,
2567                    sensitive_query_keys: vec!["token".to_string()],
2568                    redact_bodies: false,
2569                    ..Default::default()
2570                },
2571                ..Default::default()
2572            },
2573        );
2574        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2575
2576        let mut items = Vec::new();
2577        for _ in 0..20 {
2578            items = state
2579                .search_flows(FlowQuery {
2580                    host: Some("api.example.com".to_string()),
2581                    path_contains: Some("/private".to_string()),
2582                    ..Default::default()
2583                })
2584                .await;
2585            if !items.is_empty() {
2586                break;
2587            }
2588            sleep(Duration::from_millis(20)).await;
2589        }
2590
2591        assert!(!items.is_empty());
2592        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2593        let token = redacted
2594            .query_pairs()
2595            .find(|(k, _)| k == "token")
2596            .map(|(_, v)| v.to_string());
2597        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2598    }
2599
2600    #[tokio::test]
2601    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2602        let state = CoreState::new(Some(sqlite_url())).await;
2603        state.update_policy_from(
2604            AuditActor::Runtime,
2605            "policy.redaction".to_string(),
2606            ProxyPolicy {
2607                redaction: RedactionPolicy {
2608                    enabled: true,
2609                    sensitive_header_names: vec![
2610                        "authorization".to_string(),
2611                        "set-cookie".to_string(),
2612                    ],
2613                    sensitive_query_keys: vec!["token".to_string()],
2614                    redact_bodies: true,
2615                },
2616                ..Default::default()
2617            },
2618        );
2619
2620        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2621        let flow_id = flow.id.to_string();
2622        state.upsert_flow(Box::new(flow));
2623        sleep(Duration::from_millis(80)).await;
2624
2625        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2626        let Layer::Http(http) = loaded.layer else {
2627            panic!("expected http layer");
2628        };
2629
2630        let auth = http
2631            .request
2632            .headers
2633            .iter()
2634            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2635            .map(|(_, v)| v.as_str());
2636        assert_eq!(auth, Some("[REDACTED]"));
2637
2638        let req_query_token = http
2639            .request
2640            .query
2641            .iter()
2642            .find(|(k, _)| k == "token")
2643            .map(|(_, v)| v.as_str());
2644        assert_eq!(req_query_token, Some("[REDACTED]"));
2645
2646        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2647        assert_eq!(req_body, Some("[REDACTED]"));
2648
2649        let response = http.response.expect("response should exist");
2650        let set_cookie = response
2651            .headers
2652            .iter()
2653            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2654            .map(|(_, v)| v.as_str());
2655        assert_eq!(set_cookie, Some("[REDACTED]"));
2656        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2657        assert_eq!(res_body, Some("[REDACTED]"));
2658    }
2659
2660    #[test]
2661    fn redact_flow_update_masks_http_body_when_enabled() {
2662        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2663        let state = runtime.block_on(CoreState::new(None));
2664        state.update_policy_from(
2665            AuditActor::Runtime,
2666            "policy.redaction".to_string(),
2667            ProxyPolicy {
2668                redaction: RedactionPolicy {
2669                    enabled: true,
2670                    redact_bodies: true,
2671                    ..Default::default()
2672                },
2673                ..Default::default()
2674            },
2675        );
2676
2677        let update = FlowUpdate::HttpBody {
2678            flow_id: "f-1".to_string(),
2679            direction: relay_core_api::flow::Direction::ClientToServer,
2680            body: BodyData {
2681                encoding: "utf-8".to_string(),
2682                content: "super-secret".to_string(),
2683                size: 12,
2684            },
2685        };
2686        let redacted = state.redact_flow_update_for_output(update);
2687        match redacted {
2688            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2689            _ => panic!("expected http body update"),
2690        }
2691    }
2692
2693    #[cfg(feature = "script")]
2694    #[tokio::test]
2695    async fn load_script_from_records_audit_event() {
2696        let state = CoreState::new(None).await;
2697
2698        state
2699            .load_script_from(
2700                AuditActor::Tauri,
2701                "tauri.load_script".to_string(),
2702                "globalThis.onRequestHeaders = (_flow) => {};",
2703            )
2704            .await
2705            .expect("script should load");
2706
2707        let events = state.recent_audit_events();
2708        let event = events.last().expect("audit event should exist");
2709        assert_eq!(event.actor, AuditActor::Tauri);
2710        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2711        assert_eq!(event.outcome, AuditOutcome::Success);
2712        assert_eq!(event.target, "tauri.load_script");
2713    }
2714}