Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62mod log_format;
63pub mod modification;
64pub mod paths;
65pub mod rule;
66pub mod services;
67
68// ── Re-exports for user convenience ──
69// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
70pub use paths::CaPaths;
71pub use relay_core_api::{flow, policy};
72pub use relay_core_lib::InterceptionResult;
73pub use relay_core_lib::rule as lib_rule;
74
75use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
76use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
77use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
78
/// Re-exports every public item of `relay_core_lib::rule_engine` so callers can reach
/// them as `relay_core_runtime::rule_engine::*` without depending on `relay_core_lib`.
pub mod rule_engine {
    pub use relay_core_lib::rule_engine::*;
}
82
/// Point-in-time metrics snapshot assembled by `CoreState::get_metrics`.
///
/// The snapshot can be rendered as text with `CoreMetrics::to_prometheus_text`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreMetrics {
    /// Flow total as reported by the flow store actor (`0` if it does not answer).
    pub flows_total: usize,
    /// In-memory flow count as reported by the flow store actor (`0` if it does not answer).
    pub flows_in_memory: usize,
    /// Number of flow-store messages that could not be queued (see `CoreState::flows_dropped`).
    pub flows_dropped: usize,
    /// Pending intercept count as reported by the intercept broker actor.
    pub intercepts_pending: usize,
    /// Pending WebSocket message count as reported by the intercept broker actor.
    pub ws_pending_messages: usize,
    /// Age of the oldest pending intercept in milliseconds, if the broker reported one.
    pub oldest_intercept_age_ms: Option<u64>,
    /// Age of the oldest pending WebSocket message in milliseconds, if the broker reported one.
    pub oldest_ws_message_age_ms: Option<u64>,
    /// Rule execution error count as reported by the rule store actor.
    pub rule_exec_errors: usize,
    /// Value of the runtime's `audit_events_total` counter.
    pub audit_events_total: usize,
    /// Value of the runtime's `audit_events_failed` counter.
    pub audit_events_failed: usize,
    /// Value of the runtime's `flow_events_lagged_total` counter.
    pub flow_events_lagged_total: usize,
    /// Value of the runtime's `audit_events_lagged_total` counter.
    pub audit_events_lagged_total: usize,
    /// O4: Total bodies degraded (budget exceeded)
    pub proxy_body_degraded_total: usize,
    /// O4: Total HTTP requests processed through streaming pipeline
    pub proxy_http_request_total: usize,
    /// O4: Total sandbox rejections
    pub proxy_sandbox_reject_total: usize,
    /// O4: Total invalid method rejections (strict_http_semantics)
    pub proxy_invalid_method_total: usize,
    /// O4: Total invalid status code rejections (strict_http_semantics)
    pub proxy_invalid_status_total: usize,
    /// O4: Total idempotent request retries
    pub proxy_retry_total: usize,
    /// O4: Total bodies processed in tap (streaming) mode
    pub proxy_stream_mode_tap_total: usize,
    /// O4: Total bodies degraded from tap to pass-through
    pub proxy_stream_mode_degrade_total: usize,
}
114
115impl CoreMetrics {
116    pub fn to_prometheus_text(&self) -> String {
117        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
118        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
119        format!(
120            "relay_core_flows_total {}\n\
121relay_core_flows_in_memory {}\n\
122relay_core_flows_dropped_total {}\n\
123relay_core_intercepts_pending {}\n\
124relay_core_ws_pending_messages {}\n\
125relay_core_oldest_intercept_age_ms {}\n\
126relay_core_oldest_ws_message_age_ms {}\n\
127relay_core_rule_exec_errors_total {}\n\
128relay_core_audit_events_total {}\n\
129relay_core_audit_events_failed_total {}\n\
130relay_core_flow_events_lagged_total {}\n\
131relay_core_audit_events_lagged_total {}\n\
132relay_core_proxy_body_degraded_total {}\n\
133relay_core_proxy_http_request_total {}\n\
134relay_core_proxy_sandbox_reject_total {}\n\
135relay_core_proxy_invalid_method_total {}\n\
136relay_core_proxy_invalid_status_total {}\n\
137relay_core_proxy_retry_total {}\n\
138relay_core_proxy_stream_mode_tap_total {}\n\
139relay_core_proxy_stream_mode_degrade_total {}\n",
140            self.flows_total,
141            self.flows_in_memory,
142            self.flows_dropped,
143            self.intercepts_pending,
144            self.ws_pending_messages,
145            oldest_intercept_age_ms,
146            oldest_ws_message_age_ms,
147            self.rule_exec_errors,
148            self.audit_events_total,
149            self.audit_events_failed,
150            self.flow_events_lagged_total,
151            self.audit_events_lagged_total,
152            self.proxy_body_degraded_total,
153            self.proxy_http_request_total,
154            self.proxy_sandbox_reject_total,
155            self.proxy_invalid_method_total,
156            self.proxy_invalid_status_total,
157            self.proxy_retry_total,
158            self.proxy_stream_mode_tap_total,
159            self.proxy_stream_mode_degrade_total,
160        )
161    }
162}
163
/// Flattened view of the runtime lifecycle, built from a `RuntimeLifecycle`
/// through the `From<RuntimeLifecycle>` conversion.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusSnapshot {
    /// Lifecycle phase at the time the snapshot was taken.
    pub phase: RuntimeLifecyclePhase,
    /// `true` while the phase is `Starting`, `Running` or `Stopping`.
    pub running: bool,
    /// Port copied from the lifecycle, if one is recorded.
    pub port: Option<u16>,
    /// Whole seconds elapsed since the recorded start time, if any.
    pub uptime: Option<u64>,
    /// Last error message copied from the lifecycle, if any.
    pub last_error: Option<String>,
}
172
/// Lifecycle status paired with a metrics snapshot, as returned by `CoreState::status_report`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusReport {
    /// Lifecycle status at report time.
    pub status: CoreStatusSnapshot,
    /// Metrics gathered for the same report.
    pub metrics: CoreMetrics,
}
178
/// Pending-intercept counters, as returned by `CoreState::intercept_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreInterceptSnapshot {
    /// Copied from `CoreMetrics::intercepts_pending`.
    pub pending_count: usize,
    /// Copied from `CoreMetrics::ws_pending_messages`.
    pub ws_pending_count: usize,
}
184
/// A list of audit events returned by `CoreState::audit_snapshot` or
/// `CoreState::query_audit_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CoreAuditSnapshot {
    /// The selected audit events.
    pub events: Vec<AuditEvent>,
}
189
/// Filter for `CoreState::query_audit_snapshot`; every `None` field means "do not filter".
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreAuditQuery {
    /// Lower timestamp bound in milliseconds (inclusive in the in-memory filter).
    pub since_ms: Option<u64>,
    /// Upper timestamp bound in milliseconds (inclusive in the in-memory filter).
    pub until_ms: Option<u64>,
    /// Only return events recorded for this actor.
    pub actor: Option<AuditActor>,
    /// Only return events of this kind.
    pub kind: Option<AuditEventKind>,
    /// Only return events with this outcome.
    pub outcome: Option<AuditOutcome>,
    /// Maximum number of events; `query_audit_snapshot` clamps it to `1..=500`.
    pub limit: usize,
}
199
200impl Default for CoreAuditQuery {
201    fn default() -> Self {
202        Self {
203            since_ms: None,
204            until_ms: None,
205            actor: None,
206            kind: None,
207            outcome: None,
208            limit: 50,
209        }
210    }
211}
212
/// Phase of the runtime lifecycle; serialized in `snake_case` (e.g. `"running"`).
///
/// `Starting`, `Running` and `Stopping` are the phases `is_active` reports as active.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeLifecyclePhase {
    /// Initial phase used by `RuntimeLifecycle::created`.
    Created,
    Starting,
    Running,
    Stopping,
    Stopped,
    Failed,
}
223
224impl RuntimeLifecyclePhase {
225    pub fn as_str(&self) -> &'static str {
226        match self {
227            Self::Created => "created",
228            Self::Starting => "starting",
229            Self::Running => "running",
230            Self::Stopping => "stopping",
231            Self::Stopped => "stopped",
232            Self::Failed => "failed",
233        }
234    }
235
236    pub fn is_active(&self) -> bool {
237        matches!(self, Self::Starting | Self::Running | Self::Stopping)
238    }
239}
240
/// Lifecycle state of the runtime; `CoreState` holds it in a `watch` channel (`lifecycle_tx`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RuntimeLifecycle {
    /// Current lifecycle phase.
    pub phase: RuntimeLifecyclePhase,
    /// Port associated with the runtime, if one is recorded.
    pub port: Option<u16>,
    /// Start time in milliseconds; `uptime_seconds` compares it against `now_unix_ms()`.
    pub started_at_ms: Option<u64>,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
}
248
249impl RuntimeLifecycle {
250    pub fn created() -> Self {
251        Self {
252            phase: RuntimeLifecyclePhase::Created,
253            port: None,
254            started_at_ms: None,
255            last_error: None,
256        }
257    }
258
259    pub fn is_active(&self) -> bool {
260        self.phase.is_active()
261    }
262
263    pub fn uptime_seconds(&self) -> Option<u64> {
264        let started_at_ms = self.started_at_ms?;
265        let now_ms = now_unix_ms();
266        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
267    }
268}
269
/// Result of asking the runtime to start the proxy.
// NOTE(review): the code that produces this value is outside this view — confirm semantics there.
pub enum ProxySpawnResult {
    /// A new task was spawned; carries its `JoinHandle`.
    Started(tokio::task::JoinHandle<()>),
    /// Nothing was spawned because a proxy is presumably already active.
    AlreadyRunning,
}
274
/// Result of asking the runtime to stop the proxy.
// NOTE(review): the code that produces this value is outside this view — confirm semantics there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A stop was initiated.
    Stopping,
    /// There was nothing to stop.
    NotRunning,
}
280
281impl From<RuntimeLifecycle> for CoreStatusSnapshot {
282    fn from(lifecycle: RuntimeLifecycle) -> Self {
283        Self {
284            running: lifecycle.is_active(),
285            uptime: lifecycle.uptime_seconds(),
286            port: lifecycle.port,
287            last_error: lifecycle.last_error,
288            phase: lifecycle.phase,
289        }
290    }
291}
292
/// Central runtime handle: owns the actor mailboxes, optional persistent store,
/// policy/lifecycle watch channels, broadcast channels and audit counters.
pub struct CoreState {
    /// Mailbox of the flow store actor spawned in `CoreState::new`.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    /// Mailbox of the intercept broker actor spawned in `CoreState::new`.
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    /// Mailbox of the rule store actor spawned in `CoreState::new`.
    rule_store: mpsc::Sender<RuleStoreMessage>,
    /// Persistent store; `None` when no database URL was given or connecting failed.
    store: Option<Store>,
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    /// Holds the current proxy policy; replaced by `update_policy_from`.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    /// Incremented whenever a message cannot be queued to the flow store.
    pub flows_dropped: Arc<AtomicUsize>,
    audit_events_total: Arc<AtomicUsize>,
    audit_events_failed: Arc<AtomicUsize>,
    flow_events_lagged_total: Arc<AtomicUsize>,
    audit_events_lagged_total: Arc<AtomicUsize>,
    /// Internal broadcast channel: multiple consumers such as Tauri and Probe can subscribe.
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    /// In-memory audit history; created with capacity for 200 events in `CoreState::new`.
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    /// Holds the current `RuntimeLifecycle`.
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    /// Optional one-shot shutdown sender; starts out as `None`.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
313
314impl CoreState {
315    pub async fn new(db_url: Option<String>) -> Self {
316        const AUDIT_HISTORY_LIMIT: usize = 200;
317
318        let store = if let Some(url) = db_url {
319            match Store::connect(&url).await {
320                Ok(s) => {
321                    if let Err(e) = s.init().await {
322                        tracing::error!("Failed to init store: {}", e);
323                    }
324                    Some(s)
325                }
326                Err(e) => {
327                    tracing::error!("Failed to connect to store: {}", e);
328                    None
329                }
330            }
331        } else {
332            None
333        };
334
335        let (flow_tx, flow_rx) = mpsc::channel(10000);
336        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
337        tokio::spawn(flow_actor.run());
338
339        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
340        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
341        tokio::spawn(intercept_actor.run());
342
343        let (rule_tx, rule_rx) = mpsc::channel(100);
344        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
345        tokio::spawn(rule_actor.run());
346
347        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
348        let (flow_broadcast_tx, _) = broadcast::channel(1000);
349        let (audit_broadcast_tx, _) = broadcast::channel(256);
350        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
351
352        #[cfg(feature = "script")]
353        let script_interceptor = ScriptInterceptor::new()
354            .await
355            .expect("Failed to initialize ScriptInterceptor");
356        Self {
357            flow_store: flow_tx,
358            intercept_broker: intercept_tx,
359            rule_store: rule_tx,
360            store,
361            #[cfg(feature = "script")]
362            script_interceptor: Arc::new(script_interceptor),
363            policy_tx,
364            flows_dropped: Arc::new(AtomicUsize::new(0)),
365            audit_events_total: Arc::new(AtomicUsize::new(0)),
366            audit_events_failed: Arc::new(AtomicUsize::new(0)),
367            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
368            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
369            flow_broadcast_tx,
370            audit_broadcast_tx,
371            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
372            lifecycle_tx,
373            shutdown_tx: Mutex::new(None),
374        }
375    }
376
377    pub async fn get_metrics(&self) -> CoreMetrics {
378        let (flow_tx, flow_rx) = oneshot::channel();
379        let _ = self
380            .flow_store
381            .send(FlowStoreMessage::GetMetrics(flow_tx))
382            .await;
383        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
384
385        let (int_tx, int_rx) = oneshot::channel();
386        let _ = self
387            .intercept_broker
388            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
389            .await;
390        let (
391            intercepts_pending,
392            ws_pending_messages,
393            oldest_intercept_age_ms,
394            oldest_ws_message_age_ms,
395        ) = int_rx.await.unwrap_or((0, 0, None, None));
396
397        let (rule_tx, rule_rx) = oneshot::channel();
398        let _ = self
399            .rule_store
400            .send(RuleStoreMessage::GetMetrics(rule_tx))
401            .await;
402        let rule_exec_errors = rule_rx.await.unwrap_or(0);
403
404        CoreMetrics {
405            flows_total,
406            flows_in_memory,
407            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
408            intercepts_pending,
409            ws_pending_messages,
410            oldest_intercept_age_ms,
411            oldest_ws_message_age_ms,
412            rule_exec_errors,
413            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
414            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
415            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
416            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
417            proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
418            proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
419            proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
420            proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
421            proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
422            proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
423            proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
424            proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
425            ),
426        }
427    }
428
429    pub async fn get_metrics_prometheus_text(&self) -> String {
430        let mut text = self.get_metrics().await.to_prometheus_text();
431        #[cfg(feature = "script")]
432        {
433            text.push_str(&self.script_interceptor.metrics.prometheus_lines());
434        }
435        text
436    }
437
438    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
439        self.lifecycle().into()
440    }
441
442    pub async fn status_report(&self) -> CoreStatusReport {
443        CoreStatusReport {
444            status: self.status_snapshot(),
445            metrics: self.get_metrics().await,
446        }
447    }
448
449    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
450        let metrics = self.get_metrics().await;
451        CoreInterceptSnapshot {
452            pending_count: metrics.intercepts_pending,
453            ws_pending_count: metrics.ws_pending_messages,
454        }
455    }
456
457    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
458        let events = self.recent_audit_events();
459        let start = events.len().saturating_sub(limit);
460        CoreAuditSnapshot {
461            events: events.into_iter().skip(start).collect(),
462        }
463    }
464
465    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
466        let limit = query.limit.clamp(1, 500);
467        if let Some(store) = &self.store {
468            let rows = store
469                .query_audit_events(
470                    query.since_ms,
471                    query.until_ms,
472                    query.actor.as_ref().map(AuditActor::as_str),
473                    query.kind.as_ref().map(AuditEventKind::as_str),
474                    query.outcome.as_ref().map(AuditOutcome::as_str),
475                    limit,
476                )
477                .await
478                .unwrap_or_default();
479
480            let mut events = Vec::with_capacity(rows.len());
481            for row in rows {
482                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
483                    events.push(event);
484                }
485            }
486            return CoreAuditSnapshot { events };
487        }
488
489        let mut events = self.recent_audit_events();
490        events.reverse();
491        let filtered = events
492            .into_iter()
493            .filter(|event| {
494                query
495                    .since_ms
496                    .map(|v| event.timestamp_ms >= v)
497                    .unwrap_or(true)
498            })
499            .filter(|event| {
500                query
501                    .until_ms
502                    .map(|v| event.timestamp_ms <= v)
503                    .unwrap_or(true)
504            })
505            .filter(|event| {
506                query
507                    .actor
508                    .as_ref()
509                    .map(|v| &event.actor == v)
510                    .unwrap_or(true)
511            })
512            .filter(|event| {
513                query
514                    .kind
515                    .as_ref()
516                    .map(|v| &event.kind == v)
517                    .unwrap_or(true)
518            })
519            .filter(|event| {
520                query
521                    .outcome
522                    .as_ref()
523                    .map(|v| &event.outcome == v)
524                    .unwrap_or(true)
525            })
526            .take(limit)
527            .collect();
528        CoreAuditSnapshot { events: filtered }
529    }
530
531    pub async fn get_flow(&self, id: String) -> Option<Flow> {
532        let (tx, rx) = oneshot::channel();
533        if let Err(e) = self
534            .flow_store
535            .send(FlowStoreMessage::GetFlow {
536                id: id.clone(),
537                respond_to: tx,
538            })
539            .await
540        {
541            error!("Failed to send GetFlow request: {}", e);
542            if let Some(store) = &self.store {
543                return store
544                    .load_flow(&id)
545                    .await
546                    .ok()
547                    .flatten()
548                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
549            }
550            return None;
551        }
552        let flow = rx
553            .await
554            .map_err(|e| {
555                error!("Failed to receive Flow response: {}", e);
556                e
557            })
558            .unwrap_or(None);
559        if let Some(flow) = flow {
560            return Some(redact_flow(flow, &self.current_redaction_policy()));
561        }
562        if let Some(store) = &self.store {
563            return store
564                .load_flow(&id)
565                .await
566                .ok()
567                .flatten()
568                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
569                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
570        }
571        None
572    }
573
574    pub async fn get_rules(&self) -> Vec<Rule> {
575        let (tx, rx) = oneshot::channel();
576        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
577            error!("Failed to send GetRules request: {}", e);
578            return Vec::new();
579        }
580        rx.await
581            .map_err(|e| {
582                error!("Failed to receive Rules response: {}", e);
583                e
584            })
585            .unwrap_or_default()
586    }
587
588    pub async fn set_rules(&self, rules: Vec<Rule>) {
589        let _ = self
590            .set_rules_from(
591                AuditActor::Runtime,
592                "rules.replace",
593                "rule_set".to_string(),
594                json!({}),
595                rules,
596            )
597            .await;
598    }
599
600    pub async fn upsert_rule_from(
601        &self,
602        actor: AuditActor,
603        operation: &str,
604        target: String,
605        details: serde_json::Value,
606        rule: Rule,
607    ) -> Result<(), String> {
608        let rule_id = rule.id.clone();
609        let mut rules = self.get_rules().await;
610        rules.retain(|existing| existing.id != rule_id);
611        rules.push(rule);
612        self.set_rules_from(actor, operation, target, details, rules)
613            .await
614    }
615
616    pub async fn delete_rule_from(
617        &self,
618        actor: AuditActor,
619        operation: &str,
620        target: String,
621        details: serde_json::Value,
622        rule_id: &str,
623    ) -> Result<bool, String> {
624        let mut rules = self.get_rules().await;
625        let before = rules.len();
626        rules.retain(|rule| rule.id != rule_id);
627        if rules.len() == before {
628            return Ok(false);
629        }
630        self.set_rules_from(actor, operation, target, details, rules)
631            .await
632            .map(|_| true)
633    }
634
635    pub async fn create_mock_response_rule_from(
636        &self,
637        actor: AuditActor,
638        target: String,
639        details: serde_json::Value,
640        config: MockResponseRuleConfig,
641    ) -> Result<String, String> {
642        let rule_id = config.rule_id.clone();
643        let rule = build_mock_response_rule(config);
644        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
645            .await
646            .map(|_| rule_id)
647    }
648
649    pub async fn create_intercept_rule_from(
650        &self,
651        actor: AuditActor,
652        target: String,
653        details: serde_json::Value,
654        config: InterceptRuleConfig,
655    ) -> Result<String, String> {
656        let rule_id = config.rule_id.clone();
657        let mut rules = self.get_rules().await;
658        rules.extend(build_intercept_rules(config));
659        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
660            .await
661            .map(|_| rule_id)
662    }
663
664    pub async fn upsert_legacy_intercept_rule_from(
665        &self,
666        actor: AuditActor,
667        target: String,
668        details: serde_json::Value,
669        rule: InterceptRule,
670    ) -> Result<(), String> {
671        let rule_id = rule.id.clone();
672        let mut rules = self.get_rules().await;
673        rules.retain(|existing| {
674            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
675        });
676        rules.extend(rule.to_rules());
677        self.set_rules_from(
678            actor,
679            "rule.intercept_legacy_upsert",
680            target,
681            details,
682            rules,
683        )
684        .await
685    }
686
687    pub async fn set_rules_from(
688        &self,
689        actor: AuditActor,
690        operation: &str,
691        target: String,
692        details: serde_json::Value,
693        rules: Vec<Rule>,
694    ) -> Result<(), String> {
695        let rule_count = rules.len();
696        if let Err(e) = self
697            .rule_store
698            .send(RuleStoreMessage::SetRules(rules))
699            .await
700        {
701            error!("Failed to send SetRules request: {}", e);
702            self.record_audit_event(AuditEvent::new(
703                actor,
704                AuditEventKind::RuleChanged,
705                target,
706                AuditOutcome::Failed,
707                json!({
708                    "operation": operation,
709                    "rule_count": rule_count,
710                    "details": details,
711                    "error": e.to_string()
712                }),
713            ));
714            return Err(e.to_string());
715        }
716
717        self.record_audit_event(AuditEvent::new(
718            actor,
719            AuditEventKind::RuleChanged,
720            target,
721            AuditOutcome::Success,
722            json!({
723                "operation": operation,
724                "rule_count": rule_count,
725                "details": details
726            }),
727        ));
728        Ok(())
729    }
730
731    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
732        let mut new_rules = Vec::new();
733        for rule in rules {
734            new_rules.extend(rule.to_rules());
735        }
736        self.set_rules(new_rules).await;
737    }
738
739    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
740        let (tx, rx) = oneshot::channel();
741        if let Err(e) = self
742            .rule_store
743            .send(RuleStoreMessage::GetRuleEngine(tx))
744            .await
745        {
746            error!("Failed to send GetRuleEngine request: {}", e);
747            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
748        }
749        rx.await
750            .map_err(|e| {
751                error!("Failed to receive RuleEngine response: {}", e);
752                e
753            })
754            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
755    }
756
757    pub fn update_policy(&self, policy: ProxyPolicy) {
758        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
759    }
760
761    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
762        let mut policy = self.policy_snapshot();
763        policy.apply_patch(patch);
764        self.update_policy_from(actor, target, policy);
765    }
766
767    pub fn policy_snapshot(&self) -> ProxyPolicy {
768        self.policy_tx.borrow().clone()
769    }
770
771    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
772        let details = json!({
773            "strict_http_semantics": policy.strict_http_semantics,
774            "request_timeout_ms": policy.request_timeout_ms,
775            "max_body_size": policy.max_body_size,
776            "transparent_enabled": policy.transparent_enabled,
777            "redaction_enabled": policy.redaction.enabled,
778            "redact_bodies": policy.redaction.redact_bodies
779        });
780
781        self.policy_tx.send_replace(policy);
782        self.record_audit_event(AuditEvent::new(
783            actor,
784            AuditEventKind::PolicyUpdated,
785            target,
786            AuditOutcome::Success,
787            details,
788        ));
789    }
790
791    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
792        if let Err(e) = self
793            .intercept_broker
794            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
795            .await
796        {
797            error!("Failed to send RegisterIntercept request: {}", e);
798        }
799    }
800
801    pub async fn resolve_intercept(
802        &self,
803        key: String,
804        result: InterceptionResult,
805    ) -> Result<(), String> {
806        let (tx, rx) = oneshot::channel();
807        if let Err(e) = self
808            .intercept_broker
809            .send(InterceptBrokerMessage::ResolveIntercept {
810                key,
811                result,
812                respond_to: tx,
813            })
814            .await
815        {
816            error!("Failed to send ResolveIntercept request: {}", e);
817            return Err(e.to_string());
818        }
819        rx.await.map_err(|_| "Actor dropped".to_string())?
820    }
821
822    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
823        let (tx, rx) = oneshot::channel();
824        if let Err(e) = self
825            .intercept_broker
826            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
827                key,
828                respond_to: tx,
829            })
830            .await
831        {
832            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
833            return None;
834        }
835        rx.await
836            .map_err(|e| {
837                error!("Failed to receive WebSocketMessage response: {}", e);
838                e
839            })
840            .unwrap_or(None)
841    }
842
843    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
844        if let Err(e) = self
845            .intercept_broker
846            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
847            .await
848        {
849            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
850        }
851    }
852
853    pub async fn is_intercept_pending(&self, key: String) -> bool {
854        let (tx, rx) = oneshot::channel();
855        if let Err(e) = self
856            .intercept_broker
857            .send(InterceptBrokerMessage::GetPendingIntercept {
858                key,
859                respond_to: tx,
860            })
861            .await
862        {
863            error!("Failed to send GetPendingIntercept request: {}", e);
864            return false;
865        }
866        rx.await
867            .map_err(|e| {
868                error!("Failed to receive PendingIntercept response: {}", e);
869                e
870            })
871            .unwrap_or(false)
872    }
873
874    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
875        let (tx, rx) = oneshot::channel();
876        if let Err(e) = self
877            .intercept_broker
878            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
879                flow_id,
880                respond_to: tx,
881            })
882            .await
883        {
884            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
885            return false;
886        }
887        rx.await
888            .map_err(|e| {
889                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
890                e
891            })
892            .unwrap_or(false)
893    }
894
895    pub fn upsert_flow(&self, flow: Box<Flow>) {
896        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
897            // Drop flow if channel is full
898            error!("FlowStore dropped flow: {}", e);
899            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
900        }
901    }
902
903    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
904        if let Err(e) = self
905            .flow_store
906            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
907        {
908            error!("FlowStore dropped WS message: {}", e);
909            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
910        }
911    }
912
913    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
914        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
915            flow_id,
916            body,
917            direction,
918        }) {
919            error!("FlowStore dropped HTTP body: {}", e);
920            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
921        }
922    }
923
924    /// P1: Tag a flow as budget-exceeded (body too large for full rule inspection)
925    pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
926        if let Err(e) = self
927            .flow_store
928            .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
929        {
930            error!("FlowStore dropped budget-exceeded tag: {}", e);
931            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
932        }
933    }
934
935    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
936    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
937        self.flow_broadcast_tx.subscribe()
938    }
939
940    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
941        self.audit_broadcast_tx.subscribe()
942    }
943
944    fn current_redaction_policy(&self) -> RedactionPolicy {
945        self.policy_tx.borrow().redaction.clone()
946    }
947
948    pub fn record_flow_events_lagged(&self, skipped: u64) {
949        self.flow_events_lagged_total
950            .fetch_add(skipped as usize, Ordering::Relaxed);
951    }
952
953    pub fn record_audit_events_lagged(&self, skipped: u64) {
954        self.audit_events_lagged_total
955            .fetch_add(skipped as usize, Ordering::Relaxed);
956    }
957
958    pub fn lifecycle(&self) -> RuntimeLifecycle {
959        self.lifecycle_tx.borrow().clone()
960    }
961
962    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
963        self.lifecycle_tx.subscribe()
964    }
965
966    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
967        let current = self.lifecycle();
968        if current.is_active() {
969            return Err(format!(
970                "Proxy is already {} on port {}",
971                current.phase.as_str(),
972                current.port.unwrap_or(port)
973            ));
974        }
975
976        let mut guard = self
977            .shutdown_tx
978            .lock()
979            .map_err(|_| "shutdown state poisoned".to_string())?;
980        *guard = Some(shutdown_tx);
981        drop(guard);
982
983        self.update_lifecycle(RuntimeLifecycle {
984            phase: RuntimeLifecyclePhase::Starting,
985            port: Some(port),
986            started_at_ms: None,
987            last_error: None,
988        });
989        Ok(())
990    }
991
992    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
993        let mut guard = self
994            .shutdown_tx
995            .lock()
996            .map_err(|_| "shutdown state poisoned".to_string())?;
997        let Some(tx) = guard.take() else {
998            return Ok(ProxyStopResult::NotRunning);
999        };
1000        drop(guard);
1001
1002        let current = self.lifecycle();
1003        self.update_lifecycle(RuntimeLifecycle {
1004            phase: RuntimeLifecyclePhase::Stopping,
1005            port: current.port,
1006            started_at_ms: current.started_at_ms,
1007            last_error: current.last_error,
1008        });
1009        let _ = tx.send(());
1010        Ok(ProxyStopResult::Stopping)
1011    }
1012
1013    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1014        self.audit_history
1015            .lock()
1016            .map(|events| events.iter().cloned().collect())
1017            .unwrap_or_default()
1018    }
1019
1020    /// 搜索内存中的 Flow 列表,返回轻量摘要。
1021    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1022        let redaction = self.current_redaction_policy();
1023        if let Some(store) = &self.store {
1024            return store
1025                .query_flow_summaries(&query)
1026                .await
1027                .unwrap_or_default()
1028                .into_iter()
1029                .map(|summary| redact_flow_summary(summary, &redaction))
1030                .collect();
1031        }
1032        let (tx, rx) = oneshot::channel();
1033        if let Err(e) = self
1034            .flow_store
1035            .send(FlowStoreMessage::SearchFlows {
1036                query,
1037                respond_to: tx,
1038            })
1039            .await
1040        {
1041            error!("Failed to send SearchFlows request: {}", e);
1042            return Vec::new();
1043        }
1044        rx.await
1045            .unwrap_or_default()
1046            .into_iter()
1047            .map(|summary| redact_flow_summary(summary, &redaction))
1048            .collect()
1049    }
1050
1051    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1052        let redaction = self.current_redaction_policy();
1053        redact_flow_update(update, &redaction)
1054    }
1055
1056    /// 解除截获并可选地应用修改。
1057    ///
1058    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
1059    /// - 若 `mods` 为 None,直接 Continue;
1060    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1061    ///
1062    /// `key` 格式约定:
1063    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1064    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1065    pub async fn resolve_intercept_with_modifications(
1066        &self,
1067        key: String,
1068        action: &str,
1069        mods: Option<relay_core_api::modification::FlowModification>,
1070    ) -> Result<(), String> {
1071        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1072            .await
1073    }
1074
1075    pub async fn resolve_intercept_with_modifications_from(
1076        &self,
1077        actor: AuditActor,
1078        key: String,
1079        action: &str,
1080        mods: Option<relay_core_api::modification::FlowModification>,
1081    ) -> Result<(), String> {
1082        let modified_fields = modification_field_names(mods.as_ref());
1083        let result = match action {
1084            "drop" => InterceptionResult::Drop,
1085            _ => match mods {
1086                None => InterceptionResult::Continue,
1087                Some(m) => {
1088                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1089                    match parts.as_slice() {
1090                        [flow_id, phase] => {
1091                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1092                                modification::apply_flow_modification(&flow, phase, m)
1093                            } else {
1094                                InterceptionResult::Continue
1095                            }
1096                        }
1097                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1098                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1099                        [_, "ws_msg", _] => {
1100                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1101                                modification::apply_ws_modification(&msg, m)
1102                            } else {
1103                                InterceptionResult::Continue
1104                            }
1105                        }
1106                        _ => InterceptionResult::Continue,
1107                    }
1108                }
1109            },
1110        };
1111        let outcome = self.resolve_intercept(key.clone(), result).await;
1112        let audit_outcome = if outcome.is_ok() {
1113            AuditOutcome::Success
1114        } else {
1115            AuditOutcome::Failed
1116        };
1117        let error_message = outcome.as_ref().err().cloned();
1118
1119        self.record_audit_event(AuditEvent::new(
1120            actor,
1121            AuditEventKind::InterceptResolved,
1122            key,
1123            audit_outcome,
1124            json!({
1125                "action": action,
1126                "has_modifications": !modified_fields.is_empty(),
1127                "modified_fields": modified_fields,
1128                "error": error_message
1129            }),
1130        ));
1131        outcome
1132    }
1133
1134    #[cfg(feature = "script")]
1135    pub async fn load_script_from(
1136        &self,
1137        actor: AuditActor,
1138        target: String,
1139        script: &str,
1140    ) -> Result<(), String> {
1141        let result = self
1142            .script_interceptor
1143            .load_script(script)
1144            .await
1145            .map_err(|e| e.to_string());
1146
1147        let outcome = if result.is_ok() {
1148            AuditOutcome::Success
1149        } else {
1150            AuditOutcome::Failed
1151        };
1152        let error_message = result.as_ref().err().cloned();
1153
1154        self.record_audit_event(AuditEvent::new(
1155            actor,
1156            AuditEventKind::ScriptReloaded,
1157            target,
1158            outcome,
1159            json!({
1160                "script_bytes": script.len(),
1161                "error": error_message
1162            }),
1163        ));
1164
1165        result
1166    }
1167
1168    /// S5: Set the env var whitelist for relay.env() in user scripts.
1169    #[cfg(feature = "script")]
1170    pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1171        self.script_interceptor.set_env_allow(env_allow).await;
1172    }
1173
1174    fn record_audit_event(&self, event: AuditEvent) {
1175        const AUDIT_HISTORY_LIMIT: usize = 200;
1176        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1177        if event.outcome == AuditOutcome::Failed {
1178            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1179        }
1180
1181        let details = log_format::audit_details_log(&event.kind, &event.details);
1182        tracing::info!(
1183            target: "relay_core_audit",
1184            event_id = %event.id,
1185            actor = %event.actor.as_str(),
1186            kind = %event.kind.as_str(),
1187            target = %event.target,
1188            outcome = %event.outcome.as_str(),
1189            details = %details
1190        );
1191
1192        if let Ok(mut history) = self.audit_history.lock() {
1193            if history.len() >= AUDIT_HISTORY_LIMIT {
1194                history.pop_front();
1195            }
1196            history.push_back(event.clone());
1197        }
1198
1199        if let Some(store) = self.store.clone() {
1200            let event_json = serde_json::to_value(&event).unwrap_or_default();
1201            let event_id = event.id.clone();
1202            let timestamp_ms = event.timestamp_ms;
1203            let actor = event.actor.as_str().to_string();
1204            let kind = event.kind.as_str().to_string();
1205            let target = event.target.clone();
1206            let outcome = event.outcome.as_str().to_string();
1207            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1208                handle.spawn(async move {
1209                    if let Err(e) = store
1210                        .save_audit_event(AuditEventRecord {
1211                            id: &event_id,
1212                            timestamp_ms,
1213                            actor: &actor,
1214                            kind: &kind,
1215                            target: &target,
1216                            outcome: &outcome,
1217                            content: &event_json,
1218                        })
1219                        .await
1220                    {
1221                        tracing::error!("Failed to persist audit event: {}", e);
1222                    }
1223                });
1224            }
1225        }
1226
1227        let _ = self.audit_broadcast_tx.send(event);
1228    }
1229
1230    fn transition_to_running(&self, port: u16) {
1231        self.update_lifecycle(RuntimeLifecycle {
1232            phase: RuntimeLifecyclePhase::Running,
1233            port: Some(port),
1234            started_at_ms: Some(now_unix_ms()),
1235            last_error: None,
1236        });
1237    }
1238
1239    fn transition_to_stopped(&self) {
1240        if let Ok(mut guard) = self.shutdown_tx.lock() {
1241            *guard = None;
1242        }
1243
1244        self.update_lifecycle(RuntimeLifecycle {
1245            phase: RuntimeLifecyclePhase::Stopped,
1246            port: None,
1247            started_at_ms: None,
1248            last_error: None,
1249        });
1250    }
1251
1252    fn transition_to_failed(&self, port: u16, error: String) {
1253        if let Ok(mut guard) = self.shutdown_tx.lock() {
1254            *guard = None;
1255        }
1256
1257        self.update_lifecycle(RuntimeLifecycle {
1258            phase: RuntimeLifecyclePhase::Failed,
1259            port: Some(port),
1260            started_at_ms: None,
1261            last_error: Some(error),
1262        });
1263    }
1264
1265    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1266        let err = lifecycle
1267            .last_error
1268            .as_deref()
1269            .filter(|s| !s.is_empty())
1270            .unwrap_or("-");
1271        tracing::info!(
1272            target: "relay_core_lifecycle",
1273            phase = %lifecycle.phase.as_str(),
1274            port = %log_format::opt_u16(lifecycle.port),
1275            started_at_ms = %log_format::opt_u64(lifecycle.started_at_ms),
1276            last_error = %err,
1277        );
1278        self.lifecycle_tx.send_replace(lifecycle);
1279    }
1280
1281    pub fn spawn_proxy(
1282        self: &Arc<Self>,
1283        config: ProxyConfig,
1284        sink: mpsc::Sender<FlowUpdate>,
1285        extra_interceptor: Option<Arc<dyn Interceptor>>,
1286    ) -> Result<ProxySpawnResult, String> {
1287        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1288        match self.prepare_start(config.port, shutdown_tx) {
1289            Ok(()) => {}
1290            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1291            Err(error) => return Err(error),
1292        }
1293        let state = self.clone();
1294        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1295            if let Err(error) = state
1296                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1297                .await
1298            {
1299                error!("Proxy failed: {}", error);
1300            }
1301        })))
1302    }
1303
1304    pub async fn start_proxy(
1305        self: &Arc<Self>,
1306        config: ProxyConfig,
1307        sink: mpsc::Sender<FlowUpdate>,
1308        extra_interceptor: Option<Arc<dyn Interceptor>>,
1309    ) -> Result<(), String> {
1310        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1311        self.prepare_start(config.port, shutdown_tx)?;
1312        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1313            .await
1314    }
1315
1316    async fn run_proxy(
1317        self: &Arc<Self>,
1318        config: ProxyConfig,
1319        sink: mpsc::Sender<FlowUpdate>,
1320        extra_interceptor: Option<Arc<dyn Interceptor>>,
1321        shutdown_rx: oneshot::Receiver<()>,
1322    ) -> Result<(), String> {
1323        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1324        let state = self.clone();
1325
1326        if let Some(parent) = config.ca_cert_path.parent()
1327            && !parent.exists()
1328        {
1329            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1330        }
1331
1332        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1333            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1334        let ca = Arc::new(ca);
1335
1336        #[cfg(feature = "script")]
1337        let script_interceptor = self.script_interceptor.clone();
1338
1339        #[cfg(feature = "script")]
1340        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1341
1342        #[cfg(not(feature = "script"))]
1343        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1344
1345        interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1346            self.clone(),
1347        )));
1348
1349        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1350            self.clone(),
1351        )));
1352
1353        if let Some(interceptor) = extra_interceptor {
1354            interceptors.push(interceptor);
1355        }
1356
1357        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1358        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1359
1360        tokio::spawn(async move {
1361            while let Some(update) = proxy_rx.recv().await {
1362                match update.clone() {
1363                    FlowUpdate::Full(flow) => {
1364                        state.upsert_flow(flow);
1365                    }
1366                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1367                        state.append_ws_message(flow_id, message);
1368                    }
1369                    FlowUpdate::HttpBody {
1370                        flow_id,
1371                        direction,
1372                        body,
1373                    } => {
1374                        state.update_http_body(flow_id, body, direction);
1375                    }
1376                    FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1377                        // P1: Tag the flow as budget-exceeded and update resilience trace
1378                        state.tag_flow_budget_exceeded(flow_id, direction);
1379                    }
1380                }
1381
1382                let _ = state.flow_broadcast_tx.send(update.clone());
1383
1384                if sink.try_send(update).is_err() {
1385                    relay_core_lib::metrics::inc_flows_dropped();
1386                }
1387            }
1388        });
1389
1390        if let Some(udp_port) = config.udp_tproxy_port {
1391            let udp_proxy_tx = proxy_tx.clone();
1392            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1393            tokio::spawn(async move {
1394                match tokio::net::UdpSocket::bind(udp_addr).await {
1395                    Ok(socket) => {
1396                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1397                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1398                            error!("UDP TPROXY failed: {}", e);
1399                        }
1400                    }
1401                    Err(e) => {
1402                        error!("Failed to bind UDP TPROXY socket: {}", e);
1403                    }
1404                }
1405            });
1406        }
1407
1408        let listener = match TcpListener::bind(addr).await {
1409            Ok(listener) => listener,
1410            Err(e) => {
1411                if let Ok(mut guard) = self.shutdown_tx.lock() {
1412                    *guard = None;
1413                }
1414                let message = format!("Failed to bind to address {}: {}", addr, e);
1415                self.transition_to_failed(config.port, message.clone());
1416                return Err(message);
1417            }
1418        };
1419
1420        self.transition_to_running(config.port);
1421        let shutdown_rx = Some(shutdown_rx);
1422
1423        if config.transparent {
1424            let policy = ProxyPolicy {
1425                transparent_enabled: true,
1426                ..Default::default()
1427            };
1428            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1429            let policy_rx = self.policy_tx.subscribe();
1430
1431            let provider: Arc<dyn OriginalDstProvider> = {
1432                let mut addrs = BTreeSet::new();
1433                if let Ok(local) = listener.local_addr() {
1434                    addrs.insert(local);
1435                }
1436
1437                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1438                {
1439                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1440                }
1441                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1442                {
1443                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1444                        Ok(provider) => Arc::new(provider),
1445                        Err(e) => {
1446                            error!("Failed to initialize macOS PF provider: {}", e);
1447                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1448                        }
1449                    }
1450                }
1451                #[cfg(target_os = "windows")]
1452                {
1453                    let filter =
1454                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1455                            .to_string();
1456                    let port = config.port;
1457                    tokio::spawn(async move {
1458                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1459                            .await;
1460                    });
1461
1462                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1463                }
1464                #[cfg(not(any(
1465                    all(target_os = "linux", feature = "transparent-linux"),
1466                    all(target_os = "macos", feature = "transparent-macos"),
1467                    target_os = "windows"
1468                )))]
1469                {
1470                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1471                }
1472            };
1473
1474            let source = TransparentTcpCaptureSource::new(listener, provider);
1475            let result = relay_core_lib::start_proxy(
1476                source,
1477                proxy_tx,
1478                interceptor,
1479                ca,
1480                policy_rx,
1481                None,
1482                shutdown_rx,
1483                Some(self.get_rule_engine().await),
1484            )
1485            .await
1486            .map_err(|e| e.to_string());
1487            if let Err(error) = &result {
1488                self.transition_to_failed(config.port, error.clone());
1489            } else {
1490                self.transition_to_stopped();
1491            }
1492            result
1493        } else {
1494            let source = TcpCaptureSource::new(listener);
1495            let policy = ProxyPolicy::default();
1496            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1497            let policy_rx = self.policy_tx.subscribe();
1498
1499            let result = relay_core_lib::start_proxy(
1500                source,
1501                proxy_tx,
1502                interceptor,
1503                ca,
1504                policy_rx,
1505                None,
1506                shutdown_rx,
1507                Some(self.get_rule_engine().await),
1508            )
1509            .await
1510            .map_err(|e| e.to_string());
1511            if let Err(error) = &result {
1512                self.transition_to_failed(config.port, error.clone());
1513            } else {
1514                self.transition_to_stopped();
1515            }
1516            result
1517        }
1518    }
1519}
1520
1521fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1522    match update {
1523        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1524        FlowUpdate::WebSocketMessage {
1525            flow_id,
1526            mut message,
1527        } => {
1528            message.content = redact_body(message.content, redaction);
1529            FlowUpdate::WebSocketMessage { flow_id, message }
1530        }
1531        FlowUpdate::HttpBody {
1532            flow_id,
1533            direction,
1534            body,
1535        } => FlowUpdate::HttpBody {
1536            flow_id,
1537            direction,
1538            body: redact_body(body, redaction),
1539        },
1540        FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1541            FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1542        }
1543    }
1544}
1545
1546fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1547    if !redaction.enabled {
1548        return flow;
1549    }
1550    match &mut flow.layer {
1551        Layer::Http(http) => {
1552            redact_http_request(&mut http.request, redaction);
1553            if let Some(response) = &mut http.response {
1554                redact_headers(&mut response.headers, redaction);
1555                response.body = response
1556                    .body
1557                    .take()
1558                    .map(|body| redact_body(body, redaction));
1559            }
1560        }
1561        Layer::WebSocket(ws) => {
1562            redact_http_request(&mut ws.handshake_request, redaction);
1563            redact_headers(&mut ws.handshake_response.headers, redaction);
1564            ws.handshake_response.body = ws
1565                .handshake_response
1566                .body
1567                .take()
1568                .map(|body| redact_body(body, redaction));
1569            for message in &mut ws.messages {
1570                message.content = redact_body(message.content.clone(), redaction);
1571            }
1572        }
1573        _ => {}
1574    }
1575    flow
1576}
1577
1578fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1579    if !redaction.enabled {
1580        return summary;
1581    }
1582    summary.url = redact_url_string(&summary.url, redaction);
1583    summary
1584}
1585
1586fn redact_http_request(
1587    request: &mut relay_core_api::flow::HttpRequest,
1588    redaction: &RedactionPolicy,
1589) {
1590    redact_headers(&mut request.headers, redaction);
1591    redact_query_pairs(&mut request.query, redaction);
1592    request.url = redact_url(&request.url, redaction);
1593    request.body = request.body.take().map(|body| redact_body(body, redaction));
1594}
1595
1596fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1597    if !redaction.enabled {
1598        return;
1599    }
1600    let sensitive = redaction_set(&redaction.sensitive_header_names);
1601    for (name, value) in headers.iter_mut() {
1602        if sensitive.contains(&name.to_ascii_lowercase()) {
1603            *value = "[REDACTED]".to_string();
1604        }
1605    }
1606}
1607
1608fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1609    if !redaction.enabled {
1610        return;
1611    }
1612    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1613    for (name, value) in query.iter_mut() {
1614        if sensitive.contains(&name.to_ascii_lowercase()) {
1615            *value = "[REDACTED]".to_string();
1616        }
1617    }
1618}
1619
1620fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1621    if !redaction.enabled {
1622        return url.clone();
1623    }
1624    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1625    let pairs: Vec<(String, String)> = url
1626        .query_pairs()
1627        .map(|(k, v)| {
1628            let key = k.to_string();
1629            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1630                "[REDACTED]".to_string()
1631            } else {
1632                v.to_string()
1633            };
1634            (key, value)
1635        })
1636        .collect();
1637    let mut next = url.clone();
1638    if pairs.is_empty() {
1639        return next;
1640    }
1641    next.query_pairs_mut().clear();
1642    for (k, v) in pairs {
1643        next.query_pairs_mut().append_pair(&k, &v);
1644    }
1645    next
1646}
1647
1648fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1649    match url::Url::parse(input) {
1650        Ok(url) => redact_url(&url, redaction).to_string(),
1651        Err(_) => input.to_string(),
1652    }
1653}
1654
1655fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1656    if redaction.enabled && redaction.redact_bodies {
1657        body.content = "[REDACTED]".to_string();
1658    }
1659    body
1660}
1661
/// Builds a lookup set of the given names, lowercased (ASCII) so that
/// membership checks can be case-insensitive.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::new();
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1668
/// Settings for one proxy run.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Location of the CA certificate file (loaded, or created if missing).
    pub ca_cert_path: std::path::PathBuf,
    /// Location of the CA private key file (loaded, or created if missing).
    pub ca_key_path: std::path::PathBuf,
    /// Whether the proxy runs in transparent capture mode.
    pub transparent: bool,
    /// UDP port for the TPROXY listener; `None` disables it.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a standard (non-transparent) configuration with no UDP
    /// TPROXY listener.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent: false,
            udp_tproxy_port: None,
        }
    }

    /// Creates a configuration whose CA files live in `app_data_dir`
    /// (`ca_cert.pem` and `ca_key.pem`), creating the directory if needed.
    ///
    /// # Errors
    /// Returns the I/O error text when the directory cannot be created,
    /// including when `app_data_dir` exists but is not a directory.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let app_data_dir = app_data_dir.into();
        // `create_dir_all` succeeds on an existing directory, so no separate
        // `exists()` check is needed. Such a check is racy and would also
        // accept a regular file as the data directory.
        std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;

        Ok(Self::new(
            port,
            app_data_dir.join("ca_cert.pem"),
            app_data_dir.join("ca_key.pem"),
        ))
    }

    /// Sets whether the proxy runs in transparent capture mode.
    pub fn with_transparent(mut self, transparent: bool) -> Self {
        self.transparent = transparent;
        self
    }

    /// Sets the UDP TPROXY port; `None` disables the UDP listener.
    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
        self.udp_tproxy_port = udp_tproxy_port;
        self
    }
}
1719
/// Returns the current system time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// is treated as zero and `0` is returned.
fn now_unix_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}
1726
1727fn modification_field_names(
1728    mods: Option<&relay_core_api::modification::FlowModification>,
1729) -> Vec<&'static str> {
1730    let Some(mods) = mods else {
1731        return Vec::new();
1732    };
1733
1734    let mut fields = Vec::new();
1735    if mods.method.is_some() {
1736        fields.push("method");
1737    }
1738    if mods.url.is_some() {
1739        fields.push("url");
1740    }
1741    if mods.request_headers.is_some() {
1742        fields.push("request_headers");
1743    }
1744    if mods.request_body.is_some() {
1745        fields.push("request_body");
1746    }
1747    if mods.status_code.is_some() {
1748        fields.push("status_code");
1749    }
1750    if mods.response_headers.is_some() {
1751        fields.push("response_headers");
1752    }
1753    if mods.response_body.is_some() {
1754        fields.push("response_body");
1755    }
1756    if mods.message_content.is_some() {
1757        fields.push("message_content");
1758    }
1759    fields
1760}
1761
1762#[cfg(test)]
1763mod tests {
1764    use super::*;
1765    use chrono::Utc;
1766    use relay_core_api::flow::{
1767        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1768        ResponseTiming, TransportProtocol,
1769    };
1770    use std::sync::atomic::{AtomicU64, Ordering};
1771    use tokio::time::{Duration, sleep};
1772    use url::Url;
1773    use uuid::Uuid;
1774
    /// Counter incremented by `sqlite_url` on every call; its value is embedded
    /// in the generated database file name.
    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1776
1777    fn sqlite_url() -> String {
1778        let nanos = SystemTime::now()
1779            .duration_since(UNIX_EPOCH)
1780            .expect("clock drift")
1781            .as_nanos();
1782        let pid = std::process::id();
1783        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1784        let db_dir = std::env::current_dir()
1785            .expect("cwd")
1786            .join("target")
1787            .join("test-dbs");
1788        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1789        let db_path = db_dir.join(format!(
1790            "relay-core-runtime-test-{}-{}-{}.db",
1791            pid, nanos, seq
1792        ));
1793        format!("sqlite://{}?mode=rwc", db_path.display())
1794    }
1795
1796    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1797        let start_time =
1798            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1799        let request_url =
1800            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1801        Flow {
1802            id: Uuid::new_v4(),
1803            start_time,
1804            end_time: Some(start_time),
1805            network: NetworkInfo {
1806                client_ip: "127.0.0.1".to_string(),
1807                client_port: 12000,
1808                server_ip: "127.0.0.1".to_string(),
1809                server_port: 8080,
1810                protocol: TransportProtocol::TCP,
1811                tls: false,
1812                tls_version: None,
1813                sni: None,
1814            },
1815            layer: Layer::Http(HttpLayer {
1816                request: HttpRequest {
1817                    method: method.to_string(),
1818                    url: request_url,
1819                    version: "HTTP/1.1".to_string(),
1820                    headers: vec![],
1821                    cookies: vec![],
1822                    query: vec![],
1823                    body: None,
1824                },
1825                response: Some(HttpResponse {
1826                    status,
1827                    status_text: "OK".to_string(),
1828                    version: "HTTP/1.1".to_string(),
1829                    headers: vec![],
1830                    cookies: vec![],
1831                    body: None,
1832                    timing: ResponseTiming {
1833                        time_to_first_byte: None,
1834                        time_to_last_byte: None,
1835                        connect_time_ms: None,
1836                        ssl_time_ms: None,
1837                    },
1838                }),
1839                error: None,
1840            }),
1841            tags: vec![],
1842            meta: std::collections::HashMap::new(),
1843            resilience_trace: None,
1844            rule_variables: std::collections::HashMap::new(),
1845            matched_rules: vec![],
1846        }
1847    }
1848
1849    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1850        let start_time =
1851            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1852        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1853            .expect("url should parse");
1854        Flow {
1855            id: Uuid::new_v4(),
1856            start_time,
1857            end_time: Some(start_time),
1858            network: NetworkInfo {
1859                client_ip: "127.0.0.1".to_string(),
1860                client_port: 12000,
1861                server_ip: "127.0.0.1".to_string(),
1862                server_port: 8080,
1863                protocol: TransportProtocol::TCP,
1864                tls: false,
1865                tls_version: None,
1866                sni: None,
1867            },
1868            layer: Layer::Http(HttpLayer {
1869                request: HttpRequest {
1870                    method: "GET".to_string(),
1871                    url: request_url,
1872                    version: "HTTP/1.1".to_string(),
1873                    headers: vec![
1874                        (
1875                            "Authorization".to_string(),
1876                            "Bearer secret-token".to_string(),
1877                        ),
1878                        ("X-Normal".to_string(), "visible".to_string()),
1879                    ],
1880                    cookies: vec![],
1881                    query: vec![
1882                        ("token".to_string(), "abc123".to_string()),
1883                        ("ok".to_string(), "1".to_string()),
1884                    ],
1885                    body: Some(BodyData {
1886                        encoding: "utf-8".to_string(),
1887                        content: "secret request body".to_string(),
1888                        size: 19,
1889                    }),
1890                },
1891                response: Some(HttpResponse {
1892                    status: 200,
1893                    status_text: "OK".to_string(),
1894                    version: "HTTP/1.1".to_string(),
1895                    headers: vec![
1896                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1897                        ("X-Response".to_string(), "visible".to_string()),
1898                    ],
1899                    cookies: vec![],
1900                    body: Some(BodyData {
1901                        encoding: "utf-8".to_string(),
1902                        content: "secret response body".to_string(),
1903                        size: 20,
1904                    }),
1905                    timing: ResponseTiming {
1906                        time_to_first_byte: None,
1907                        time_to_last_byte: None,
1908                        connect_time_ms: None,
1909                        ssl_time_ms: None,
1910                    },
1911                }),
1912                error: None,
1913            }),
1914            tags: vec![],
1915            meta: std::collections::HashMap::new(),
1916            resilience_trace: None,
1917            rule_variables: std::collections::HashMap::new(),
1918            matched_rules: vec![],
1919        }
1920    }
1921
1922    #[tokio::test]
1923    async fn set_rules_from_records_audit_event() {
1924        let state = CoreState::new(None).await;
1925
1926        state
1927            .set_rules_from(
1928                AuditActor::Http,
1929                "rule.upsert",
1930                "rule-1".to_string(),
1931                json!({ "route": "/api/v1/rules" }),
1932                Vec::new(),
1933            )
1934            .await
1935            .expect("set rules should succeed");
1936
1937        let events = state.recent_audit_events();
1938        let event = events.last().expect("audit event should exist");
1939        assert_eq!(event.actor, AuditActor::Http);
1940        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1941        assert_eq!(event.outcome, AuditOutcome::Success);
1942        assert_eq!(event.target, "rule-1");
1943        assert_eq!(event.details["operation"], "rule.upsert");
1944        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1945    }
1946
1947    #[tokio::test]
1948    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1949        let state = CoreState::new(None).await;
1950
1951        state
1952            .upsert_rule_from(
1953                AuditActor::Probe,
1954                "rule.upsert",
1955                "rule-1".to_string(),
1956                json!({ "tool": "set_rule" }),
1957                Rule {
1958                    id: "rule-1".to_string(),
1959                    name: "first".to_string(),
1960                    active: true,
1961                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1962                    priority: 1,
1963                    termination: relay_core_lib::rule::RuleTermination::Continue,
1964                    filter: relay_core_lib::rule::Filter::Url(
1965                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1966                    ),
1967                    actions: vec![],
1968                    constraints: None,
1969                },
1970            )
1971            .await
1972            .expect("initial upsert should succeed");
1973
1974        state
1975            .upsert_rule_from(
1976                AuditActor::Probe,
1977                "rule.upsert",
1978                "rule-1".to_string(),
1979                json!({ "tool": "set_rule" }),
1980                Rule {
1981                    id: "rule-1".to_string(),
1982                    name: "second".to_string(),
1983                    active: true,
1984                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1985                    priority: 2,
1986                    termination: relay_core_lib::rule::RuleTermination::Continue,
1987                    filter: relay_core_lib::rule::Filter::Url(
1988                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1989                    ),
1990                    actions: vec![],
1991                    constraints: None,
1992                },
1993            )
1994            .await
1995            .expect("replacement upsert should succeed");
1996
1997        let rules = state.get_rules().await;
1998        assert_eq!(rules.len(), 1);
1999        assert_eq!(rules[0].name, "second");
2000        let event = state
2001            .recent_audit_events()
2002            .last()
2003            .cloned()
2004            .expect("audit event");
2005        assert_eq!(event.details["operation"], "rule.upsert");
2006        assert_eq!(event.details["details"]["tool"], "set_rule");
2007    }
2008
2009    #[tokio::test]
2010    async fn delete_rule_from_returns_false_when_rule_missing() {
2011        let state = CoreState::new(None).await;
2012
2013        let deleted = state
2014            .delete_rule_from(
2015                AuditActor::Http,
2016                "rule.delete",
2017                "missing".to_string(),
2018                json!({ "route": "/api/v1/rules/{id}" }),
2019                "missing",
2020            )
2021            .await
2022            .expect("delete should not fail");
2023
2024        assert!(!deleted);
2025        assert!(state.recent_audit_events().is_empty());
2026    }
2027
2028    #[tokio::test]
2029    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2030        let state = CoreState::new(None).await;
2031
2032        let rule_id = state
2033            .create_mock_response_rule_from(
2034                AuditActor::Http,
2035                "api-mock-1".to_string(),
2036                json!({ "route": "/api/v1/mock", "status": 201 }),
2037                MockResponseRuleConfig {
2038                    rule_id: "api-mock-1".to_string(),
2039                    url_pattern: "example.com".to_string(),
2040                    name: "api-mock:example.com".to_string(),
2041                    status: 201,
2042                    content_type: "application/json".to_string(),
2043                    body: "{\"ok\":true}".to_string(),
2044                },
2045            )
2046            .await
2047            .expect("mock rule should be created");
2048
2049        assert_eq!(rule_id, "api-mock-1");
2050        let rules = state.get_rules().await;
2051        assert_eq!(rules.len(), 1);
2052        assert_eq!(rules[0].id, "api-mock-1");
2053        let event = state
2054            .recent_audit_events()
2055            .last()
2056            .cloned()
2057            .expect("audit event");
2058        assert_eq!(event.details["operation"], "rule.mock_create");
2059        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2060    }
2061
2062    #[tokio::test]
2063    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2064        let state = CoreState::new(None).await;
2065
2066        let rule_id = state
2067            .create_intercept_rule_from(
2068                AuditActor::Http,
2069                "intercept-1".to_string(),
2070                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2071                InterceptRuleConfig {
2072                    rule_id: "intercept-1".to_string(),
2073                    active: true,
2074                    url_pattern: "example.com".to_string(),
2075                    method: None,
2076                    phase: "request".to_string(),
2077                    name: "api-intercept:example.com".to_string(),
2078                    priority: 100,
2079                    termination: relay_core_lib::rule::RuleTermination::Stop,
2080                },
2081            )
2082            .await
2083            .expect("intercept rule should be created");
2084
2085        assert_eq!(rule_id, "intercept-1");
2086        let rules = state.get_rules().await;
2087        assert_eq!(rules.len(), 1);
2088        assert_eq!(rules[0].id, "intercept-1");
2089        assert_eq!(rules[0].name, "api-intercept:example.com");
2090        let event = state
2091            .recent_audit_events()
2092            .last()
2093            .cloned()
2094            .expect("audit event");
2095        assert_eq!(event.details["operation"], "rule.intercept_create");
2096        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2097    }
2098
2099    #[tokio::test]
2100    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2101        let state = CoreState::new(None).await;
2102
2103        state
2104            .upsert_legacy_intercept_rule_from(
2105                AuditActor::Tauri,
2106                "legacy-1".to_string(),
2107                json!({ "command": "set_intercept_rule" }),
2108                InterceptRule {
2109                    id: "legacy-1".to_string(),
2110                    active: true,
2111                    url_pattern: "example.com".to_string(),
2112                    method: None,
2113                    phase: "both".to_string(),
2114                },
2115            )
2116            .await
2117            .expect("initial family upsert should succeed");
2118        assert_eq!(state.get_rules().await.len(), 2);
2119
2120        state
2121            .upsert_legacy_intercept_rule_from(
2122                AuditActor::Tauri,
2123                "legacy-1".to_string(),
2124                json!({ "command": "set_intercept_rule" }),
2125                InterceptRule {
2126                    id: "legacy-1".to_string(),
2127                    active: true,
2128                    url_pattern: "example.org".to_string(),
2129                    method: Some("POST".to_string()),
2130                    phase: "request".to_string(),
2131                },
2132            )
2133            .await
2134            .expect("replacement family upsert should succeed");
2135
2136        let rules = state.get_rules().await;
2137        assert_eq!(rules.len(), 1);
2138        assert_eq!(rules[0].id, "legacy-1");
2139        let event = state
2140            .recent_audit_events()
2141            .last()
2142            .cloned()
2143            .expect("audit event");
2144        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2145        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2146    }
2147
2148    #[tokio::test]
2149    async fn resolve_intercept_failure_records_failed_audit_event() {
2150        let state = CoreState::new(None).await;
2151
2152        let result = state
2153            .resolve_intercept_with_modifications_from(
2154                AuditActor::Probe,
2155                "missing-flow:request".to_string(),
2156                "drop",
2157                None,
2158            )
2159            .await;
2160
2161        assert!(result.is_err());
2162        let events = state.recent_audit_events();
2163        let event = events.last().expect("audit event should exist");
2164        assert_eq!(event.actor, AuditActor::Probe);
2165        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2166        assert_eq!(event.outcome, AuditOutcome::Failed);
2167        assert_eq!(event.details["action"], "drop");
2168        assert!(
2169            event.details["error"]
2170                .as_str()
2171                .unwrap_or_default()
2172                .contains("Interception not found")
2173        );
2174    }
2175
2176    #[tokio::test]
2177    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2178        let state = CoreState::new(None).await;
2179        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2180
2181        state
2182            .prepare_start(8080, shutdown_tx)
2183            .expect("prepare start should succeed");
2184        let lifecycle = state.lifecycle();
2185        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2186        assert_eq!(lifecycle.port, Some(8080));
2187        assert!(lifecycle.started_at_ms.is_none());
2188        assert!(lifecycle.last_error.is_none());
2189
2190        assert_eq!(
2191            state.stop_proxy().expect("stop should succeed"),
2192            ProxyStopResult::Stopping
2193        );
2194        let lifecycle = state.lifecycle();
2195        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2196        assert_eq!(lifecycle.port, Some(8080));
2197        assert!(shutdown_rx.await.is_ok());
2198    }
2199
2200    #[tokio::test]
2201    async fn status_snapshot_derives_runtime_facing_fields() {
2202        let state = CoreState::new(None).await;
2203        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2204        state
2205            .prepare_start(8080, shutdown_tx)
2206            .expect("prepare start should succeed");
2207
2208        let status = state.status_snapshot();
2209        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2210        assert!(status.running);
2211        assert_eq!(status.port, Some(8080));
2212        assert!(status.uptime.is_none());
2213        assert!(status.last_error.is_none());
2214    }
2215
2216    #[tokio::test]
2217    async fn status_report_combines_status_and_metrics() {
2218        let state = CoreState::new(None).await;
2219        let report = state.status_report().await;
2220
2221        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2222        assert!(!report.status.running);
2223        assert_eq!(report.metrics.intercepts_pending, 0);
2224        assert_eq!(report.metrics.ws_pending_messages, 0);
2225        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2226        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2227        assert_eq!(report.metrics.audit_events_total, 0);
2228        assert_eq!(report.metrics.audit_events_failed, 0);
2229        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2230        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2231    }
2232
2233    #[test]
2234    fn proxy_config_new_and_transport_setters_preserve_values() {
2235        let config = ProxyConfig::new(
2236            8080,
2237            std::path::PathBuf::from("/tmp/ca_cert.pem"),
2238            std::path::PathBuf::from("/tmp/ca_key.pem"),
2239        )
2240        .with_transparent(true)
2241        .with_udp_tproxy_port(Some(15000));
2242
2243        assert_eq!(config.port, 8080);
2244        assert_eq!(
2245            config.ca_cert_path,
2246            std::path::PathBuf::from("/tmp/ca_cert.pem")
2247        );
2248        assert_eq!(
2249            config.ca_key_path,
2250            std::path::PathBuf::from("/tmp/ca_key.pem")
2251        );
2252        assert!(config.transparent);
2253        assert_eq!(config.udp_tproxy_port, Some(15000));
2254    }
2255
2256    #[test]
2257    fn proxy_config_from_app_data_dir_creates_default_paths() {
2258        let unique = SystemTime::now()
2259            .duration_since(UNIX_EPOCH)
2260            .expect("clock drift")
2261            .as_nanos();
2262        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2263
2264        let config =
2265            ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2266
2267        assert!(dir.exists());
2268        assert_eq!(config.port, 8899);
2269        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2270        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2271        assert!(!config.transparent);
2272        assert!(config.udp_tproxy_port.is_none());
2273    }
2274
2275    #[tokio::test]
2276    async fn intercept_snapshot_maps_pending_counts() {
2277        let state = CoreState::new(None).await;
2278        let snapshot = state.intercept_snapshot().await;
2279
2280        assert_eq!(snapshot.pending_count, 0);
2281        assert_eq!(snapshot.ws_pending_count, 0);
2282    }
2283
2284    #[tokio::test]
2285    async fn audit_snapshot_returns_latest_events_in_order() {
2286        let state = CoreState::new(None).await;
2287        state.record_audit_event(AuditEvent::new(
2288            AuditActor::Runtime,
2289            AuditEventKind::RuleChanged,
2290            "first",
2291            AuditOutcome::Success,
2292            json!({ "index": 1 }),
2293        ));
2294        state.record_audit_event(AuditEvent::new(
2295            AuditActor::Http,
2296            AuditEventKind::PolicyUpdated,
2297            "second",
2298            AuditOutcome::Success,
2299            json!({ "index": 2 }),
2300        ));
2301
2302        let snapshot = state.audit_snapshot(1);
2303
2304        assert_eq!(snapshot.events.len(), 1);
2305        assert_eq!(snapshot.events[0].target, "second");
2306        assert_eq!(snapshot.events[0].details["index"], 2);
2307    }
2308
2309    #[tokio::test]
2310    async fn query_audit_snapshot_filters_in_memory_events() {
2311        let state = CoreState::new(None).await;
2312        state.record_audit_event(AuditEvent::new(
2313            AuditActor::Http,
2314            AuditEventKind::RuleChanged,
2315            "rule-1",
2316            AuditOutcome::Success,
2317            json!({ "idx": 1 }),
2318        ));
2319        state.record_audit_event(AuditEvent::new(
2320            AuditActor::Probe,
2321            AuditEventKind::PolicyUpdated,
2322            "policy",
2323            AuditOutcome::Failed,
2324            json!({ "idx": 2 }),
2325        ));
2326
2327        let snapshot = state
2328            .query_audit_snapshot(CoreAuditQuery {
2329                actor: Some(AuditActor::Probe),
2330                kind: Some(AuditEventKind::PolicyUpdated),
2331                outcome: Some(AuditOutcome::Failed),
2332                limit: 10,
2333                ..Default::default()
2334            })
2335            .await;
2336
2337        assert_eq!(snapshot.events.len(), 1);
2338        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2339        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2340        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2341    }
2342
    /// With a SQLite URL configured, a policy update made by the HTTP actor
    /// should become visible through `query_audit_snapshot`.
    ///
    /// The query is retried up to 10 times with a 20 ms pause between
    /// attempts, stopping at the first non-empty result.
    // NOTE(review): the retry loop suggests the event is written to storage
    // asynchronously — confirm against the storage implementation.
    #[tokio::test]
    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
        let state = CoreState::new(Some(sqlite_url())).await;
        state.update_policy_from(
            AuditActor::Http,
            "policy".to_string(),
            ProxyPolicy {
                transparent_enabled: true,
                ..Default::default()
            },
        );

        // Start from an empty snapshot so the final assertions fail cleanly
        // if no attempt ever returns events.
        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
        for _ in 0..10 {
            snapshot = state
                .query_audit_snapshot(CoreAuditQuery {
                    actor: Some(AuditActor::Http),
                    kind: Some(AuditEventKind::PolicyUpdated),
                    limit: 10,
                    ..Default::default()
                })
                .await;
            if !snapshot.events.is_empty() {
                break;
            }
            sleep(Duration::from_millis(20)).await;
        }

        assert!(!snapshot.events.is_empty());
        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
    }
