Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62mod log_format;
63pub mod modification;
64pub mod paths;
65pub mod rule;
66pub mod services;
67
68// ── Re-exports for user convenience ──
69// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
70pub use paths::CaPaths;
71pub use relay_core_api::{flow, policy};
72pub use relay_core_lib::InterceptionResult;
73pub use relay_core_lib::rule as lib_rule;
74
75use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
76use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
77use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
78
79pub mod rule_engine {
80    pub use relay_core_lib::rule_engine::*;
81}
82
83#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
84pub struct CoreMetrics {
85    pub flows_total: usize,
86    pub flows_in_memory: usize,
87    pub flows_dropped: usize,
88    pub intercepts_pending: usize,
89    pub ws_pending_messages: usize,
90    pub oldest_intercept_age_ms: Option<u64>,
91    pub oldest_ws_message_age_ms: Option<u64>,
92    pub rule_exec_errors: usize,
93    pub audit_events_total: usize,
94    pub audit_events_failed: usize,
95    pub flow_events_lagged_total: usize,
96    pub audit_events_lagged_total: usize,
97    /// O4: Total bodies degraded (budget exceeded)
98    pub proxy_body_degraded_total: usize,
99    /// O4: Total HTTP requests processed through streaming pipeline
100    pub proxy_http_request_total: usize,
101    /// O4: Total sandbox rejections
102    pub proxy_sandbox_reject_total: usize,
103    /// O4: Total invalid method rejections (strict_http_semantics)
104    pub proxy_invalid_method_total: usize,
105    /// O4: Total invalid status code rejections (strict_http_semantics)
106    pub proxy_invalid_status_total: usize,
107    /// O4: Total idempotent request retries
108    pub proxy_retry_total: usize,
109    /// O4: Total bodies processed in tap (streaming) mode
110    pub proxy_stream_mode_tap_total: usize,
111    /// O4: Total bodies degraded from tap to pass-through
112    pub proxy_stream_mode_degrade_total: usize,
113}
114
115impl CoreMetrics {
116    pub fn to_prometheus_text(&self) -> String {
117        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
118        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
119        format!(
120            "relay_core_flows_total {}\n\
121relay_core_flows_in_memory {}\n\
122relay_core_flows_dropped_total {}\n\
123relay_core_intercepts_pending {}\n\
124relay_core_ws_pending_messages {}\n\
125relay_core_oldest_intercept_age_ms {}\n\
126relay_core_oldest_ws_message_age_ms {}\n\
127relay_core_rule_exec_errors_total {}\n\
128relay_core_audit_events_total {}\n\
129relay_core_audit_events_failed_total {}\n\
130relay_core_flow_events_lagged_total {}\n\
131relay_core_audit_events_lagged_total {}\n\
132relay_core_proxy_body_degraded_total {}\n\
133relay_core_proxy_http_request_total {}\n\
134relay_core_proxy_sandbox_reject_total {}\n\
135relay_core_proxy_invalid_method_total {}\n\
136relay_core_proxy_invalid_status_total {}\n\
137relay_core_proxy_retry_total {}\n\
138relay_core_proxy_stream_mode_tap_total {}\n\
139relay_core_proxy_stream_mode_degrade_total {}\n",
140            self.flows_total,
141            self.flows_in_memory,
142            self.flows_dropped,
143            self.intercepts_pending,
144            self.ws_pending_messages,
145            oldest_intercept_age_ms,
146            oldest_ws_message_age_ms,
147            self.rule_exec_errors,
148            self.audit_events_total,
149            self.audit_events_failed,
150            self.flow_events_lagged_total,
151            self.audit_events_lagged_total,
152            self.proxy_body_degraded_total,
153            self.proxy_http_request_total,
154            self.proxy_sandbox_reject_total,
155            self.proxy_invalid_method_total,
156            self.proxy_invalid_status_total,
157            self.proxy_retry_total,
158            self.proxy_stream_mode_tap_total,
159            self.proxy_stream_mode_degrade_total,
160        )
161    }
162}
163
164#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
165pub struct CoreStatusSnapshot {
166    pub phase: RuntimeLifecyclePhase,
167    pub running: bool,
168    pub port: Option<u16>,
169    pub uptime: Option<u64>,
170    pub last_error: Option<String>,
171}
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
174pub struct CoreStatusReport {
175    pub status: CoreStatusSnapshot,
176    pub metrics: CoreMetrics,
177}
178
179#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
180pub struct PendingInterceptItem {
181    pub key: String,
182    pub flow_id: String,
183    pub phase: String,
184    pub url: String,
185    pub method: String,
186    pub created_at_ms: u64,
187}
188
189#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
190pub struct CoreInterceptSnapshot {
191    pub pending_count: usize,
192    pub ws_pending_count: usize,
193    #[serde(default, skip_serializing_if = "Vec::is_empty")]
194    pub items: Vec<PendingInterceptItem>,
195}
196
197#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
198pub struct CoreAuditSnapshot {
199    pub events: Vec<AuditEvent>,
200}
201
202#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
203pub struct CoreAuditQuery {
204    pub since_ms: Option<u64>,
205    pub until_ms: Option<u64>,
206    pub actor: Option<AuditActor>,
207    pub kind: Option<AuditEventKind>,
208    pub outcome: Option<AuditOutcome>,
209    pub limit: usize,
210}
211
212impl Default for CoreAuditQuery {
213    fn default() -> Self {
214        Self {
215            since_ms: None,
216            until_ms: None,
217            actor: None,
218            kind: None,
219            outcome: None,
220            limit: 50,
221        }
222    }
223}
224
225#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
226#[serde(rename_all = "snake_case")]
227pub enum RuntimeLifecyclePhase {
228    Created,
229    Starting,
230    Running,
231    Stopping,
232    Stopped,
233    Failed,
234}
235
236impl RuntimeLifecyclePhase {
237    pub fn as_str(&self) -> &'static str {
238        match self {
239            Self::Created => "created",
240            Self::Starting => "starting",
241            Self::Running => "running",
242            Self::Stopping => "stopping",
243            Self::Stopped => "stopped",
244            Self::Failed => "failed",
245        }
246    }
247
248    pub fn is_active(&self) -> bool {
249        matches!(self, Self::Starting | Self::Running | Self::Stopping)
250    }
251}
252
253#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
254pub struct RuntimeLifecycle {
255    pub phase: RuntimeLifecyclePhase,
256    pub port: Option<u16>,
257    pub started_at_ms: Option<u64>,
258    pub last_error: Option<String>,
259}
260
261impl RuntimeLifecycle {
262    pub fn created() -> Self {
263        Self {
264            phase: RuntimeLifecyclePhase::Created,
265            port: None,
266            started_at_ms: None,
267            last_error: None,
268        }
269    }
270
271    pub fn is_active(&self) -> bool {
272        self.phase.is_active()
273    }
274
275    pub fn uptime_seconds(&self) -> Option<u64> {
276        let started_at_ms = self.started_at_ms?;
277        let now_ms = now_unix_ms();
278        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
279    }
280}
281
282pub enum ProxySpawnResult {
283    Started(tokio::task::JoinHandle<()>),
284    AlreadyRunning,
285}
286
287#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288pub enum ProxyStopResult {
289    Stopping,
290    NotRunning,
291}
292
293impl From<RuntimeLifecycle> for CoreStatusSnapshot {
294    fn from(lifecycle: RuntimeLifecycle) -> Self {
295        Self {
296            running: lifecycle.is_active(),
297            uptime: lifecycle.uptime_seconds(),
298            port: lifecycle.port,
299            last_error: lifecycle.last_error,
300            phase: lifecycle.phase,
301        }
302    }
303}
304
305pub struct CoreState {
306    flow_store: mpsc::Sender<FlowStoreMessage>,
307    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
308    rule_store: mpsc::Sender<RuleStoreMessage>,
309    store: Option<Store>,
310    #[cfg(feature = "script")]
311    pub script_interceptor: Arc<ScriptInterceptor>,
312    pub policy_tx: watch::Sender<ProxyPolicy>,
313    pub flows_dropped: Arc<AtomicUsize>,
314    audit_events_total: Arc<AtomicUsize>,
315    audit_events_failed: Arc<AtomicUsize>,
316    flow_events_lagged_total: Arc<AtomicUsize>,
317    audit_events_lagged_total: Arc<AtomicUsize>,
318    /// 内部广播 channel:Tauri、Probe 等多个消费者均可订阅
319    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
320    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
321    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
322    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
323    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
324}
325
326impl CoreState {
327    pub async fn new(db_url: Option<String>) -> Self {
328        const AUDIT_HISTORY_LIMIT: usize = 200;
329
330        let store = if let Some(url) = db_url {
331            match Store::connect(&url).await {
332                Ok(s) => {
333                    if let Err(e) = s.init().await {
334                        tracing::error!("Failed to init store: {}", e);
335                    }
336                    Some(s)
337                }
338                Err(e) => {
339                    tracing::error!("Failed to connect to store: {}", e);
340                    None
341                }
342            }
343        } else {
344            None
345        };
346
347        let (flow_tx, flow_rx) = mpsc::channel(10000);
348        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
349        tokio::spawn(flow_actor.run());
350
351        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
352        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
353        tokio::spawn(intercept_actor.run());
354
355        let (rule_tx, rule_rx) = mpsc::channel(100);
356        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
357        tokio::spawn(rule_actor.run());
358
359        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
360        let (flow_broadcast_tx, _) = broadcast::channel(1000);
361        let (audit_broadcast_tx, _) = broadcast::channel(256);
362        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
363
364        #[cfg(feature = "script")]
365        let script_interceptor = ScriptInterceptor::new()
366            .await
367            .expect("Failed to initialize ScriptInterceptor");
368        Self {
369            flow_store: flow_tx,
370            intercept_broker: intercept_tx,
371            rule_store: rule_tx,
372            store,
373            #[cfg(feature = "script")]
374            script_interceptor: Arc::new(script_interceptor),
375            policy_tx,
376            flows_dropped: Arc::new(AtomicUsize::new(0)),
377            audit_events_total: Arc::new(AtomicUsize::new(0)),
378            audit_events_failed: Arc::new(AtomicUsize::new(0)),
379            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
380            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
381            flow_broadcast_tx,
382            audit_broadcast_tx,
383            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
384            lifecycle_tx,
385            shutdown_tx: Mutex::new(None),
386        }
387    }
388
389    pub async fn get_metrics(&self) -> CoreMetrics {
390        let (flow_tx, flow_rx) = oneshot::channel();
391        let _ = self
392            .flow_store
393            .send(FlowStoreMessage::GetMetrics(flow_tx))
394            .await;
395        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
396
397        let (int_tx, int_rx) = oneshot::channel();
398        let _ = self
399            .intercept_broker
400            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
401            .await;
402        let (
403            intercepts_pending,
404            ws_pending_messages,
405            oldest_intercept_age_ms,
406            oldest_ws_message_age_ms,
407        ) = int_rx.await.unwrap_or((0, 0, None, None));
408
409        let (rule_tx, rule_rx) = oneshot::channel();
410        let _ = self
411            .rule_store
412            .send(RuleStoreMessage::GetMetrics(rule_tx))
413            .await;
414        let rule_exec_errors = rule_rx.await.unwrap_or(0);
415
416        CoreMetrics {
417            flows_total,
418            flows_in_memory,
419            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
420            intercepts_pending,
421            ws_pending_messages,
422            oldest_intercept_age_ms,
423            oldest_ws_message_age_ms,
424            rule_exec_errors,
425            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
426            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
427            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
428            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
429            proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
430            proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
431            proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
432            proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
433            proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
434            proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
435            proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
436            proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
437            ),
438        }
439    }
440
441    pub async fn get_metrics_prometheus_text(&self) -> String {
442        let mut text = self.get_metrics().await.to_prometheus_text();
443        #[cfg(feature = "script")]
444        {
445            text.push_str(&self.script_interceptor.metrics.prometheus_lines());
446        }
447        text
448    }
449
450    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
451        self.lifecycle().into()
452    }
453
454    pub async fn status_report(&self) -> CoreStatusReport {
455        CoreStatusReport {
456            status: self.status_snapshot(),
457            metrics: self.get_metrics().await,
458        }
459    }
460
461    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
462        let metrics = self.get_metrics().await;
463        let items = self.list_pending_intercept_items().await;
464        CoreInterceptSnapshot {
465            pending_count: metrics.intercepts_pending,
466            ws_pending_count: metrics.ws_pending_messages,
467            items,
468        }
469    }
470
471    pub async fn list_pending_intercept_items(&self) -> Vec<PendingInterceptItem> {
472        let keys = self.list_pending_intercept_keys().await;
473        let mut items = Vec::with_capacity(keys.len());
474        for (key, created_at_ms) in keys {
475            let (flow_id, phase) = parse_intercept_key(&key);
476            let (url, method) = if let Some(flow) = self.get_flow(flow_id.clone()).await {
477                flow_url_method(&flow)
478            } else {
479                (String::new(), String::new())
480            };
481            items.push(PendingInterceptItem {
482                key,
483                flow_id,
484                phase,
485                url,
486                method,
487                created_at_ms,
488            });
489        }
490        items
491    }
492
493    async fn list_pending_intercept_keys(&self) -> Vec<(String, u64)> {
494        let (tx, rx) = oneshot::channel();
495        if self
496            .intercept_broker
497            .send(InterceptBrokerMessage::ListPendingIntercepts { respond_to: tx })
498            .await
499            .is_err()
500        {
501            return Vec::new();
502        }
503        rx.await.unwrap_or_default()
504    }
505
506    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
507        let events = self.recent_audit_events();
508        let start = events.len().saturating_sub(limit);
509        CoreAuditSnapshot {
510            events: events.into_iter().skip(start).collect(),
511        }
512    }
513
514    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
515        let limit = query.limit.clamp(1, 500);
516        if let Some(store) = &self.store {
517            let rows = store
518                .query_audit_events(
519                    query.since_ms,
520                    query.until_ms,
521                    query.actor.as_ref().map(AuditActor::as_str),
522                    query.kind.as_ref().map(AuditEventKind::as_str),
523                    query.outcome.as_ref().map(AuditOutcome::as_str),
524                    limit,
525                )
526                .await
527                .unwrap_or_default();
528
529            let mut events = Vec::with_capacity(rows.len());
530            for row in rows {
531                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
532                    events.push(event);
533                }
534            }
535            return CoreAuditSnapshot { events };
536        }
537
538        let mut events = self.recent_audit_events();
539        events.reverse();
540        let filtered = events
541            .into_iter()
542            .filter(|event| {
543                query
544                    .since_ms
545                    .map(|v| event.timestamp_ms >= v)
546                    .unwrap_or(true)
547            })
548            .filter(|event| {
549                query
550                    .until_ms
551                    .map(|v| event.timestamp_ms <= v)
552                    .unwrap_or(true)
553            })
554            .filter(|event| {
555                query
556                    .actor
557                    .as_ref()
558                    .map(|v| &event.actor == v)
559                    .unwrap_or(true)
560            })
561            .filter(|event| {
562                query
563                    .kind
564                    .as_ref()
565                    .map(|v| &event.kind == v)
566                    .unwrap_or(true)
567            })
568            .filter(|event| {
569                query
570                    .outcome
571                    .as_ref()
572                    .map(|v| &event.outcome == v)
573                    .unwrap_or(true)
574            })
575            .take(limit)
576            .collect();
577        CoreAuditSnapshot { events: filtered }
578    }
579
580    pub async fn get_flow(&self, id: String) -> Option<Flow> {
581        let (tx, rx) = oneshot::channel();
582        if let Err(e) = self
583            .flow_store
584            .send(FlowStoreMessage::GetFlow {
585                id: id.clone(),
586                respond_to: tx,
587            })
588            .await
589        {
590            error!("Failed to send GetFlow request: {}", e);
591            if let Some(store) = &self.store {
592                return store
593                    .load_flow(&id)
594                    .await
595                    .ok()
596                    .flatten()
597                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
598            }
599            return None;
600        }
601        let flow = rx
602            .await
603            .map_err(|e| {
604                error!("Failed to receive Flow response: {}", e);
605                e
606            })
607            .unwrap_or(None);
608        if let Some(flow) = flow {
609            return Some(redact_flow(flow, &self.current_redaction_policy()));
610        }
611        if let Some(store) = &self.store {
612            return store
613                .load_flow(&id)
614                .await
615                .ok()
616                .flatten()
617                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
618                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
619        }
620        None
621    }
622
623    pub async fn get_rules(&self) -> Vec<Rule> {
624        let (tx, rx) = oneshot::channel();
625        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
626            error!("Failed to send GetRules request: {}", e);
627            return Vec::new();
628        }
629        rx.await
630            .map_err(|e| {
631                error!("Failed to receive Rules response: {}", e);
632                e
633            })
634            .unwrap_or_default()
635    }
636
637    pub async fn set_rules(&self, rules: Vec<Rule>) {
638        let _ = self
639            .set_rules_from(
640                AuditActor::Runtime,
641                "rules.replace",
642                "rule_set".to_string(),
643                json!({}),
644                rules,
645            )
646            .await;
647    }
648
649    pub async fn upsert_rule_from(
650        &self,
651        actor: AuditActor,
652        operation: &str,
653        target: String,
654        details: serde_json::Value,
655        rule: Rule,
656    ) -> Result<(), String> {
657        let rule_id = rule.id.clone();
658        let mut rules = self.get_rules().await;
659        rules.retain(|existing| existing.id != rule_id);
660        rules.push(rule);
661        self.set_rules_from(actor, operation, target, details, rules)
662            .await
663    }
664
665    pub async fn delete_rule_from(
666        &self,
667        actor: AuditActor,
668        operation: &str,
669        target: String,
670        details: serde_json::Value,
671        rule_id: &str,
672    ) -> Result<bool, String> {
673        let mut rules = self.get_rules().await;
674        let before = rules.len();
675        rules.retain(|rule| rule.id != rule_id);
676        if rules.len() == before {
677            return Ok(false);
678        }
679        self.set_rules_from(actor, operation, target, details, rules)
680            .await
681            .map(|_| true)
682    }
683
684    pub async fn create_mock_response_rule_from(
685        &self,
686        actor: AuditActor,
687        target: String,
688        details: serde_json::Value,
689        config: MockResponseRuleConfig,
690    ) -> Result<String, String> {
691        let rule_id = config.rule_id.clone();
692        let rule = build_mock_response_rule(config);
693        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
694            .await
695            .map(|_| rule_id)
696    }
697
698    pub async fn create_intercept_rule_from(
699        &self,
700        actor: AuditActor,
701        target: String,
702        details: serde_json::Value,
703        config: InterceptRuleConfig,
704    ) -> Result<String, String> {
705        let rule_id = config.rule_id.clone();
706        let mut rules = self.get_rules().await;
707        rules.extend(build_intercept_rules(config));
708        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
709            .await
710            .map(|_| rule_id)
711    }
712
713    pub async fn upsert_legacy_intercept_rule_from(
714        &self,
715        actor: AuditActor,
716        target: String,
717        details: serde_json::Value,
718        rule: InterceptRule,
719    ) -> Result<(), String> {
720        let rule_id = rule.id.clone();
721        let mut rules = self.get_rules().await;
722        rules.retain(|existing| {
723            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
724        });
725        rules.extend(rule.to_rules());
726        self.set_rules_from(
727            actor,
728            "rule.intercept_legacy_upsert",
729            target,
730            details,
731            rules,
732        )
733        .await
734    }
735
736    pub async fn set_rules_from(
737        &self,
738        actor: AuditActor,
739        operation: &str,
740        target: String,
741        details: serde_json::Value,
742        rules: Vec<Rule>,
743    ) -> Result<(), String> {
744        let rule_count = rules.len();
745        if let Err(e) = self
746            .rule_store
747            .send(RuleStoreMessage::SetRules(rules))
748            .await
749        {
750            error!("Failed to send SetRules request: {}", e);
751            self.record_audit_event(AuditEvent::new(
752                actor,
753                AuditEventKind::RuleChanged,
754                target,
755                AuditOutcome::Failed,
756                json!({
757                    "operation": operation,
758                    "rule_count": rule_count,
759                    "details": details,
760                    "error": e.to_string()
761                }),
762            ));
763            return Err(e.to_string());
764        }
765
766        self.record_audit_event(AuditEvent::new(
767            actor,
768            AuditEventKind::RuleChanged,
769            target,
770            AuditOutcome::Success,
771            json!({
772                "operation": operation,
773                "rule_count": rule_count,
774                "details": details
775            }),
776        ));
777        Ok(())
778    }
779
780    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
781        let mut new_rules = Vec::new();
782        for rule in rules {
783            new_rules.extend(rule.to_rules());
784        }
785        self.set_rules(new_rules).await;
786    }
787
788    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
789        let (tx, rx) = oneshot::channel();
790        if let Err(e) = self
791            .rule_store
792            .send(RuleStoreMessage::GetRuleEngine(tx))
793            .await
794        {
795            error!("Failed to send GetRuleEngine request: {}", e);
796            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
797        }
798        rx.await
799            .map_err(|e| {
800                error!("Failed to receive RuleEngine response: {}", e);
801                e
802            })
803            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
804    }
805
806    pub fn update_policy(&self, policy: ProxyPolicy) {
807        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
808    }
809
810    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
811        let mut policy = self.policy_snapshot();
812        policy.apply_patch(patch);
813        self.update_policy_from(actor, target, policy);
814    }
815
816    pub fn policy_snapshot(&self) -> ProxyPolicy {
817        self.policy_tx.borrow().clone()
818    }
819
820    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
821        let details = json!({
822            "strict_http_semantics": policy.strict_http_semantics,
823            "request_timeout_ms": policy.request_timeout_ms,
824            "max_body_size": policy.max_body_size,
825            "transparent_enabled": policy.transparent_enabled,
826            "redaction_enabled": policy.redaction.enabled,
827            "redact_bodies": policy.redaction.redact_bodies
828        });
829
830        self.policy_tx.send_replace(policy);
831        self.record_audit_event(AuditEvent::new(
832            actor,
833            AuditEventKind::PolicyUpdated,
834            target,
835            AuditOutcome::Success,
836            details,
837        ));
838    }
839
840    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
841        if let Err(e) = self
842            .intercept_broker
843            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
844            .await
845        {
846            error!("Failed to send RegisterIntercept request: {}", e);
847        }
848    }
849
850    pub async fn resolve_intercept(
851        &self,
852        key: String,
853        result: InterceptionResult,
854    ) -> Result<(), String> {
855        let (tx, rx) = oneshot::channel();
856        if let Err(e) = self
857            .intercept_broker
858            .send(InterceptBrokerMessage::ResolveIntercept {
859                key,
860                result,
861                respond_to: tx,
862            })
863            .await
864        {
865            error!("Failed to send ResolveIntercept request: {}", e);
866            return Err(e.to_string());
867        }
868        rx.await.map_err(|_| "Actor dropped".to_string())?
869    }
870
871    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
872        let (tx, rx) = oneshot::channel();
873        if let Err(e) = self
874            .intercept_broker
875            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
876                key,
877                respond_to: tx,
878            })
879            .await
880        {
881            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
882            return None;
883        }
884        rx.await
885            .map_err(|e| {
886                error!("Failed to receive WebSocketMessage response: {}", e);
887                e
888            })
889            .unwrap_or(None)
890    }
891
892    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
893        if let Err(e) = self
894            .intercept_broker
895            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
896            .await
897        {
898            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
899        }
900    }
901
902    pub async fn is_intercept_pending(&self, key: String) -> bool {
903        let (tx, rx) = oneshot::channel();
904        if let Err(e) = self
905            .intercept_broker
906            .send(InterceptBrokerMessage::GetPendingIntercept {
907                key,
908                respond_to: tx,
909            })
910            .await
911        {
912            error!("Failed to send GetPendingIntercept request: {}", e);
913            return false;
914        }
915        rx.await
916            .map_err(|e| {
917                error!("Failed to receive PendingIntercept response: {}", e);
918                e
919            })
920            .unwrap_or(false)
921    }
922
923    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
924        let (tx, rx) = oneshot::channel();
925        if let Err(e) = self
926            .intercept_broker
927            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
928                flow_id,
929                respond_to: tx,
930            })
931            .await
932        {
933            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
934            return false;
935        }
936        rx.await
937            .map_err(|e| {
938                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
939                e
940            })
941            .unwrap_or(false)
942    }
943
944    pub fn upsert_flow(&self, flow: Box<Flow>) {
945        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
946            // Drop flow if channel is full
947            error!("FlowStore dropped flow: {}", e);
948            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
949        }
950    }
951
952    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
953        if let Err(e) = self
954            .flow_store
955            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
956        {
957            error!("FlowStore dropped WS message: {}", e);
958            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
959        }
960    }
961
962    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
963        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
964            flow_id,
965            body,
966            direction,
967        }) {
968            error!("FlowStore dropped HTTP body: {}", e);
969            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
970        }
971    }
972
973    /// P1: Tag a flow as budget-exceeded (body too large for full rule inspection)
974    pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
975        if let Err(e) = self
976            .flow_store
977            .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
978        {
979            error!("FlowStore dropped budget-exceeded tag: {}", e);
980            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
981        }
982    }
983
984    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
985    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
986        self.flow_broadcast_tx.subscribe()
987    }
988
989    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
990        self.audit_broadcast_tx.subscribe()
991    }
992
993    fn current_redaction_policy(&self) -> RedactionPolicy {
994        self.policy_tx.borrow().redaction.clone()
995    }
996
997    pub fn record_flow_events_lagged(&self, skipped: u64) {
998        self.flow_events_lagged_total
999            .fetch_add(skipped as usize, Ordering::Relaxed);
1000    }
1001
1002    pub fn record_audit_events_lagged(&self, skipped: u64) {
1003        self.audit_events_lagged_total
1004            .fetch_add(skipped as usize, Ordering::Relaxed);
1005    }
1006
1007    pub fn lifecycle(&self) -> RuntimeLifecycle {
1008        self.lifecycle_tx.borrow().clone()
1009    }
1010
1011    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
1012        self.lifecycle_tx.subscribe()
1013    }
1014
1015    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
1016        let current = self.lifecycle();
1017        if current.is_active() {
1018            return Err(format!(
1019                "Proxy is already {} on port {}",
1020                current.phase.as_str(),
1021                current.port.unwrap_or(port)
1022            ));
1023        }
1024
1025        let mut guard = self
1026            .shutdown_tx
1027            .lock()
1028            .map_err(|_| "shutdown state poisoned".to_string())?;
1029        *guard = Some(shutdown_tx);
1030        drop(guard);
1031
1032        self.update_lifecycle(RuntimeLifecycle {
1033            phase: RuntimeLifecyclePhase::Starting,
1034            port: Some(port),
1035            started_at_ms: None,
1036            last_error: None,
1037        });
1038        Ok(())
1039    }
1040
1041    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
1042        let mut guard = self
1043            .shutdown_tx
1044            .lock()
1045            .map_err(|_| "shutdown state poisoned".to_string())?;
1046        let Some(tx) = guard.take() else {
1047            return Ok(ProxyStopResult::NotRunning);
1048        };
1049        drop(guard);
1050
1051        let current = self.lifecycle();
1052        self.update_lifecycle(RuntimeLifecycle {
1053            phase: RuntimeLifecyclePhase::Stopping,
1054            port: current.port,
1055            started_at_ms: current.started_at_ms,
1056            last_error: current.last_error,
1057        });
1058        let _ = tx.send(());
1059        Ok(ProxyStopResult::Stopping)
1060    }
1061
1062    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1063        self.audit_history
1064            .lock()
1065            .map(|events| events.iter().cloned().collect())
1066            .unwrap_or_default()
1067    }
1068
1069    /// 搜索内存中的 Flow 列表,返回轻量摘要。
1070    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1071        let redaction = self.current_redaction_policy();
1072        if let Some(store) = &self.store {
1073            return store
1074                .query_flow_summaries(&query)
1075                .await
1076                .unwrap_or_default()
1077                .into_iter()
1078                .map(|summary| redact_flow_summary(summary, &redaction))
1079                .collect();
1080        }
1081        let (tx, rx) = oneshot::channel();
1082        if let Err(e) = self
1083            .flow_store
1084            .send(FlowStoreMessage::SearchFlows {
1085                query,
1086                respond_to: tx,
1087            })
1088            .await
1089        {
1090            error!("Failed to send SearchFlows request: {}", e);
1091            return Vec::new();
1092        }
1093        rx.await
1094            .unwrap_or_default()
1095            .into_iter()
1096            .map(|summary| redact_flow_summary(summary, &redaction))
1097            .collect()
1098    }
1099
1100    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1101        let redaction = self.current_redaction_policy();
1102        redact_flow_update(update, &redaction)
1103    }
1104
1105    /// 解除截获并可选地应用修改。
1106    ///
1107    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
1108    /// - 若 `mods` 为 None,直接 Continue;
1109    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1110    ///
1111    /// `key` 格式约定:
1112    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1113    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1114    pub async fn resolve_intercept_with_modifications(
1115        &self,
1116        key: String,
1117        action: &str,
1118        mods: Option<relay_core_api::modification::FlowModification>,
1119    ) -> Result<(), String> {
1120        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1121            .await
1122    }
1123
1124    pub async fn resolve_intercept_with_modifications_from(
1125        &self,
1126        actor: AuditActor,
1127        key: String,
1128        action: &str,
1129        mods: Option<relay_core_api::modification::FlowModification>,
1130    ) -> Result<(), String> {
1131        let modified_fields = modification_field_names(mods.as_ref());
1132        let result = match action {
1133            "drop" => InterceptionResult::Drop,
1134            _ => match mods {
1135                None => InterceptionResult::Continue,
1136                Some(m) => {
1137                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1138                    match parts.as_slice() {
1139                        [flow_id, phase] => {
1140                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1141                                modification::apply_flow_modification(&flow, phase, m)
1142                            } else {
1143                                InterceptionResult::Continue
1144                            }
1145                        }
1146                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1147                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1148                        [_, "ws_msg", _] => {
1149                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1150                                modification::apply_ws_modification(&msg, m)
1151                            } else {
1152                                InterceptionResult::Continue
1153                            }
1154                        }
1155                        _ => InterceptionResult::Continue,
1156                    }
1157                }
1158            },
1159        };
1160        let outcome = self.resolve_intercept(key.clone(), result).await;
1161        let audit_outcome = if outcome.is_ok() {
1162            AuditOutcome::Success
1163        } else {
1164            AuditOutcome::Failed
1165        };
1166        let error_message = outcome.as_ref().err().cloned();
1167
1168        self.record_audit_event(AuditEvent::new(
1169            actor,
1170            AuditEventKind::InterceptResolved,
1171            key,
1172            audit_outcome,
1173            json!({
1174                "action": action,
1175                "has_modifications": !modified_fields.is_empty(),
1176                "modified_fields": modified_fields,
1177                "error": error_message
1178            }),
1179        ));
1180        outcome
1181    }
1182
1183    #[cfg(feature = "script")]
1184    pub async fn load_script_from(
1185        &self,
1186        actor: AuditActor,
1187        target: String,
1188        script: &str,
1189    ) -> Result<(), String> {
1190        let result = self
1191            .script_interceptor
1192            .load_script(script)
1193            .await
1194            .map_err(|e| e.to_string());
1195
1196        let outcome = if result.is_ok() {
1197            AuditOutcome::Success
1198        } else {
1199            AuditOutcome::Failed
1200        };
1201        let error_message = result.as_ref().err().cloned();
1202
1203        self.record_audit_event(AuditEvent::new(
1204            actor,
1205            AuditEventKind::ScriptReloaded,
1206            target,
1207            outcome,
1208            json!({
1209                "script_bytes": script.len(),
1210                "error": error_message
1211            }),
1212        ));
1213
1214        result
1215    }
1216
1217    /// S5: Set the env var whitelist for relay.env() in user scripts.
1218    #[cfg(feature = "script")]
1219    pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1220        self.script_interceptor.set_env_allow(env_allow).await;
1221    }
1222
1223    fn record_audit_event(&self, event: AuditEvent) {
1224        const AUDIT_HISTORY_LIMIT: usize = 200;
1225        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1226        if event.outcome == AuditOutcome::Failed {
1227            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1228        }
1229
1230        let details = log_format::audit_details_log(&event.kind, &event.details);
1231        tracing::info!(
1232            target: "relay_core_audit",
1233            event_id = %event.id,
1234            actor = %event.actor.as_str(),
1235            kind = %event.kind.as_str(),
1236            target = %event.target,
1237            outcome = %event.outcome.as_str(),
1238            details = %details
1239        );
1240
1241        if let Ok(mut history) = self.audit_history.lock() {
1242            if history.len() >= AUDIT_HISTORY_LIMIT {
1243                history.pop_front();
1244            }
1245            history.push_back(event.clone());
1246        }
1247
1248        if let Some(store) = self.store.clone() {
1249            let event_json = serde_json::to_value(&event).unwrap_or_default();
1250            let event_id = event.id.clone();
1251            let timestamp_ms = event.timestamp_ms;
1252            let actor = event.actor.as_str().to_string();
1253            let kind = event.kind.as_str().to_string();
1254            let target = event.target.clone();
1255            let outcome = event.outcome.as_str().to_string();
1256            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1257                handle.spawn(async move {
1258                    if let Err(e) = store
1259                        .save_audit_event(AuditEventRecord {
1260                            id: &event_id,
1261                            timestamp_ms,
1262                            actor: &actor,
1263                            kind: &kind,
1264                            target: &target,
1265                            outcome: &outcome,
1266                            content: &event_json,
1267                        })
1268                        .await
1269                    {
1270                        tracing::error!("Failed to persist audit event: {}", e);
1271                    }
1272                });
1273            }
1274        }
1275
1276        let _ = self.audit_broadcast_tx.send(event);
1277    }
1278
1279    fn transition_to_running(&self, port: u16) {
1280        self.update_lifecycle(RuntimeLifecycle {
1281            phase: RuntimeLifecyclePhase::Running,
1282            port: Some(port),
1283            started_at_ms: Some(now_unix_ms()),
1284            last_error: None,
1285        });
1286    }
1287
1288    fn transition_to_stopped(&self) {
1289        if let Ok(mut guard) = self.shutdown_tx.lock() {
1290            *guard = None;
1291        }
1292
1293        self.update_lifecycle(RuntimeLifecycle {
1294            phase: RuntimeLifecyclePhase::Stopped,
1295            port: None,
1296            started_at_ms: None,
1297            last_error: None,
1298        });
1299    }
1300
1301    fn transition_to_failed(&self, port: u16, error: String) {
1302        if let Ok(mut guard) = self.shutdown_tx.lock() {
1303            *guard = None;
1304        }
1305
1306        self.update_lifecycle(RuntimeLifecycle {
1307            phase: RuntimeLifecyclePhase::Failed,
1308            port: Some(port),
1309            started_at_ms: None,
1310            last_error: Some(error),
1311        });
1312    }
1313
1314    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1315        let err = lifecycle
1316            .last_error
1317            .as_deref()
1318            .filter(|s| !s.is_empty())
1319            .unwrap_or("-");
1320        tracing::info!(
1321            target: "relay_core_lifecycle",
1322            phase = %lifecycle.phase.as_str(),
1323            port = %log_format::opt_u16(lifecycle.port),
1324            started_at_ms = %log_format::opt_u64(lifecycle.started_at_ms),
1325            last_error = %err,
1326        );
1327        self.lifecycle_tx.send_replace(lifecycle);
1328    }
1329
1330    pub fn spawn_proxy(
1331        self: &Arc<Self>,
1332        config: ProxyConfig,
1333        sink: mpsc::Sender<FlowUpdate>,
1334        extra_interceptor: Option<Arc<dyn Interceptor>>,
1335    ) -> Result<ProxySpawnResult, String> {
1336        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1337        match self.prepare_start(config.port, shutdown_tx) {
1338            Ok(()) => {}
1339            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1340            Err(error) => return Err(error),
1341        }
1342        let state = self.clone();
1343        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1344            if let Err(error) = state
1345                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1346                .await
1347            {
1348                error!("Proxy failed: {}", error);
1349            }
1350        })))
1351    }
1352
1353    pub async fn start_proxy(
1354        self: &Arc<Self>,
1355        config: ProxyConfig,
1356        sink: mpsc::Sender<FlowUpdate>,
1357        extra_interceptor: Option<Arc<dyn Interceptor>>,
1358    ) -> Result<(), String> {
1359        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1360        self.prepare_start(config.port, shutdown_tx)?;
1361        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1362            .await
1363    }
1364
1365    async fn run_proxy(
1366        self: &Arc<Self>,
1367        config: ProxyConfig,
1368        sink: mpsc::Sender<FlowUpdate>,
1369        extra_interceptor: Option<Arc<dyn Interceptor>>,
1370        shutdown_rx: oneshot::Receiver<()>,
1371    ) -> Result<(), String> {
1372        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1373        let state = self.clone();
1374
1375        if let Some(parent) = config.ca_cert_path.parent()
1376            && !parent.exists()
1377        {
1378            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1379        }
1380
1381        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1382            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1383        let ca = Arc::new(ca);
1384
1385        #[cfg(feature = "script")]
1386        let script_interceptor = self.script_interceptor.clone();
1387
1388        #[cfg(feature = "script")]
1389        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1390
1391        #[cfg(not(feature = "script"))]
1392        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1393
1394        interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1395            self.clone(),
1396        )));
1397
1398        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1399            self.clone(),
1400        )));
1401
1402        if let Some(interceptor) = extra_interceptor {
1403            interceptors.push(interceptor);
1404        }
1405
1406        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1407        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1408
1409        tokio::spawn(async move {
1410            while let Some(update) = proxy_rx.recv().await {
1411                match update.clone() {
1412                    FlowUpdate::Full(flow) => {
1413                        state.upsert_flow(flow);
1414                    }
1415                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1416                        state.append_ws_message(flow_id, message);
1417                    }
1418                    FlowUpdate::HttpBody {
1419                        flow_id,
1420                        direction,
1421                        body,
1422                    } => {
1423                        state.update_http_body(flow_id, body, direction);
1424                    }
1425                    FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1426                        // P1: Tag the flow as budget-exceeded and update resilience trace
1427                        state.tag_flow_budget_exceeded(flow_id, direction);
1428                    }
1429                }
1430
1431                let _ = state.flow_broadcast_tx.send(update.clone());
1432
1433                if sink.try_send(update).is_err() {
1434                    relay_core_lib::metrics::inc_flows_dropped();
1435                }
1436            }
1437        });
1438
1439        if let Some(udp_port) = config.udp_tproxy_port {
1440            let udp_proxy_tx = proxy_tx.clone();
1441            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1442            tokio::spawn(async move {
1443                match tokio::net::UdpSocket::bind(udp_addr).await {
1444                    Ok(socket) => {
1445                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1446                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1447                            error!("UDP TPROXY failed: {}", e);
1448                        }
1449                    }
1450                    Err(e) => {
1451                        error!("Failed to bind UDP TPROXY socket: {}", e);
1452                    }
1453                }
1454            });
1455        }
1456
1457        let listener = match TcpListener::bind(addr).await {
1458            Ok(listener) => listener,
1459            Err(e) => {
1460                if let Ok(mut guard) = self.shutdown_tx.lock() {
1461                    *guard = None;
1462                }
1463                let message = format!("Failed to bind to address {}: {}", addr, e);
1464                self.transition_to_failed(config.port, message.clone());
1465                return Err(message);
1466            }
1467        };
1468
1469        self.transition_to_running(config.port);
1470        let shutdown_rx = Some(shutdown_rx);
1471
1472        if config.transparent {
1473            let policy = ProxyPolicy {
1474                transparent_enabled: true,
1475                ..Default::default()
1476            };
1477            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1478            let policy_rx = self.policy_tx.subscribe();
1479
1480            let provider: Arc<dyn OriginalDstProvider> = {
1481                let mut addrs = BTreeSet::new();
1482                if let Ok(local) = listener.local_addr() {
1483                    addrs.insert(local);
1484                }
1485
1486                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1487                {
1488                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1489                }
1490                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1491                {
1492                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1493                        Ok(provider) => Arc::new(provider),
1494                        Err(e) => {
1495                            error!("Failed to initialize macOS PF provider: {}", e);
1496                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1497                        }
1498                    }
1499                }
1500                #[cfg(target_os = "windows")]
1501                {
1502                    let filter =
1503                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1504                            .to_string();
1505                    let port = config.port;
1506                    tokio::spawn(async move {
1507                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1508                            .await;
1509                    });
1510
1511                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1512                }
1513                #[cfg(not(any(
1514                    all(target_os = "linux", feature = "transparent-linux"),
1515                    all(target_os = "macos", feature = "transparent-macos"),
1516                    target_os = "windows"
1517                )))]
1518                {
1519                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1520                }
1521            };
1522
1523            let source = TransparentTcpCaptureSource::new(listener, provider);
1524            let result = relay_core_lib::start_proxy(
1525                source,
1526                proxy_tx,
1527                interceptor,
1528                ca,
1529                policy_rx,
1530                None,
1531                shutdown_rx,
1532                Some(self.get_rule_engine().await),
1533            )
1534            .await
1535            .map_err(|e| e.to_string());
1536            if let Err(error) = &result {
1537                self.transition_to_failed(config.port, error.clone());
1538            } else {
1539                self.transition_to_stopped();
1540            }
1541            result
1542        } else {
1543            let source = TcpCaptureSource::new(listener);
1544            let policy = ProxyPolicy::default();
1545            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1546            let policy_rx = self.policy_tx.subscribe();
1547
1548            let result = relay_core_lib::start_proxy(
1549                source,
1550                proxy_tx,
1551                interceptor,
1552                ca,
1553                policy_rx,
1554                None,
1555                shutdown_rx,
1556                Some(self.get_rule_engine().await),
1557            )
1558            .await
1559            .map_err(|e| e.to_string());
1560            if let Err(error) = &result {
1561                self.transition_to_failed(config.port, error.clone());
1562            } else {
1563                self.transition_to_stopped();
1564            }
1565            result
1566        }
1567    }
1568}
1569
1570fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1571    match update {
1572        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1573        FlowUpdate::WebSocketMessage {
1574            flow_id,
1575            mut message,
1576        } => {
1577            message.content = redact_body(message.content, redaction);
1578            FlowUpdate::WebSocketMessage { flow_id, message }
1579        }
1580        FlowUpdate::HttpBody {
1581            flow_id,
1582            direction,
1583            body,
1584        } => FlowUpdate::HttpBody {
1585            flow_id,
1586            direction,
1587            body: redact_body(body, redaction),
1588        },
1589        FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1590            FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1591        }
1592    }
1593}
1594
1595fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1596    if !redaction.enabled {
1597        return flow;
1598    }
1599    match &mut flow.layer {
1600        Layer::Http(http) => {
1601            redact_http_request(&mut http.request, redaction);
1602            if let Some(response) = &mut http.response {
1603                redact_headers(&mut response.headers, redaction);
1604                response.body = response
1605                    .body
1606                    .take()
1607                    .map(|body| redact_body(body, redaction));
1608            }
1609        }
1610        Layer::WebSocket(ws) => {
1611            redact_http_request(&mut ws.handshake_request, redaction);
1612            redact_headers(&mut ws.handshake_response.headers, redaction);
1613            ws.handshake_response.body = ws
1614                .handshake_response
1615                .body
1616                .take()
1617                .map(|body| redact_body(body, redaction));
1618            for message in &mut ws.messages {
1619                message.content = redact_body(message.content.clone(), redaction);
1620            }
1621        }
1622        _ => {}
1623    }
1624    flow
1625}
1626
1627fn parse_intercept_key(key: &str) -> (String, String) {
1628    if let Some((flow_id, rest)) = key.split_once(':') {
1629        (flow_id.to_string(), rest.to_string())
1630    } else {
1631        (key.to_string(), String::new())
1632    }
1633}
1634
1635fn flow_url_method(flow: &Flow) -> (String, String) {
1636    match &flow.layer {
1637        Layer::Http(http) => (http.request.url.to_string(), http.request.method.clone()),
1638        Layer::WebSocket(ws) => (
1639            ws.handshake_request.url.to_string(),
1640            ws.handshake_request.method.clone(),
1641        ),
1642        _ => (String::new(), String::new()),
1643    }
1644}
1645
1646fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1647    if !redaction.enabled {
1648        return summary;
1649    }
1650    summary.url = redact_url_string(&summary.url, redaction);
1651    summary
1652}
1653
1654fn redact_http_request(
1655    request: &mut relay_core_api::flow::HttpRequest,
1656    redaction: &RedactionPolicy,
1657) {
1658    redact_headers(&mut request.headers, redaction);
1659    redact_query_pairs(&mut request.query, redaction);
1660    request.url = redact_url(&request.url, redaction);
1661    request.body = request.body.take().map(|body| redact_body(body, redaction));
1662}
1663
1664fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1665    if !redaction.enabled {
1666        return;
1667    }
1668    let sensitive = redaction_set(&redaction.sensitive_header_names);
1669    for (name, value) in headers.iter_mut() {
1670        if sensitive.contains(&name.to_ascii_lowercase()) {
1671            *value = "[REDACTED]".to_string();
1672        }
1673    }
1674}
1675
1676fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1677    if !redaction.enabled {
1678        return;
1679    }
1680    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1681    for (name, value) in query.iter_mut() {
1682        if sensitive.contains(&name.to_ascii_lowercase()) {
1683            *value = "[REDACTED]".to_string();
1684        }
1685    }
1686}
1687
1688fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1689    if !redaction.enabled {
1690        return url.clone();
1691    }
1692    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1693    let pairs: Vec<(String, String)> = url
1694        .query_pairs()
1695        .map(|(k, v)| {
1696            let key = k.to_string();
1697            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1698                "[REDACTED]".to_string()
1699            } else {
1700                v.to_string()
1701            };
1702            (key, value)
1703        })
1704        .collect();
1705    let mut next = url.clone();
1706    if pairs.is_empty() {
1707        return next;
1708    }
1709    next.query_pairs_mut().clear();
1710    for (k, v) in pairs {
1711        next.query_pairs_mut().append_pair(&k, &v);
1712    }
1713    next
1714}
1715
1716fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1717    match url::Url::parse(input) {
1718        Ok(url) => redact_url(&url, redaction).to_string(),
1719        Err(_) => input.to_string(),
1720    }
1721}
1722
1723fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1724    if redaction.enabled && redaction.redact_bodies {
1725        body.content = "[REDACTED]".to_string();
1726    }
1727    body
1728}
1729
1730fn redaction_set(values: &[String]) -> HashSet<String> {
1731    values
1732        .iter()
1733        .map(|value| value.to_ascii_lowercase())
1734        .collect()
1735}
1736
1737#[derive(Clone)]
1738pub struct ProxyConfig {
1739    pub port: u16,
1740    pub ca_cert_path: std::path::PathBuf,
1741    pub ca_key_path: std::path::PathBuf,
1742    pub transparent: bool,
1743    pub udp_tproxy_port: Option<u16>,
1744}
1745
1746impl ProxyConfig {
1747    pub fn new(
1748        port: u16,
1749        ca_cert_path: std::path::PathBuf,
1750        ca_key_path: std::path::PathBuf,
1751    ) -> Self {
1752        Self {
1753            port,
1754            ca_cert_path,
1755            ca_key_path,
1756            transparent: false,
1757            udp_tproxy_port: None,
1758        }
1759    }
1760
1761    pub fn from_app_data_dir(
1762        app_data_dir: impl Into<std::path::PathBuf>,
1763        port: u16,
1764    ) -> Result<Self, String> {
1765        let app_data_dir = app_data_dir.into();
1766        if !app_data_dir.exists() {
1767            std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1768        }
1769
1770        Ok(Self::new(
1771            port,
1772            app_data_dir.join("ca_cert.pem"),
1773            app_data_dir.join("ca_key.pem"),
1774        ))
1775    }
1776
1777    pub fn with_transparent(mut self, transparent: bool) -> Self {
1778        self.transparent = transparent;
1779        self
1780    }
1781
1782    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1783        self.udp_tproxy_port = udp_tproxy_port;
1784        self
1785    }
1786}
1787
1788fn now_unix_ms() -> u64 {
1789    SystemTime::now()
1790        .duration_since(UNIX_EPOCH)
1791        .unwrap_or_default()
1792        .as_millis() as u64
1793}
1794
1795fn modification_field_names(
1796    mods: Option<&relay_core_api::modification::FlowModification>,
1797) -> Vec<&'static str> {
1798    let Some(mods) = mods else {
1799        return Vec::new();
1800    };
1801
1802    let mut fields = Vec::new();
1803    if mods.method.is_some() {
1804        fields.push("method");
1805    }
1806    if mods.url.is_some() {
1807        fields.push("url");
1808    }
1809    if mods.request_headers.is_some() {
1810        fields.push("request_headers");
1811    }
1812    if mods.request_body.is_some() {
1813        fields.push("request_body");
1814    }
1815    if mods.status_code.is_some() {
1816        fields.push("status_code");
1817    }
1818    if mods.response_headers.is_some() {
1819        fields.push("response_headers");
1820    }
1821    if mods.response_body.is_some() {
1822        fields.push("response_body");
1823    }
1824    if mods.message_content.is_some() {
1825        fields.push("message_content");
1826    }
1827    fields
1828}
1829
1830#[cfg(test)]
1831mod tests {
1832    use super::*;
1833    use chrono::Utc;
1834    use relay_core_api::flow::{
1835        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1836        ResponseTiming, TransportProtocol,
1837    };
1838    use std::sync::atomic::{AtomicU64, Ordering};
1839    use tokio::time::{Duration, sleep};
1840    use url::Url;
1841    use uuid::Uuid;
1842
1843    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1844
1845    fn sqlite_url() -> String {
1846        let nanos = SystemTime::now()
1847            .duration_since(UNIX_EPOCH)
1848            .expect("clock drift")
1849            .as_nanos();
1850        let pid = std::process::id();
1851        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1852        let db_dir = std::env::current_dir()
1853            .expect("cwd")
1854            .join("target")
1855            .join("test-dbs");
1856        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1857        let db_path = db_dir.join(format!(
1858            "relay-core-runtime-test-{}-{}-{}.db",
1859            pid, nanos, seq
1860        ));
1861        format!("sqlite://{}?mode=rwc", db_path.display())
1862    }
1863
1864    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1865        let start_time =
1866            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1867        let request_url =
1868            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1869        Flow {
1870            id: Uuid::new_v4(),
1871            start_time,
1872            end_time: Some(start_time),
1873            network: NetworkInfo {
1874                client_ip: "127.0.0.1".to_string(),
1875                client_port: 12000,
1876                server_ip: "127.0.0.1".to_string(),
1877                server_port: 8080,
1878                protocol: TransportProtocol::TCP,
1879                tls: false,
1880                tls_version: None,
1881                sni: None,
1882            },
1883            layer: Layer::Http(HttpLayer {
1884                request: HttpRequest {
1885                    method: method.to_string(),
1886                    url: request_url,
1887                    version: "HTTP/1.1".to_string(),
1888                    headers: vec![],
1889                    cookies: vec![],
1890                    query: vec![],
1891                    body: None,
1892                },
1893                response: Some(HttpResponse {
1894                    status,
1895                    status_text: "OK".to_string(),
1896                    version: "HTTP/1.1".to_string(),
1897                    headers: vec![],
1898                    cookies: vec![],
1899                    body: None,
1900                    timing: ResponseTiming {
1901                        time_to_first_byte: None,
1902                        time_to_last_byte: None,
1903                        connect_time_ms: None,
1904                        ssl_time_ms: None,
1905                    },
1906                }),
1907                error: None,
1908            }),
1909            tags: vec![],
1910            meta: std::collections::HashMap::new(),
1911            resilience_trace: None,
1912            rule_variables: std::collections::HashMap::new(),
1913            matched_rules: vec![],
1914        }
1915    }
1916
1917    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1918        let start_time =
1919            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1920        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1921            .expect("url should parse");
1922        Flow {
1923            id: Uuid::new_v4(),
1924            start_time,
1925            end_time: Some(start_time),
1926            network: NetworkInfo {
1927                client_ip: "127.0.0.1".to_string(),
1928                client_port: 12000,
1929                server_ip: "127.0.0.1".to_string(),
1930                server_port: 8080,
1931                protocol: TransportProtocol::TCP,
1932                tls: false,
1933                tls_version: None,
1934                sni: None,
1935            },
1936            layer: Layer::Http(HttpLayer {
1937                request: HttpRequest {
1938                    method: "GET".to_string(),
1939                    url: request_url,
1940                    version: "HTTP/1.1".to_string(),
1941                    headers: vec![
1942                        (
1943                            "Authorization".to_string(),
1944                            "Bearer secret-token".to_string(),
1945                        ),
1946                        ("X-Normal".to_string(), "visible".to_string()),
1947                    ],
1948                    cookies: vec![],
1949                    query: vec![
1950                        ("token".to_string(), "abc123".to_string()),
1951                        ("ok".to_string(), "1".to_string()),
1952                    ],
1953                    body: Some(BodyData {
1954                        encoding: "utf-8".to_string(),
1955                        content: "secret request body".to_string(),
1956                        size: 19,
1957                    }),
1958                },
1959                response: Some(HttpResponse {
1960                    status: 200,
1961                    status_text: "OK".to_string(),
1962                    version: "HTTP/1.1".to_string(),
1963                    headers: vec![
1964                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1965                        ("X-Response".to_string(), "visible".to_string()),
1966                    ],
1967                    cookies: vec![],
1968                    body: Some(BodyData {
1969                        encoding: "utf-8".to_string(),
1970                        content: "secret response body".to_string(),
1971                        size: 20,
1972                    }),
1973                    timing: ResponseTiming {
1974                        time_to_first_byte: None,
1975                        time_to_last_byte: None,
1976                        connect_time_ms: None,
1977                        ssl_time_ms: None,
1978                    },
1979                }),
1980                error: None,
1981            }),
1982            tags: vec![],
1983            meta: std::collections::HashMap::new(),
1984            resilience_trace: None,
1985            rule_variables: std::collections::HashMap::new(),
1986            matched_rules: vec![],
1987        }
1988    }
1989
1990    #[tokio::test]
1991    async fn set_rules_from_records_audit_event() {
1992        let state = CoreState::new(None).await;
1993
1994        state
1995            .set_rules_from(
1996                AuditActor::Http,
1997                "rule.upsert",
1998                "rule-1".to_string(),
1999                json!({ "route": "/api/v1/rules" }),
2000                Vec::new(),
2001            )
2002            .await
2003            .expect("set rules should succeed");
2004
2005        let events = state.recent_audit_events();
2006        let event = events.last().expect("audit event should exist");
2007        assert_eq!(event.actor, AuditActor::Http);
2008        assert_eq!(event.kind, AuditEventKind::RuleChanged);
2009        assert_eq!(event.outcome, AuditOutcome::Success);
2010        assert_eq!(event.target, "rule-1");
2011        assert_eq!(event.details["operation"], "rule.upsert");
2012        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
2013    }
2014
2015    #[tokio::test]
2016    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
2017        let state = CoreState::new(None).await;
2018
2019        state
2020            .upsert_rule_from(
2021                AuditActor::Probe,
2022                "rule.upsert",
2023                "rule-1".to_string(),
2024                json!({ "tool": "set_rule" }),
2025                Rule {
2026                    id: "rule-1".to_string(),
2027                    name: "first".to_string(),
2028                    active: true,
2029                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
2030                    priority: 1,
2031                    termination: relay_core_lib::rule::RuleTermination::Continue,
2032                    filter: relay_core_lib::rule::Filter::Url(
2033                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
2034                    ),
2035                    actions: vec![],
2036                    constraints: None,
2037                },
2038            )
2039            .await
2040            .expect("initial upsert should succeed");
2041
2042        state
2043            .upsert_rule_from(
2044                AuditActor::Probe,
2045                "rule.upsert",
2046                "rule-1".to_string(),
2047                json!({ "tool": "set_rule" }),
2048                Rule {
2049                    id: "rule-1".to_string(),
2050                    name: "second".to_string(),
2051                    active: true,
2052                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
2053                    priority: 2,
2054                    termination: relay_core_lib::rule::RuleTermination::Continue,
2055                    filter: relay_core_lib::rule::Filter::Url(
2056                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
2057                    ),
2058                    actions: vec![],
2059                    constraints: None,
2060                },
2061            )
2062            .await
2063            .expect("replacement upsert should succeed");
2064
2065        let rules = state.get_rules().await;
2066        assert_eq!(rules.len(), 1);
2067        assert_eq!(rules[0].name, "second");
2068        let event = state
2069            .recent_audit_events()
2070            .last()
2071            .cloned()
2072            .expect("audit event");
2073        assert_eq!(event.details["operation"], "rule.upsert");
2074        assert_eq!(event.details["details"]["tool"], "set_rule");
2075    }
2076
2077    #[tokio::test]
2078    async fn delete_rule_from_returns_false_when_rule_missing() {
2079        let state = CoreState::new(None).await;
2080
2081        let deleted = state
2082            .delete_rule_from(
2083                AuditActor::Http,
2084                "rule.delete",
2085                "missing".to_string(),
2086                json!({ "route": "/api/v1/rules/{id}" }),
2087                "missing",
2088            )
2089            .await
2090            .expect("delete should not fail");
2091
2092        assert!(!deleted);
2093        assert!(state.recent_audit_events().is_empty());
2094    }
2095
2096    #[tokio::test]
2097    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2098        let state = CoreState::new(None).await;
2099
2100        let rule_id = state
2101            .create_mock_response_rule_from(
2102                AuditActor::Http,
2103                "api-mock-1".to_string(),
2104                json!({ "route": "/api/v1/mock", "status": 201 }),
2105                MockResponseRuleConfig {
2106                    rule_id: "api-mock-1".to_string(),
2107                    url_pattern: "example.com".to_string(),
2108                    name: "api-mock:example.com".to_string(),
2109                    status: 201,
2110                    content_type: "application/json".to_string(),
2111                    body: "{\"ok\":true}".to_string(),
2112                },
2113            )
2114            .await
2115            .expect("mock rule should be created");
2116
2117        assert_eq!(rule_id, "api-mock-1");
2118        let rules = state.get_rules().await;
2119        assert_eq!(rules.len(), 1);
2120        assert_eq!(rules[0].id, "api-mock-1");
2121        let event = state
2122            .recent_audit_events()
2123            .last()
2124            .cloned()
2125            .expect("audit event");
2126        assert_eq!(event.details["operation"], "rule.mock_create");
2127        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2128    }
2129
2130    #[tokio::test]
2131    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2132        let state = CoreState::new(None).await;
2133
2134        let rule_id = state
2135            .create_intercept_rule_from(
2136                AuditActor::Http,
2137                "intercept-1".to_string(),
2138                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2139                InterceptRuleConfig {
2140                    rule_id: "intercept-1".to_string(),
2141                    active: true,
2142                    url_pattern: "example.com".to_string(),
2143                    method: None,
2144                    phase: "request".to_string(),
2145                    name: "api-intercept:example.com".to_string(),
2146                    priority: 100,
2147                    termination: relay_core_lib::rule::RuleTermination::Stop,
2148                },
2149            )
2150            .await
2151            .expect("intercept rule should be created");
2152
2153        assert_eq!(rule_id, "intercept-1");
2154        let rules = state.get_rules().await;
2155        assert_eq!(rules.len(), 1);
2156        assert_eq!(rules[0].id, "intercept-1");
2157        assert_eq!(rules[0].name, "api-intercept:example.com");
2158        let event = state
2159            .recent_audit_events()
2160            .last()
2161            .cloned()
2162            .expect("audit event");
2163        assert_eq!(event.details["operation"], "rule.intercept_create");
2164        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2165    }
2166
2167    #[tokio::test]
2168    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2169        let state = CoreState::new(None).await;
2170
2171        state
2172            .upsert_legacy_intercept_rule_from(
2173                AuditActor::Tauri,
2174                "legacy-1".to_string(),
2175                json!({ "command": "set_intercept_rule" }),
2176                InterceptRule {
2177                    id: "legacy-1".to_string(),
2178                    active: true,
2179                    url_pattern: "example.com".to_string(),
2180                    method: None,
2181                    phase: "both".to_string(),
2182                },
2183            )
2184            .await
2185            .expect("initial family upsert should succeed");
2186        assert_eq!(state.get_rules().await.len(), 2);
2187
2188        state
2189            .upsert_legacy_intercept_rule_from(
2190                AuditActor::Tauri,
2191                "legacy-1".to_string(),
2192                json!({ "command": "set_intercept_rule" }),
2193                InterceptRule {
2194                    id: "legacy-1".to_string(),
2195                    active: true,
2196                    url_pattern: "example.org".to_string(),
2197                    method: Some("POST".to_string()),
2198                    phase: "request".to_string(),
2199                },
2200            )
2201            .await
2202            .expect("replacement family upsert should succeed");
2203
2204        let rules = state.get_rules().await;
2205        assert_eq!(rules.len(), 1);
2206        assert_eq!(rules[0].id, "legacy-1");
2207        let event = state
2208            .recent_audit_events()
2209            .last()
2210            .cloned()
2211            .expect("audit event");
2212        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2213        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2214    }
2215
2216    #[tokio::test]
2217    async fn resolve_intercept_failure_records_failed_audit_event() {
2218        let state = CoreState::new(None).await;
2219
2220        let result = state
2221            .resolve_intercept_with_modifications_from(
2222                AuditActor::Probe,
2223                "missing-flow:request".to_string(),
2224                "drop",
2225                None,
2226            )
2227            .await;
2228
2229        assert!(result.is_err());
2230        let events = state.recent_audit_events();
2231        let event = events.last().expect("audit event should exist");
2232        assert_eq!(event.actor, AuditActor::Probe);
2233        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2234        assert_eq!(event.outcome, AuditOutcome::Failed);
2235        assert_eq!(event.details["action"], "drop");
2236        assert!(
2237            event.details["error"]
2238                .as_str()
2239                .unwrap_or_default()
2240                .contains("Interception not found")
2241        );
2242    }
2243
2244    #[tokio::test]
2245    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2246        let state = CoreState::new(None).await;
2247        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2248
2249        state
2250            .prepare_start(8080, shutdown_tx)
2251            .expect("prepare start should succeed");
2252        let lifecycle = state.lifecycle();
2253        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2254        assert_eq!(lifecycle.port, Some(8080));
2255        assert!(lifecycle.started_at_ms.is_none());
2256        assert!(lifecycle.last_error.is_none());
2257
2258        assert_eq!(
2259            state.stop_proxy().expect("stop should succeed"),
2260            ProxyStopResult::Stopping
2261        );
2262        let lifecycle = state.lifecycle();
2263        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2264        assert_eq!(lifecycle.port, Some(8080));
2265        assert!(shutdown_rx.await.is_ok());
2266    }
2267
2268    #[tokio::test]
2269    async fn status_snapshot_derives_runtime_facing_fields() {
2270        let state = CoreState::new(None).await;
2271        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2272        state
2273            .prepare_start(8080, shutdown_tx)
2274            .expect("prepare start should succeed");
2275
2276        let status = state.status_snapshot();
2277        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2278        assert!(status.running);
2279        assert_eq!(status.port, Some(8080));
2280        assert!(status.uptime.is_none());
2281        assert!(status.last_error.is_none());
2282    }
2283
2284    #[tokio::test]
2285    async fn status_report_combines_status_and_metrics() {
2286        let state = CoreState::new(None).await;
2287        let report = state.status_report().await;
2288
2289        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2290        assert!(!report.status.running);
2291        assert_eq!(report.metrics.intercepts_pending, 0);
2292        assert_eq!(report.metrics.ws_pending_messages, 0);
2293        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2294        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2295        assert_eq!(report.metrics.audit_events_total, 0);
2296        assert_eq!(report.metrics.audit_events_failed, 0);
2297        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2298        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2299    }
2300
2301    #[test]
2302    fn proxy_config_new_and_transport_setters_preserve_values() {
2303        let config = ProxyConfig::new(
2304            8080,
2305            std::path::PathBuf::from("/tmp/ca_cert.pem"),
2306            std::path::PathBuf::from("/tmp/ca_key.pem"),
2307        )
2308        .with_transparent(true)
2309        .with_udp_tproxy_port(Some(15000));
2310
2311        assert_eq!(config.port, 8080);
2312        assert_eq!(
2313            config.ca_cert_path,
2314            std::path::PathBuf::from("/tmp/ca_cert.pem")
2315        );
2316        assert_eq!(
2317            config.ca_key_path,
2318            std::path::PathBuf::from("/tmp/ca_key.pem")
2319        );
2320        assert!(config.transparent);
2321        assert_eq!(config.udp_tproxy_port, Some(15000));
2322    }
2323
2324    #[test]
2325    fn proxy_config_from_app_data_dir_creates_default_paths() {
2326        let unique = SystemTime::now()
2327            .duration_since(UNIX_EPOCH)
2328            .expect("clock drift")
2329            .as_nanos();
2330        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2331
2332        let config =
2333            ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2334
2335        assert!(dir.exists());
2336        assert_eq!(config.port, 8899);
2337        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2338        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2339        assert!(!config.transparent);
2340        assert!(config.udp_tproxy_port.is_none());
2341    }
2342
2343    #[tokio::test]
2344    async fn intercept_snapshot_maps_pending_counts() {
2345        let state = CoreState::new(None).await;
2346        let snapshot = state.intercept_snapshot().await;
2347
2348        assert_eq!(snapshot.pending_count, 0);
2349        assert_eq!(snapshot.ws_pending_count, 0);
2350    }
2351
2352    #[tokio::test]
2353    async fn audit_snapshot_returns_latest_events_in_order() {
2354        let state = CoreState::new(None).await;
2355        state.record_audit_event(AuditEvent::new(
2356            AuditActor::Runtime,
2357            AuditEventKind::RuleChanged,
2358            "first",
2359            AuditOutcome::Success,
2360            json!({ "index": 1 }),
2361        ));
2362        state.record_audit_event(AuditEvent::new(
2363            AuditActor::Http,
2364            AuditEventKind::PolicyUpdated,
2365            "second",
2366            AuditOutcome::Success,
2367            json!({ "index": 2 }),
2368        ));
2369
2370        let snapshot = state.audit_snapshot(1);
2371
2372        assert_eq!(snapshot.events.len(), 1);
2373        assert_eq!(snapshot.events[0].target, "second");
2374        assert_eq!(snapshot.events[0].details["index"], 2);
2375    }
2376
2377    #[tokio::test]
2378    async fn query_audit_snapshot_filters_in_memory_events() {
2379        let state = CoreState::new(None).await;
2380        state.record_audit_event(AuditEvent::new(
2381            AuditActor::Http,
2382            AuditEventKind::RuleChanged,
2383            "rule-1",
2384            AuditOutcome::Success,
2385            json!({ "idx": 1 }),
2386        ));
2387        state.record_audit_event(AuditEvent::new(
2388            AuditActor::Probe,
2389            AuditEventKind::PolicyUpdated,
2390            "policy",
2391            AuditOutcome::Failed,
2392            json!({ "idx": 2 }),
2393        ));
2394
2395        let snapshot = state
2396            .query_audit_snapshot(CoreAuditQuery {
2397                actor: Some(AuditActor::Probe),
2398                kind: Some(AuditEventKind::PolicyUpdated),
2399                outcome: Some(AuditOutcome::Failed),
2400                limit: 10,
2401                ..Default::default()
2402            })
2403            .await;
2404
2405        assert_eq!(snapshot.events.len(), 1);
2406        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2407        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2408        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2409    }
2410
2411    #[tokio::test]
2412    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2413        let state = CoreState::new(Some(sqlite_url())).await;
2414        state.update_policy_from(
2415            AuditActor::Http,
2416            "policy".to_string(),
2417            ProxyPolicy {
2418                transparent_enabled: true,
2419                ..Default::default()
2420            },
2421        );
2422
2423        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2424        for _ in 0..10 {
2425            snapshot = state
2426                .query_audit_snapshot(CoreAuditQuery {
2427                    actor: Some(AuditActor::Http),
2428                    kind: Some(AuditEventKind::PolicyUpdated),
2429                    limit: 10,
2430                    ..Default::default()
2431                })
2432                .await;
2433            if !snapshot.events.is_empty() {
2434                break;
2435            }
2436            sleep(Duration::from_millis(20)).await;
2437        }
2438
2439        assert!(!snapshot.events.is_empty());
2440        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2441        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2442    }
2443
2444    #[tokio::test]
2445    async fn prepare_start_rejects_second_active_start() {
2446        let state = CoreState::new(None).await;
2447        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2448        state
2449            .prepare_start(8080, shutdown_tx)
2450            .expect("first start should succeed");
2451
2452        let (second_tx, _second_rx) = oneshot::channel();
2453        let error = state
2454            .prepare_start(8081, second_tx)
2455            .expect_err("second active start should be rejected");
2456        assert!(error.contains("already"));
2457    }
2458
2459    #[test]
2460    fn update_policy_records_audit_event() {
2461        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2462        let state = runtime.block_on(CoreState::new(None));
2463
2464        state.update_policy_from(
2465            AuditActor::Runtime,
2466            "policy".to_string(),
2467            ProxyPolicy {
2468                transparent_enabled: true,
2469                ..Default::default()
2470            },
2471        );
2472
2473        let events = state.recent_audit_events();
2474        let event = events.last().expect("audit event should exist");
2475        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2476        assert_eq!(event.outcome, AuditOutcome::Success);
2477        assert_eq!(event.details["transparent_enabled"], true);
2478    }
2479
2480    #[test]
2481    fn patch_policy_updates_redaction_without_replacing_other_fields() {
2482        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2483        let state = runtime.block_on(CoreState::new(None));
2484        let original_timeout = state.policy_snapshot().request_timeout_ms;
2485
2486        state.patch_policy_from(
2487            AuditActor::Runtime,
2488            "policy.patch".to_string(),
2489            relay_core_api::policy::ProxyPolicyPatch {
2490                redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2491                    enabled: Some(true),
2492                    redact_bodies: Some(true),
2493                    ..Default::default()
2494                }),
2495                upstream: None,
2496            },
2497        );
2498
2499        let policy = state.policy_snapshot();
2500        assert_eq!(policy.request_timeout_ms, original_timeout);
2501        assert!(policy.redaction.enabled);
2502        assert!(policy.redaction.redact_bodies);
2503
2504        let events = state.recent_audit_events();
2505        let event = events.last().expect("audit event should exist");
2506        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2507        assert_eq!(event.details["redaction_enabled"], true);
2508    }
2509
2510    #[tokio::test]
2511    async fn metrics_include_audit_and_lagged_event_counters() {
2512        let state = CoreState::new(None).await;
2513
2514        state.update_policy_from(
2515            AuditActor::Runtime,
2516            "policy".to_string(),
2517            ProxyPolicy::default(),
2518        );
2519        let _ = state
2520            .resolve_intercept_with_modifications_from(
2521                AuditActor::Probe,
2522                "missing-flow:request".to_string(),
2523                "drop",
2524                None,
2525            )
2526            .await;
2527
2528        state.record_flow_events_lagged(3);
2529        state.record_audit_events_lagged(5);
2530
2531        let metrics = state.get_metrics().await;
2532        assert_eq!(metrics.audit_events_total, 2);
2533        assert_eq!(metrics.audit_events_failed, 1);
2534        assert_eq!(metrics.flow_events_lagged_total, 3);
2535        assert_eq!(metrics.audit_events_lagged_total, 5);
2536    }
2537
2538    #[tokio::test]
2539    async fn prometheus_metrics_text_contains_observability_fields() {
2540        let state = CoreState::new(None).await;
2541        state.record_flow_events_lagged(2);
2542        state.record_audit_events_lagged(4);
2543
2544        let text = state.get_metrics_prometheus_text().await;
2545        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2546        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2547        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2548        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2549    }
2550
2551    #[tokio::test]
2552    async fn search_flows_uses_store_with_offset_pagination() {
2553        let state = CoreState::new(Some(sqlite_url())).await;
2554        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2555        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2556        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2557
2558        state.upsert_flow(Box::new(flow_a));
2559        state.upsert_flow(Box::new(flow_b));
2560        state.upsert_flow(Box::new(flow_c));
2561
2562        let mut baseline = Vec::new();
2563        for _ in 0..20 {
2564            baseline = state
2565                .search_flows(FlowQuery {
2566                    host: Some("api.example.com".to_string()),
2567                    path_contains: None,
2568                    method: None,
2569                    status_min: None,
2570                    status_max: None,
2571                    has_error: None,
2572                    is_websocket: None,
2573                    limit: Some(3),
2574                    offset: Some(0),
2575                })
2576                .await;
2577            if baseline.len() == 3 {
2578                break;
2579            }
2580            sleep(Duration::from_millis(20)).await;
2581        }
2582
2583        assert_eq!(baseline.len(), 3);
2584        let page = state
2585            .search_flows(FlowQuery {
2586                host: Some("api.example.com".to_string()),
2587                path_contains: None,
2588                method: None,
2589                status_min: None,
2590                status_max: None,
2591                has_error: None,
2592                is_websocket: None,
2593                limit: Some(1),
2594                offset: Some(1),
2595            })
2596            .await;
2597        assert_eq!(page.len(), 1);
2598        assert_eq!(page[0].id, baseline[1].id);
2599    }
2600
2601    #[tokio::test]
2602    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2603        let state = CoreState::new(Some(sqlite_url())).await;
2604        let first_flow = sample_http_flow(
2605            "persist.example.com",
2606            "/first",
2607            "GET",
2608            200,
2609            1_700_000_010_000,
2610        );
2611        let first_id = first_flow.id.to_string();
2612        state.upsert_flow(Box::new(first_flow));
2613        for i in 0..240 {
2614            state.upsert_flow(Box::new(sample_http_flow(
2615                "persist.example.com",
2616                &format!("/{}", i),
2617                "GET",
2618                200,
2619                1_700_000_020_000 + i,
2620            )));
2621        }
2622
2623        sleep(Duration::from_millis(200)).await;
2624
2625        let loaded = state.get_flow(first_id).await;
2626        assert!(loaded.is_some());
2627    }
2628
2629    #[tokio::test]
2630    async fn search_flows_redacts_summary_url_when_enabled() {
2631        let state = CoreState::new(None).await;
2632        state.update_policy_from(
2633            AuditActor::Runtime,
2634            "policy.redaction".to_string(),
2635            ProxyPolicy {
2636                redaction: RedactionPolicy {
2637                    enabled: true,
2638                    sensitive_query_keys: vec!["token".to_string()],
2639                    redact_bodies: false,
2640                    ..Default::default()
2641                },
2642                ..Default::default()
2643            },
2644        );
2645        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2646
2647        let mut items = Vec::new();
2648        for _ in 0..20 {
2649            items = state
2650                .search_flows(FlowQuery {
2651                    host: Some("api.example.com".to_string()),
2652                    path_contains: Some("/private".to_string()),
2653                    ..Default::default()
2654                })
2655                .await;
2656            if !items.is_empty() {
2657                break;
2658            }
2659            sleep(Duration::from_millis(20)).await;
2660        }
2661
2662        assert!(!items.is_empty());
2663        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2664        let token = redacted
2665            .query_pairs()
2666            .find(|(k, _)| k == "token")
2667            .map(|(_, v)| v.to_string());
2668        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2669    }
2670
2671    #[tokio::test]
2672    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2673        let state = CoreState::new(Some(sqlite_url())).await;
2674        state.update_policy_from(
2675            AuditActor::Runtime,
2676            "policy.redaction".to_string(),
2677            ProxyPolicy {
2678                redaction: RedactionPolicy {
2679                    enabled: true,
2680                    sensitive_header_names: vec![
2681                        "authorization".to_string(),
2682                        "set-cookie".to_string(),
2683                    ],
2684                    sensitive_query_keys: vec!["token".to_string()],
2685                    redact_bodies: true,
2686                },
2687                ..Default::default()
2688            },
2689        );
2690
2691        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2692        let flow_id = flow.id.to_string();
2693        state.upsert_flow(Box::new(flow));
2694        sleep(Duration::from_millis(80)).await;
2695
2696        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2697        let Layer::Http(http) = loaded.layer else {
2698            panic!("expected http layer");
2699        };
2700
2701        let auth = http
2702            .request
2703            .headers
2704            .iter()
2705            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2706            .map(|(_, v)| v.as_str());
2707        assert_eq!(auth, Some("[REDACTED]"));
2708
2709        let req_query_token = http
2710            .request
2711            .query
2712            .iter()
2713            .find(|(k, _)| k == "token")
2714            .map(|(_, v)| v.as_str());
2715        assert_eq!(req_query_token, Some("[REDACTED]"));
2716
2717        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2718        assert_eq!(req_body, Some("[REDACTED]"));
2719
2720        let response = http.response.expect("response should exist");
2721        let set_cookie = response
2722            .headers
2723            .iter()
2724            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2725            .map(|(_, v)| v.as_str());
2726        assert_eq!(set_cookie, Some("[REDACTED]"));
2727        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2728        assert_eq!(res_body, Some("[REDACTED]"));
2729    }
2730
2731    #[test]
2732    fn redact_flow_update_masks_http_body_when_enabled() {
2733        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2734        let state = runtime.block_on(CoreState::new(None));
2735        state.update_policy_from(
2736            AuditActor::Runtime,
2737            "policy.redaction".to_string(),
2738            ProxyPolicy {
2739                redaction: RedactionPolicy {
2740                    enabled: true,
2741                    redact_bodies: true,
2742                    ..Default::default()
2743                },
2744                ..Default::default()
2745            },
2746        );
2747
2748        let update = FlowUpdate::HttpBody {
2749            flow_id: "f-1".to_string(),
2750            direction: relay_core_api::flow::Direction::ClientToServer,
2751            body: BodyData {
2752                encoding: "utf-8".to_string(),
2753                content: "super-secret".to_string(),
2754                size: 12,
2755            },
2756        };
2757        let redacted = state.redact_flow_update_for_output(update);
2758        match redacted {
2759            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2760            _ => panic!("expected http body update"),
2761        }
2762    }
2763
2764    #[cfg(feature = "script")]
2765    #[tokio::test]
2766    async fn load_script_from_records_audit_event() {
2767        let state = CoreState::new(None).await;
2768
2769        state
2770            .load_script_from(
2771                AuditActor::Tauri,
2772                "tauri.load_script".to_string(),
2773                "globalThis.onRequestHeaders = (_flow) => {};",
2774            )
2775            .await
2776            .expect("script should load");
2777
2778        let events = state.recent_audit_events();
2779        let event = events.last().expect("audit event should exist");
2780        assert_eq!(event.actor, AuditActor::Tauri);
2781        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2782        assert_eq!(event.outcome, AuditOutcome::Success);
2783        assert_eq!(event.target, "tauri.load_script");
2784    }
2785}