2375
2376    #[tokio::test]
2377    async fn prepare_start_rejects_second_active_start() {
2378        let state = CoreState::new(None).await;
2379        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2380        state
2381            .prepare_start(8080, shutdown_tx)
2382            .expect("first start should succeed");
2383
2384        let (second_tx, _second_rx) = oneshot::channel();
2385        let error = state
2386            .prepare_start(8081, second_tx)
2387            .expect_err("second active start should be rejected");
2388        assert!(error.contains("already"));
2389    }
2390
2391    #[test]
2392    fn update_policy_records_audit_event() {
2393        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2394        let state = runtime.block_on(CoreState::new(None));
2395
2396        state.update_policy_from(
2397            AuditActor::Runtime,
2398            "policy".to_string(),
2399            ProxyPolicy {
2400                transparent_enabled: true,
2401                ..Default::default()
2402            },
2403        );
2404
2405        let events = state.recent_audit_events();
2406        let event = events.last().expect("audit event should exist");
2407        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2408        assert_eq!(event.outcome, AuditOutcome::Success);
2409        assert_eq!(event.details["transparent_enabled"], true);
2410    }
2411
2412    #[test]
2413    fn patch_policy_updates_redaction_without_replacing_other_fields() {
2414        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2415        let state = runtime.block_on(CoreState::new(None));
2416        let original_timeout = state.policy_snapshot().request_timeout_ms;
2417
2418        state.patch_policy_from(
2419            AuditActor::Runtime,
2420            "policy.patch".to_string(),
2421            relay_core_api::policy::ProxyPolicyPatch {
2422                redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2423                    enabled: Some(true),
2424                    redact_bodies: Some(true),
2425                    ..Default::default()
2426                }),
2427                upstream: None,
2428            },
2429        );
2430
2431        let policy = state.policy_snapshot();
2432        assert_eq!(policy.request_timeout_ms, original_timeout);
2433        assert!(policy.redaction.enabled);
2434        assert!(policy.redaction.redact_bodies);
2435
2436        let events = state.recent_audit_events();
2437        let event = events.last().expect("audit event should exist");
2438        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2439        assert_eq!(event.details["redaction_enabled"], true);
2440    }
2441
2442    #[tokio::test]
2443    async fn metrics_include_audit_and_lagged_event_counters() {
2444        let state = CoreState::new(None).await;
2445
2446        state.update_policy_from(
2447            AuditActor::Runtime,
2448            "policy".to_string(),
2449            ProxyPolicy::default(),
2450        );
2451        let _ = state
2452            .resolve_intercept_with_modifications_from(
2453                AuditActor::Probe,
2454                "missing-flow:request".to_string(),
2455                "drop",
2456                None,
2457            )
2458            .await;
2459
2460        state.record_flow_events_lagged(3);
2461        state.record_audit_events_lagged(5);
2462
2463        let metrics = state.get_metrics().await;
2464        assert_eq!(metrics.audit_events_total, 2);
2465        assert_eq!(metrics.audit_events_failed, 1);
2466        assert_eq!(metrics.flow_events_lagged_total, 3);
2467        assert_eq!(metrics.audit_events_lagged_total, 5);
2468    }
2469
2470    #[tokio::test]
2471    async fn prometheus_metrics_text_contains_observability_fields() {
2472        let state = CoreState::new(None).await;
2473        state.record_flow_events_lagged(2);
2474        state.record_audit_events_lagged(4);
2475
2476        let text = state.get_metrics_prometheus_text().await;
2477        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2478        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2479        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2480        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2481    }
2482
    /// With a SQLite URL configured, `search_flows` should honour `limit` and
    /// `offset`: a page of size 1 at offset 1 must be the second entry of the
    /// three-entry baseline result.
    ///
    /// The baseline query is retried up to 20 times with a 20 ms pause until
    /// all three upserted flows are returned.
    // NOTE(review): the retry loop suggests upserted flows reach the store
    // asynchronously — confirm against the store implementation.
    #[tokio::test]
    async fn search_flows_uses_store_with_offset_pagination() {
        let state = CoreState::new(Some(sqlite_url())).await;
        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);

        state.upsert_flow(Box::new(flow_a));
        state.upsert_flow(Box::new(flow_b));
        state.upsert_flow(Box::new(flow_c));

        // Wait until the unpaginated (offset 0, limit 3) query sees all flows.
        let mut baseline = Vec::new();
        for _ in 0..20 {
            baseline = state
                .search_flows(FlowQuery {
                    host: Some("api.example.com".to_string()),
                    path_contains: None,
                    method: None,
                    status_min: None,
                    status_max: None,
                    has_error: None,
                    is_websocket: None,
                    limit: Some(3),
                    offset: Some(0),
                })
                .await;
            if baseline.len() == 3 {
                break;
            }
            sleep(Duration::from_millis(20)).await;
        }

        assert_eq!(baseline.len(), 3);
        // Same filter, but only the second row: it must match baseline[1].
        let page = state
            .search_flows(FlowQuery {
                host: Some("api.example.com".to_string()),
                path_contains: None,
                method: None,
                status_min: None,
                status_max: None,
                has_error: None,
                is_websocket: None,
                limit: Some(1),
                offset: Some(1),
            })
            .await;
        assert_eq!(page.len(), 1);
        assert_eq!(page[0].id, baseline[1].id);
    }
2532
2533    #[tokio::test]
2534    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2535        let state = CoreState::new(Some(sqlite_url())).await;
2536        let first_flow = sample_http_flow(
2537            "persist.example.com",
2538            "/first",
2539            "GET",
2540            200,
2541            1_700_000_010_000,
2542        );
2543        let first_id = first_flow.id.to_string();
2544        state.upsert_flow(Box::new(first_flow));
2545        for i in 0..240 {
2546            state.upsert_flow(Box::new(sample_http_flow(
2547                "persist.example.com",
2548                &format!("/{}", i),
2549                "GET",
2550                200,
2551                1_700_000_020_000 + i,
2552            )));
2553        }
2554
2555        sleep(Duration::from_millis(200)).await;
2556
2557        let loaded = state.get_flow(first_id).await;
2558        assert!(loaded.is_some());
2559    }
2560
2561    #[tokio::test]
2562    async fn search_flows_redacts_summary_url_when_enabled() {
2563        let state = CoreState::new(None).await;
2564        state.update_policy_from(
2565            AuditActor::Runtime,
2566            "policy.redaction".to_string(),
2567            ProxyPolicy {
2568                redaction: RedactionPolicy {
2569                    enabled: true,
2570                    sensitive_query_keys: vec!["token".to_string()],
2571                    redact_bodies: false,
2572                    ..Default::default()
2573                },
2574                ..Default::default()
2575            },
2576        );
2577        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2578
2579        let mut items = Vec::new();
2580        for _ in 0..20 {
2581            items = state
2582                .search_flows(FlowQuery {
2583                    host: Some("api.example.com".to_string()),
2584                    path_contains: Some("/private".to_string()),
2585                    ..Default::default()
2586                })
2587                .await;
2588            if !items.is_empty() {
2589                break;
2590            }
2591            sleep(Duration::from_millis(20)).await;
2592        }
2593
2594        assert!(!items.is_empty());
2595        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2596        let token = redacted
2597            .query_pairs()
2598            .find(|(k, _)| k == "token")
2599            .map(|(_, v)| v.to_string());
2600        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2601    }
2602
2603    #[tokio::test]
2604    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2605        let state = CoreState::new(Some(sqlite_url())).await;
2606        state.update_policy_from(
2607            AuditActor::Runtime,
2608            "policy.redaction".to_string(),
2609            ProxyPolicy {
2610                redaction: RedactionPolicy {
2611                    enabled: true,
2612                    sensitive_header_names: vec![
2613                        "authorization".to_string(),
2614                        "set-cookie".to_string(),
2615                    ],
2616                    sensitive_query_keys: vec!["token".to_string()],
2617                    redact_bodies: true,
2618                },
2619                ..Default::default()
2620            },
2621        );
2622
2623        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2624        let flow_id = flow.id.to_string();
2625        state.upsert_flow(Box::new(flow));
2626        sleep(Duration::from_millis(80)).await;
2627
2628        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2629        let Layer::Http(http) = loaded.layer else {
2630            panic!("expected http layer");
2631        };
2632
2633        let auth = http
2634            .request
2635            .headers
2636            .iter()
2637            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2638            .map(|(_, v)| v.as_str());
2639        assert_eq!(auth, Some("[REDACTED]"));
2640
2641        let req_query_token = http
2642            .request
2643            .query
2644            .iter()
2645            .find(|(k, _)| k == "token")
2646            .map(|(_, v)| v.as_str());
2647        assert_eq!(req_query_token, Some("[REDACTED]"));
2648
2649        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2650        assert_eq!(req_body, Some("[REDACTED]"));
2651
2652        let response = http.response.expect("response should exist");
2653        let set_cookie = response
2654            .headers
2655            .iter()
2656            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2657            .map(|(_, v)| v.as_str());
2658        assert_eq!(set_cookie, Some("[REDACTED]"));
2659        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2660        assert_eq!(res_body, Some("[REDACTED]"));
2661    }
2662
2663    #[test]
2664    fn redact_flow_update_masks_http_body_when_enabled() {
2665        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2666        let state = runtime.block_on(CoreState::new(None));
2667        state.update_policy_from(
2668            AuditActor::Runtime,
2669            "policy.redaction".to_string(),
2670            ProxyPolicy {
2671                redaction: RedactionPolicy {
2672                    enabled: true,
2673                    redact_bodies: true,
2674                    ..Default::default()
2675                },
2676                ..Default::default()
2677            },
2678        );
2679
2680        let update = FlowUpdate::HttpBody {
2681            flow_id: "f-1".to_string(),
2682            direction: relay_core_api::flow::Direction::ClientToServer,
2683            body: BodyData {
2684                encoding: "utf-8".to_string(),
2685                content: "super-secret".to_string(),
2686                size: 12,
2687            },
2688        };
2689        let redacted = state.redact_flow_update_for_output(update);
2690        match redacted {
2691            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2692            _ => panic!("expected http body update"),
2693        }
2694    }
2695
2696    #[cfg(feature = "script")]
2697    #[tokio::test]
2698    async fn load_script_from_records_audit_event() {
2699        let state = CoreState::new(None).await;
2700
2701        state
2702            .load_script_from(
2703                AuditActor::Tauri,
2704                "tauri.load_script".to_string(),
2705                "globalThis.onRequestHeaders = (_flow) => {};",
2706            )
2707            .await
2708            .expect("script should load");
2709
2710        let events = state.recent_audit_events();
2711        let event = events.last().expect("audit event should exist");
2712        assert_eq!(event.actor, AuditActor::Tauri);
2713        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2714        assert_eq!(event.outcome, AuditOutcome::Success);
2715        assert_eq!(event.target, "tauri.load_script");
2716    }
2717}