Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66// ── Re-exports for user convenience ──
67// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
68pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
76pub mod rule_engine {
77    pub use relay_core_lib::rule_engine::*;
78}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
81pub struct CoreMetrics {
82    pub flows_total: usize,
83    pub flows_in_memory: usize,
84    pub flows_dropped: usize,
85    pub intercepts_pending: usize,
86    pub ws_pending_messages: usize,
87    pub oldest_intercept_age_ms: Option<u64>,
88    pub oldest_ws_message_age_ms: Option<u64>,
89    pub rule_exec_errors: usize,
90    pub audit_events_total: usize,
91    pub audit_events_failed: usize,
92    pub flow_events_lagged_total: usize,
93    pub audit_events_lagged_total: usize,
94    /// O4: Total bodies degraded (budget exceeded)
95    pub proxy_body_degraded_total: usize,
96    /// O4: Total HTTP requests processed through streaming pipeline
97    pub proxy_http_request_total: usize,
98    /// O4: Total sandbox rejections
99    pub proxy_sandbox_reject_total: usize,
100    /// O4: Total invalid method rejections (strict_http_semantics)
101    pub proxy_invalid_method_total: usize,
102    /// O4: Total invalid status code rejections (strict_http_semantics)
103    pub proxy_invalid_status_total: usize,
104    /// O4: Total idempotent request retries
105    pub proxy_retry_total: usize,
106    /// O4: Total bodies processed in tap (streaming) mode
107    pub proxy_stream_mode_tap_total: usize,
108    /// O4: Total bodies degraded from tap to pass-through
109    pub proxy_stream_mode_degrade_total: usize,
110}
111
112impl CoreMetrics {
113    pub fn to_prometheus_text(&self) -> String {
114        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
115        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
116        format!(
117            "relay_core_flows_total {}\n\
118relay_core_flows_in_memory {}\n\
119relay_core_flows_dropped_total {}\n\
120relay_core_intercepts_pending {}\n\
121relay_core_ws_pending_messages {}\n\
122relay_core_oldest_intercept_age_ms {}\n\
123relay_core_oldest_ws_message_age_ms {}\n\
124relay_core_rule_exec_errors_total {}\n\
125relay_core_audit_events_total {}\n\
126relay_core_audit_events_failed_total {}\n\
127relay_core_flow_events_lagged_total {}\n\
128relay_core_audit_events_lagged_total {}\n\
129relay_core_proxy_body_degraded_total {}\n\
130relay_core_proxy_http_request_total {}\n\
131relay_core_proxy_sandbox_reject_total {}\n\
132relay_core_proxy_invalid_method_total {}\n\
133relay_core_proxy_invalid_status_total {}\n\
134relay_core_proxy_retry_total {}\n\
135relay_core_proxy_stream_mode_tap_total {}\n\
136relay_core_proxy_stream_mode_degrade_total {}\n",
137            self.flows_total,
138            self.flows_in_memory,
139            self.flows_dropped,
140            self.intercepts_pending,
141            self.ws_pending_messages,
142            oldest_intercept_age_ms,
143            oldest_ws_message_age_ms,
144            self.rule_exec_errors,
145            self.audit_events_total,
146            self.audit_events_failed,
147            self.flow_events_lagged_total,
148            self.audit_events_lagged_total,
149            self.proxy_body_degraded_total,
150            self.proxy_http_request_total,
151            self.proxy_sandbox_reject_total,
152            self.proxy_invalid_method_total,
153            self.proxy_invalid_status_total,
154            self.proxy_retry_total,
155            self.proxy_stream_mode_tap_total,
156            self.proxy_stream_mode_degrade_total,
157        )
158    }
159}
160
161#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
162pub struct CoreStatusSnapshot {
163    pub phase: RuntimeLifecyclePhase,
164    pub running: bool,
165    pub port: Option<u16>,
166    pub uptime: Option<u64>,
167    pub last_error: Option<String>,
168}
169
170#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
171pub struct CoreStatusReport {
172    pub status: CoreStatusSnapshot,
173    pub metrics: CoreMetrics,
174}
175
176#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
177pub struct CoreInterceptSnapshot {
178    pub pending_count: usize,
179    pub ws_pending_count: usize,
180}
181
182#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
183pub struct CoreAuditSnapshot {
184    pub events: Vec<AuditEvent>,
185}
186
187#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
188pub struct CoreAuditQuery {
189    pub since_ms: Option<u64>,
190    pub until_ms: Option<u64>,
191    pub actor: Option<AuditActor>,
192    pub kind: Option<AuditEventKind>,
193    pub outcome: Option<AuditOutcome>,
194    pub limit: usize,
195}
196
197impl Default for CoreAuditQuery {
198    fn default() -> Self {
199        Self {
200            since_ms: None,
201            until_ms: None,
202            actor: None,
203            kind: None,
204            outcome: None,
205            limit: 50,
206        }
207    }
208}
209
210#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
211#[serde(rename_all = "snake_case")]
212pub enum RuntimeLifecyclePhase {
213    Created,
214    Starting,
215    Running,
216    Stopping,
217    Stopped,
218    Failed,
219}
220
221impl RuntimeLifecyclePhase {
222    pub fn as_str(&self) -> &'static str {
223        match self {
224            Self::Created => "created",
225            Self::Starting => "starting",
226            Self::Running => "running",
227            Self::Stopping => "stopping",
228            Self::Stopped => "stopped",
229            Self::Failed => "failed",
230        }
231    }
232
233    pub fn is_active(&self) -> bool {
234        matches!(self, Self::Starting | Self::Running | Self::Stopping)
235    }
236}
237
238#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
239pub struct RuntimeLifecycle {
240    pub phase: RuntimeLifecyclePhase,
241    pub port: Option<u16>,
242    pub started_at_ms: Option<u64>,
243    pub last_error: Option<String>,
244}
245
246impl RuntimeLifecycle {
247    pub fn created() -> Self {
248        Self {
249            phase: RuntimeLifecyclePhase::Created,
250            port: None,
251            started_at_ms: None,
252            last_error: None,
253        }
254    }
255
256    pub fn is_active(&self) -> bool {
257        self.phase.is_active()
258    }
259
260    pub fn uptime_seconds(&self) -> Option<u64> {
261        let started_at_ms = self.started_at_ms?;
262        let now_ms = now_unix_ms();
263        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
264    }
265}
266
267pub enum ProxySpawnResult {
268    Started(tokio::task::JoinHandle<()>),
269    AlreadyRunning,
270}
271
/// Outcome of a request to stop the proxy.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A stop was initiated.
    Stopping,
    /// There was no running proxy to stop.
    NotRunning,
}
277
278impl From<RuntimeLifecycle> for CoreStatusSnapshot {
279    fn from(lifecycle: RuntimeLifecycle) -> Self {
280        Self {
281            running: lifecycle.is_active(),
282            uptime: lifecycle.uptime_seconds(),
283            port: lifecycle.port,
284            last_error: lifecycle.last_error,
285            phase: lifecycle.phase,
286        }
287    }
288}
289
290pub struct CoreState {
291    flow_store: mpsc::Sender<FlowStoreMessage>,
292    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
293    rule_store: mpsc::Sender<RuleStoreMessage>,
294    store: Option<Store>,
295    #[cfg(feature = "script")]
296    pub script_interceptor: Arc<ScriptInterceptor>,
297    pub policy_tx: watch::Sender<ProxyPolicy>,
298    pub flows_dropped: Arc<AtomicUsize>,
299    audit_events_total: Arc<AtomicUsize>,
300    audit_events_failed: Arc<AtomicUsize>,
301    flow_events_lagged_total: Arc<AtomicUsize>,
302    audit_events_lagged_total: Arc<AtomicUsize>,
303    /// 内部广播 channel:Tauri、Probe 等多个消费者均可订阅
304    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
305    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
306    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
307    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
308    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
309}
310
311impl CoreState {
312    pub async fn new(db_url: Option<String>) -> Self {
313        const AUDIT_HISTORY_LIMIT: usize = 200;
314
315        let store = if let Some(url) = db_url {
316            match Store::connect(&url).await {
317                Ok(s) => {
318                    if let Err(e) = s.init().await {
319                        tracing::error!("Failed to init store: {}", e);
320                    }
321                    Some(s)
322                }
323                Err(e) => {
324                    tracing::error!("Failed to connect to store: {}", e);
325                    None
326                }
327            }
328        } else {
329            None
330        };
331
332        let (flow_tx, flow_rx) = mpsc::channel(10000);
333        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
334        tokio::spawn(flow_actor.run());
335
336        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
337        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
338        tokio::spawn(intercept_actor.run());
339
340        let (rule_tx, rule_rx) = mpsc::channel(100);
341        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
342        tokio::spawn(rule_actor.run());
343
344        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
345        let (flow_broadcast_tx, _) = broadcast::channel(1000);
346        let (audit_broadcast_tx, _) = broadcast::channel(256);
347        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
348
349        #[cfg(feature = "script")]
350        let script_interceptor = ScriptInterceptor::new()
351            .await
352            .expect("Failed to initialize ScriptInterceptor");
353        Self {
354            flow_store: flow_tx,
355            intercept_broker: intercept_tx,
356            rule_store: rule_tx,
357            store,
358            #[cfg(feature = "script")]
359            script_interceptor: Arc::new(script_interceptor),
360            policy_tx,
361            flows_dropped: Arc::new(AtomicUsize::new(0)),
362            audit_events_total: Arc::new(AtomicUsize::new(0)),
363            audit_events_failed: Arc::new(AtomicUsize::new(0)),
364            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
365            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
366            flow_broadcast_tx,
367            audit_broadcast_tx,
368            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
369            lifecycle_tx,
370            shutdown_tx: Mutex::new(None),
371        }
372    }
373
374    pub async fn get_metrics(&self) -> CoreMetrics {
375        let (flow_tx, flow_rx) = oneshot::channel();
376        let _ = self
377            .flow_store
378            .send(FlowStoreMessage::GetMetrics(flow_tx))
379            .await;
380        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
381
382        let (int_tx, int_rx) = oneshot::channel();
383        let _ = self
384            .intercept_broker
385            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
386            .await;
387        let (
388            intercepts_pending,
389            ws_pending_messages,
390            oldest_intercept_age_ms,
391            oldest_ws_message_age_ms,
392        ) = int_rx.await.unwrap_or((0, 0, None, None));
393
394        let (rule_tx, rule_rx) = oneshot::channel();
395        let _ = self
396            .rule_store
397            .send(RuleStoreMessage::GetMetrics(rule_tx))
398            .await;
399        let rule_exec_errors = rule_rx.await.unwrap_or(0);
400
401        CoreMetrics {
402            flows_total,
403            flows_in_memory,
404            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
405            intercepts_pending,
406            ws_pending_messages,
407            oldest_intercept_age_ms,
408            oldest_ws_message_age_ms,
409            rule_exec_errors,
410            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
411            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
412            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
413            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
414            proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
415            proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
416            proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
417            proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
418            proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
419            proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
420            proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
421            proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
422            ),
423        }
424    }
425
426    pub async fn get_metrics_prometheus_text(&self) -> String {
427        let mut text = self.get_metrics().await.to_prometheus_text();
428        #[cfg(feature = "script")]
429        {
430            text.push_str(&self.script_interceptor.metrics.prometheus_lines());
431        }
432        text
433    }
434
435    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
436        self.lifecycle().into()
437    }
438
439    pub async fn status_report(&self) -> CoreStatusReport {
440        CoreStatusReport {
441            status: self.status_snapshot(),
442            metrics: self.get_metrics().await,
443        }
444    }
445
446    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
447        let metrics = self.get_metrics().await;
448        CoreInterceptSnapshot {
449            pending_count: metrics.intercepts_pending,
450            ws_pending_count: metrics.ws_pending_messages,
451        }
452    }
453
454    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
455        let events = self.recent_audit_events();
456        let start = events.len().saturating_sub(limit);
457        CoreAuditSnapshot {
458            events: events.into_iter().skip(start).collect(),
459        }
460    }
461
462    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
463        let limit = query.limit.clamp(1, 500);
464        if let Some(store) = &self.store {
465            let rows = store
466                .query_audit_events(
467                    query.since_ms,
468                    query.until_ms,
469                    query.actor.as_ref().map(AuditActor::as_str),
470                    query.kind.as_ref().map(AuditEventKind::as_str),
471                    query.outcome.as_ref().map(AuditOutcome::as_str),
472                    limit,
473                )
474                .await
475                .unwrap_or_default();
476
477            let mut events = Vec::with_capacity(rows.len());
478            for row in rows {
479                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
480                    events.push(event);
481                }
482            }
483            return CoreAuditSnapshot { events };
484        }
485
486        let mut events = self.recent_audit_events();
487        events.reverse();
488        let filtered = events
489            .into_iter()
490            .filter(|event| {
491                query
492                    .since_ms
493                    .map(|v| event.timestamp_ms >= v)
494                    .unwrap_or(true)
495            })
496            .filter(|event| {
497                query
498                    .until_ms
499                    .map(|v| event.timestamp_ms <= v)
500                    .unwrap_or(true)
501            })
502            .filter(|event| {
503                query
504                    .actor
505                    .as_ref()
506                    .map(|v| &event.actor == v)
507                    .unwrap_or(true)
508            })
509            .filter(|event| {
510                query
511                    .kind
512                    .as_ref()
513                    .map(|v| &event.kind == v)
514                    .unwrap_or(true)
515            })
516            .filter(|event| {
517                query
518                    .outcome
519                    .as_ref()
520                    .map(|v| &event.outcome == v)
521                    .unwrap_or(true)
522            })
523            .take(limit)
524            .collect();
525        CoreAuditSnapshot { events: filtered }
526    }
527
528    pub async fn get_flow(&self, id: String) -> Option<Flow> {
529        let (tx, rx) = oneshot::channel();
530        if let Err(e) = self
531            .flow_store
532            .send(FlowStoreMessage::GetFlow {
533                id: id.clone(),
534                respond_to: tx,
535            })
536            .await
537        {
538            error!("Failed to send GetFlow request: {}", e);
539            if let Some(store) = &self.store {
540                return store
541                    .load_flow(&id)
542                    .await
543                    .ok()
544                    .flatten()
545                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
546            }
547            return None;
548        }
549        let flow = rx
550            .await
551            .map_err(|e| {
552                error!("Failed to receive Flow response: {}", e);
553                e
554            })
555            .unwrap_or(None);
556        if let Some(flow) = flow {
557            return Some(redact_flow(flow, &self.current_redaction_policy()));
558        }
559        if let Some(store) = &self.store {
560            return store
561                .load_flow(&id)
562                .await
563                .ok()
564                .flatten()
565                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
566                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
567        }
568        None
569    }
570
571    pub async fn get_rules(&self) -> Vec<Rule> {
572        let (tx, rx) = oneshot::channel();
573        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
574            error!("Failed to send GetRules request: {}", e);
575            return Vec::new();
576        }
577        rx.await
578            .map_err(|e| {
579                error!("Failed to receive Rules response: {}", e);
580                e
581            })
582            .unwrap_or_default()
583    }
584
585    pub async fn set_rules(&self, rules: Vec<Rule>) {
586        let _ = self
587            .set_rules_from(
588                AuditActor::Runtime,
589                "rules.replace",
590                "rule_set".to_string(),
591                json!({}),
592                rules,
593            )
594            .await;
595    }
596
597    pub async fn upsert_rule_from(
598        &self,
599        actor: AuditActor,
600        operation: &str,
601        target: String,
602        details: serde_json::Value,
603        rule: Rule,
604    ) -> Result<(), String> {
605        let rule_id = rule.id.clone();
606        let mut rules = self.get_rules().await;
607        rules.retain(|existing| existing.id != rule_id);
608        rules.push(rule);
609        self.set_rules_from(actor, operation, target, details, rules)
610            .await
611    }
612
613    pub async fn delete_rule_from(
614        &self,
615        actor: AuditActor,
616        operation: &str,
617        target: String,
618        details: serde_json::Value,
619        rule_id: &str,
620    ) -> Result<bool, String> {
621        let mut rules = self.get_rules().await;
622        let before = rules.len();
623        rules.retain(|rule| rule.id != rule_id);
624        if rules.len() == before {
625            return Ok(false);
626        }
627        self.set_rules_from(actor, operation, target, details, rules)
628            .await
629            .map(|_| true)
630    }
631
632    pub async fn create_mock_response_rule_from(
633        &self,
634        actor: AuditActor,
635        target: String,
636        details: serde_json::Value,
637        config: MockResponseRuleConfig,
638    ) -> Result<String, String> {
639        let rule_id = config.rule_id.clone();
640        let rule = build_mock_response_rule(config);
641        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
642            .await
643            .map(|_| rule_id)
644    }
645
646    pub async fn create_intercept_rule_from(
647        &self,
648        actor: AuditActor,
649        target: String,
650        details: serde_json::Value,
651        config: InterceptRuleConfig,
652    ) -> Result<String, String> {
653        let rule_id = config.rule_id.clone();
654        let mut rules = self.get_rules().await;
655        rules.extend(build_intercept_rules(config));
656        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
657            .await
658            .map(|_| rule_id)
659    }
660
661    pub async fn upsert_legacy_intercept_rule_from(
662        &self,
663        actor: AuditActor,
664        target: String,
665        details: serde_json::Value,
666        rule: InterceptRule,
667    ) -> Result<(), String> {
668        let rule_id = rule.id.clone();
669        let mut rules = self.get_rules().await;
670        rules.retain(|existing| {
671            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
672        });
673        rules.extend(rule.to_rules());
674        self.set_rules_from(
675            actor,
676            "rule.intercept_legacy_upsert",
677            target,
678            details,
679            rules,
680        )
681        .await
682    }
683
684    pub async fn set_rules_from(
685        &self,
686        actor: AuditActor,
687        operation: &str,
688        target: String,
689        details: serde_json::Value,
690        rules: Vec<Rule>,
691    ) -> Result<(), String> {
692        let rule_count = rules.len();
693        if let Err(e) = self
694            .rule_store
695            .send(RuleStoreMessage::SetRules(rules))
696            .await
697        {
698            error!("Failed to send SetRules request: {}", e);
699            self.record_audit_event(AuditEvent::new(
700                actor,
701                AuditEventKind::RuleChanged,
702                target,
703                AuditOutcome::Failed,
704                json!({
705                    "operation": operation,
706                    "rule_count": rule_count,
707                    "details": details,
708                    "error": e.to_string()
709                }),
710            ));
711            return Err(e.to_string());
712        }
713
714        self.record_audit_event(AuditEvent::new(
715            actor,
716            AuditEventKind::RuleChanged,
717            target,
718            AuditOutcome::Success,
719            json!({
720                "operation": operation,
721                "rule_count": rule_count,
722                "details": details
723            }),
724        ));
725        Ok(())
726    }
727
728    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
729        let mut new_rules = Vec::new();
730        for rule in rules {
731            new_rules.extend(rule.to_rules());
732        }
733        self.set_rules(new_rules).await;
734    }
735
736    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
737        let (tx, rx) = oneshot::channel();
738        if let Err(e) = self
739            .rule_store
740            .send(RuleStoreMessage::GetRuleEngine(tx))
741            .await
742        {
743            error!("Failed to send GetRuleEngine request: {}", e);
744            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
745        }
746        rx.await
747            .map_err(|e| {
748                error!("Failed to receive RuleEngine response: {}", e);
749                e
750            })
751            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
752    }
753
754    pub fn update_policy(&self, policy: ProxyPolicy) {
755        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
756    }
757
758    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
759        let mut policy = self.policy_snapshot();
760        policy.apply_patch(patch);
761        self.update_policy_from(actor, target, policy);
762    }
763
764    pub fn policy_snapshot(&self) -> ProxyPolicy {
765        self.policy_tx.borrow().clone()
766    }
767
768    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
769        let details = json!({
770            "strict_http_semantics": policy.strict_http_semantics,
771            "request_timeout_ms": policy.request_timeout_ms,
772            "max_body_size": policy.max_body_size,
773            "transparent_enabled": policy.transparent_enabled,
774            "redaction_enabled": policy.redaction.enabled,
775            "redact_bodies": policy.redaction.redact_bodies
776        });
777
778        self.policy_tx.send_replace(policy);
779        self.record_audit_event(AuditEvent::new(
780            actor,
781            AuditEventKind::PolicyUpdated,
782            target,
783            AuditOutcome::Success,
784            details,
785        ));
786    }
787
788    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
789        if let Err(e) = self
790            .intercept_broker
791            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
792            .await
793        {
794            error!("Failed to send RegisterIntercept request: {}", e);
795        }
796    }
797
798    pub async fn resolve_intercept(
799        &self,
800        key: String,
801        result: InterceptionResult,
802    ) -> Result<(), String> {
803        let (tx, rx) = oneshot::channel();
804        if let Err(e) = self
805            .intercept_broker
806            .send(InterceptBrokerMessage::ResolveIntercept {
807                key,
808                result,
809                respond_to: tx,
810            })
811            .await
812        {
813            error!("Failed to send ResolveIntercept request: {}", e);
814            return Err(e.to_string());
815        }
816        rx.await.map_err(|_| "Actor dropped".to_string())?
817    }
818
819    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
820        let (tx, rx) = oneshot::channel();
821        if let Err(e) = self
822            .intercept_broker
823            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
824                key,
825                respond_to: tx,
826            })
827            .await
828        {
829            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
830            return None;
831        }
832        rx.await
833            .map_err(|e| {
834                error!("Failed to receive WebSocketMessage response: {}", e);
835                e
836            })
837            .unwrap_or(None)
838    }
839
840    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
841        if let Err(e) = self
842            .intercept_broker
843            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
844            .await
845        {
846            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
847        }
848    }
849
850    pub async fn is_intercept_pending(&self, key: String) -> bool {
851        let (tx, rx) = oneshot::channel();
852        if let Err(e) = self
853            .intercept_broker
854            .send(InterceptBrokerMessage::GetPendingIntercept {
855                key,
856                respond_to: tx,
857            })
858            .await
859        {
860            error!("Failed to send GetPendingIntercept request: {}", e);
861            return false;
862        }
863        rx.await
864            .map_err(|e| {
865                error!("Failed to receive PendingIntercept response: {}", e);
866                e
867            })
868            .unwrap_or(false)
869    }
870
871    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
872        let (tx, rx) = oneshot::channel();
873        if let Err(e) = self
874            .intercept_broker
875            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
876                flow_id,
877                respond_to: tx,
878            })
879            .await
880        {
881            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
882            return false;
883        }
884        rx.await
885            .map_err(|e| {
886                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
887                e
888            })
889            .unwrap_or(false)
890    }
891
892    pub fn upsert_flow(&self, flow: Box<Flow>) {
893        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
894            // Drop flow if channel is full
895            error!("FlowStore dropped flow: {}", e);
896            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
897        }
898    }
899
900    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
901        if let Err(e) = self
902            .flow_store
903            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
904        {
905            error!("FlowStore dropped WS message: {}", e);
906            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
907        }
908    }
909
910    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
911        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
912            flow_id,
913            body,
914            direction,
915        }) {
916            error!("FlowStore dropped HTTP body: {}", e);
917            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
918        }
919    }
920
921    /// P1: Tag a flow as budget-exceeded (body too large for full rule inspection)
922    pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
923        if let Err(e) = self
924            .flow_store
925            .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
926        {
927            error!("FlowStore dropped budget-exceeded tag: {}", e);
928            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
929        }
930    }
931
932    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
933    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
934        self.flow_broadcast_tx.subscribe()
935    }
936
937    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
938        self.audit_broadcast_tx.subscribe()
939    }
940
941    fn current_redaction_policy(&self) -> RedactionPolicy {
942        self.policy_tx.borrow().redaction.clone()
943    }
944
945    pub fn record_flow_events_lagged(&self, skipped: u64) {
946        self.flow_events_lagged_total
947            .fetch_add(skipped as usize, Ordering::Relaxed);
948    }
949
950    pub fn record_audit_events_lagged(&self, skipped: u64) {
951        self.audit_events_lagged_total
952            .fetch_add(skipped as usize, Ordering::Relaxed);
953    }
954
955    pub fn lifecycle(&self) -> RuntimeLifecycle {
956        self.lifecycle_tx.borrow().clone()
957    }
958
959    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
960        self.lifecycle_tx.subscribe()
961    }
962
963    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
964        let current = self.lifecycle();
965        if current.is_active() {
966            return Err(format!(
967                "Proxy is already {} on port {}",
968                current.phase.as_str(),
969                current.port.unwrap_or(port)
970            ));
971        }
972
973        let mut guard = self
974            .shutdown_tx
975            .lock()
976            .map_err(|_| "shutdown state poisoned".to_string())?;
977        *guard = Some(shutdown_tx);
978        drop(guard);
979
980        self.update_lifecycle(RuntimeLifecycle {
981            phase: RuntimeLifecyclePhase::Starting,
982            port: Some(port),
983            started_at_ms: None,
984            last_error: None,
985        });
986        Ok(())
987    }
988
989    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
990        let mut guard = self
991            .shutdown_tx
992            .lock()
993            .map_err(|_| "shutdown state poisoned".to_string())?;
994        let Some(tx) = guard.take() else {
995            return Ok(ProxyStopResult::NotRunning);
996        };
997        drop(guard);
998
999        let current = self.lifecycle();
1000        self.update_lifecycle(RuntimeLifecycle {
1001            phase: RuntimeLifecyclePhase::Stopping,
1002            port: current.port,
1003            started_at_ms: current.started_at_ms,
1004            last_error: current.last_error,
1005        });
1006        let _ = tx.send(());
1007        Ok(ProxyStopResult::Stopping)
1008    }
1009
1010    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1011        self.audit_history
1012            .lock()
1013            .map(|events| events.iter().cloned().collect())
1014            .unwrap_or_default()
1015    }
1016
1017    /// 搜索内存中的 Flow 列表,返回轻量摘要。
1018    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1019        let redaction = self.current_redaction_policy();
1020        if let Some(store) = &self.store {
1021            return store
1022                .query_flow_summaries(&query)
1023                .await
1024                .unwrap_or_default()
1025                .into_iter()
1026                .map(|summary| redact_flow_summary(summary, &redaction))
1027                .collect();
1028        }
1029        let (tx, rx) = oneshot::channel();
1030        if let Err(e) = self
1031            .flow_store
1032            .send(FlowStoreMessage::SearchFlows {
1033                query,
1034                respond_to: tx,
1035            })
1036            .await
1037        {
1038            error!("Failed to send SearchFlows request: {}", e);
1039            return Vec::new();
1040        }
1041        rx.await
1042            .unwrap_or_default()
1043            .into_iter()
1044            .map(|summary| redact_flow_summary(summary, &redaction))
1045            .collect()
1046    }
1047
1048    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1049        let redaction = self.current_redaction_policy();
1050        redact_flow_update(update, &redaction)
1051    }
1052
1053    /// 解除截获并可选地应用修改。
1054    ///
1055    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
1056    /// - 若 `mods` 为 None,直接 Continue;
1057    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1058    ///
1059    /// `key` 格式约定:
1060    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1061    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1062    pub async fn resolve_intercept_with_modifications(
1063        &self,
1064        key: String,
1065        action: &str,
1066        mods: Option<relay_core_api::modification::FlowModification>,
1067    ) -> Result<(), String> {
1068        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1069            .await
1070    }
1071
1072    pub async fn resolve_intercept_with_modifications_from(
1073        &self,
1074        actor: AuditActor,
1075        key: String,
1076        action: &str,
1077        mods: Option<relay_core_api::modification::FlowModification>,
1078    ) -> Result<(), String> {
1079        let modified_fields = modification_field_names(mods.as_ref());
1080        let result = match action {
1081            "drop" => InterceptionResult::Drop,
1082            _ => match mods {
1083                None => InterceptionResult::Continue,
1084                Some(m) => {
1085                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1086                    match parts.as_slice() {
1087                        [flow_id, phase] => {
1088                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1089                                modification::apply_flow_modification(&flow, phase, m)
1090                            } else {
1091                                InterceptionResult::Continue
1092                            }
1093                        }
1094                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1095                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1096                        [_, "ws_msg", _] => {
1097                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1098                                modification::apply_ws_modification(&msg, m)
1099                            } else {
1100                                InterceptionResult::Continue
1101                            }
1102                        }
1103                        _ => InterceptionResult::Continue,
1104                    }
1105                }
1106            },
1107        };
1108        let outcome = self.resolve_intercept(key.clone(), result).await;
1109        let audit_outcome = if outcome.is_ok() {
1110            AuditOutcome::Success
1111        } else {
1112            AuditOutcome::Failed
1113        };
1114        let error_message = outcome.as_ref().err().cloned();
1115
1116        self.record_audit_event(AuditEvent::new(
1117            actor,
1118            AuditEventKind::InterceptResolved,
1119            key,
1120            audit_outcome,
1121            json!({
1122                "action": action,
1123                "has_modifications": !modified_fields.is_empty(),
1124                "modified_fields": modified_fields,
1125                "error": error_message
1126            }),
1127        ));
1128        outcome
1129    }
1130
1131    #[cfg(feature = "script")]
1132    pub async fn load_script_from(
1133        &self,
1134        actor: AuditActor,
1135        target: String,
1136        script: &str,
1137    ) -> Result<(), String> {
1138        let result = self
1139            .script_interceptor
1140            .load_script(script)
1141            .await
1142            .map_err(|e| e.to_string());
1143
1144        let outcome = if result.is_ok() {
1145            AuditOutcome::Success
1146        } else {
1147            AuditOutcome::Failed
1148        };
1149        let error_message = result.as_ref().err().cloned();
1150
1151        self.record_audit_event(AuditEvent::new(
1152            actor,
1153            AuditEventKind::ScriptReloaded,
1154            target,
1155            outcome,
1156            json!({
1157                "script_bytes": script.len(),
1158                "error": error_message
1159            }),
1160        ));
1161
1162        result
1163    }
1164
1165    /// S5: Set the env var whitelist for relay.env() in user scripts.
1166    #[cfg(feature = "script")]
1167    pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1168        self.script_interceptor.set_env_allow(env_allow).await;
1169    }
1170
1171    fn record_audit_event(&self, event: AuditEvent) {
1172        const AUDIT_HISTORY_LIMIT: usize = 200;
1173        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1174        if event.outcome == AuditOutcome::Failed {
1175            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1176        }
1177
1178        let details = event.details.to_string();
1179        tracing::info!(
1180            target: "relay_core_audit",
1181            event_id = %event.id,
1182            actor = %event.actor.as_str(),
1183            kind = %event.kind.as_str(),
1184            target = %event.target,
1185            outcome = %event.outcome.as_str(),
1186            details = %details
1187        );
1188
1189        if let Ok(mut history) = self.audit_history.lock() {
1190            if history.len() >= AUDIT_HISTORY_LIMIT {
1191                history.pop_front();
1192            }
1193            history.push_back(event.clone());
1194        }
1195
1196        if let Some(store) = self.store.clone() {
1197            let event_json = serde_json::to_value(&event).unwrap_or_default();
1198            let event_id = event.id.clone();
1199            let timestamp_ms = event.timestamp_ms;
1200            let actor = event.actor.as_str().to_string();
1201            let kind = event.kind.as_str().to_string();
1202            let target = event.target.clone();
1203            let outcome = event.outcome.as_str().to_string();
1204            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1205                handle.spawn(async move {
1206                    if let Err(e) = store
1207                        .save_audit_event(AuditEventRecord {
1208                            id: &event_id,
1209                            timestamp_ms,
1210                            actor: &actor,
1211                            kind: &kind,
1212                            target: &target,
1213                            outcome: &outcome,
1214                            content: &event_json,
1215                        })
1216                        .await
1217                    {
1218                        tracing::error!("Failed to persist audit event: {}", e);
1219                    }
1220                });
1221            }
1222        }
1223
1224        let _ = self.audit_broadcast_tx.send(event);
1225    }
1226
1227    fn transition_to_running(&self, port: u16) {
1228        self.update_lifecycle(RuntimeLifecycle {
1229            phase: RuntimeLifecyclePhase::Running,
1230            port: Some(port),
1231            started_at_ms: Some(now_unix_ms()),
1232            last_error: None,
1233        });
1234    }
1235
1236    fn transition_to_stopped(&self) {
1237        if let Ok(mut guard) = self.shutdown_tx.lock() {
1238            *guard = None;
1239        }
1240
1241        self.update_lifecycle(RuntimeLifecycle {
1242            phase: RuntimeLifecyclePhase::Stopped,
1243            port: None,
1244            started_at_ms: None,
1245            last_error: None,
1246        });
1247    }
1248
1249    fn transition_to_failed(&self, port: u16, error: String) {
1250        if let Ok(mut guard) = self.shutdown_tx.lock() {
1251            *guard = None;
1252        }
1253
1254        self.update_lifecycle(RuntimeLifecycle {
1255            phase: RuntimeLifecyclePhase::Failed,
1256            port: Some(port),
1257            started_at_ms: None,
1258            last_error: Some(error),
1259        });
1260    }
1261
1262    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1263        tracing::info!(
1264            target: "relay_core_lifecycle",
1265            phase = %lifecycle.phase.as_str(),
1266            port = ?lifecycle.port,
1267            started_at_ms = ?lifecycle.started_at_ms,
1268            last_error = ?lifecycle.last_error
1269        );
1270        self.lifecycle_tx.send_replace(lifecycle);
1271    }
1272
1273    pub fn spawn_proxy(
1274        self: &Arc<Self>,
1275        config: ProxyConfig,
1276        sink: mpsc::Sender<FlowUpdate>,
1277        extra_interceptor: Option<Arc<dyn Interceptor>>,
1278    ) -> Result<ProxySpawnResult, String> {
1279        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1280        match self.prepare_start(config.port, shutdown_tx) {
1281            Ok(()) => {}
1282            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1283            Err(error) => return Err(error),
1284        }
1285        let state = self.clone();
1286        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1287            if let Err(error) = state
1288                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1289                .await
1290            {
1291                error!("Proxy failed: {}", error);
1292            }
1293        })))
1294    }
1295
1296    pub async fn start_proxy(
1297        self: &Arc<Self>,
1298        config: ProxyConfig,
1299        sink: mpsc::Sender<FlowUpdate>,
1300        extra_interceptor: Option<Arc<dyn Interceptor>>,
1301    ) -> Result<(), String> {
1302        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1303        self.prepare_start(config.port, shutdown_tx)?;
1304        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1305            .await
1306    }
1307
1308    async fn run_proxy(
1309        self: &Arc<Self>,
1310        config: ProxyConfig,
1311        sink: mpsc::Sender<FlowUpdate>,
1312        extra_interceptor: Option<Arc<dyn Interceptor>>,
1313        shutdown_rx: oneshot::Receiver<()>,
1314    ) -> Result<(), String> {
1315        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1316        let state = self.clone();
1317
1318        if let Some(parent) = config.ca_cert_path.parent()
1319            && !parent.exists()
1320        {
1321            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1322        }
1323
1324        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1325            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1326        let ca = Arc::new(ca);
1327
1328        #[cfg(feature = "script")]
1329        let script_interceptor = self.script_interceptor.clone();
1330
1331        #[cfg(feature = "script")]
1332        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1333
1334        #[cfg(not(feature = "script"))]
1335        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1336
1337        interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1338            self.clone(),
1339        )));
1340
1341        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1342            self.clone(),
1343        )));
1344
1345        if let Some(interceptor) = extra_interceptor {
1346            interceptors.push(interceptor);
1347        }
1348
1349        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1350        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1351
1352        tokio::spawn(async move {
1353            while let Some(update) = proxy_rx.recv().await {
1354                match update.clone() {
1355                    FlowUpdate::Full(flow) => {
1356                        state.upsert_flow(flow);
1357                    }
1358                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1359                        state.append_ws_message(flow_id, message);
1360                    }
1361                    FlowUpdate::HttpBody {
1362                        flow_id,
1363                        direction,
1364                        body,
1365                    } => {
1366                        state.update_http_body(flow_id, body, direction);
1367                    }
1368                    FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1369                        // P1: Tag the flow as budget-exceeded and update resilience trace
1370                        state.tag_flow_budget_exceeded(flow_id, direction);
1371                    }
1372                }
1373
1374                let _ = state.flow_broadcast_tx.send(update.clone());
1375
1376                if sink.try_send(update).is_err() {
1377                    relay_core_lib::metrics::inc_flows_dropped();
1378                }
1379            }
1380        });
1381
1382        if let Some(udp_port) = config.udp_tproxy_port {
1383            let udp_proxy_tx = proxy_tx.clone();
1384            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1385            tokio::spawn(async move {
1386                match tokio::net::UdpSocket::bind(udp_addr).await {
1387                    Ok(socket) => {
1388                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1389                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1390                            error!("UDP TPROXY failed: {}", e);
1391                        }
1392                    }
1393                    Err(e) => {
1394                        error!("Failed to bind UDP TPROXY socket: {}", e);
1395                    }
1396                }
1397            });
1398        }
1399
1400        let listener = match TcpListener::bind(addr).await {
1401            Ok(listener) => listener,
1402            Err(e) => {
1403                if let Ok(mut guard) = self.shutdown_tx.lock() {
1404                    *guard = None;
1405                }
1406                let message = format!("Failed to bind to address {}: {}", addr, e);
1407                self.transition_to_failed(config.port, message.clone());
1408                return Err(message);
1409            }
1410        };
1411
1412        self.transition_to_running(config.port);
1413        let shutdown_rx = Some(shutdown_rx);
1414
1415        if config.transparent {
1416            let policy = ProxyPolicy {
1417                transparent_enabled: true,
1418                ..Default::default()
1419            };
1420            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1421            let policy_rx = self.policy_tx.subscribe();
1422
1423            let provider: Arc<dyn OriginalDstProvider> = {
1424                let mut addrs = BTreeSet::new();
1425                if let Ok(local) = listener.local_addr() {
1426                    addrs.insert(local);
1427                }
1428
1429                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1430                {
1431                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1432                }
1433                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1434                {
1435                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1436                        Ok(provider) => Arc::new(provider),
1437                        Err(e) => {
1438                            error!("Failed to initialize macOS PF provider: {}", e);
1439                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1440                        }
1441                    }
1442                }
1443                #[cfg(target_os = "windows")]
1444                {
1445                    let filter =
1446                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1447                            .to_string();
1448                    let port = config.port;
1449                    tokio::spawn(async move {
1450                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1451                            .await;
1452                    });
1453
1454                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1455                }
1456                #[cfg(not(any(
1457                    all(target_os = "linux", feature = "transparent-linux"),
1458                    all(target_os = "macos", feature = "transparent-macos"),
1459                    target_os = "windows"
1460                )))]
1461                {
1462                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1463                }
1464            };
1465
1466            let source = TransparentTcpCaptureSource::new(listener, provider);
1467            let result = relay_core_lib::start_proxy(
1468                source,
1469                proxy_tx,
1470                interceptor,
1471                ca,
1472                policy_rx,
1473                None,
1474                shutdown_rx,
1475                Some(self.get_rule_engine().await),
1476            )
1477            .await
1478            .map_err(|e| e.to_string());
1479            if let Err(error) = &result {
1480                self.transition_to_failed(config.port, error.clone());
1481            } else {
1482                self.transition_to_stopped();
1483            }
1484            result
1485        } else {
1486            let source = TcpCaptureSource::new(listener);
1487            let policy = ProxyPolicy::default();
1488            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1489            let policy_rx = self.policy_tx.subscribe();
1490
1491            let result = relay_core_lib::start_proxy(
1492                source,
1493                proxy_tx,
1494                interceptor,
1495                ca,
1496                policy_rx,
1497                None,
1498                shutdown_rx,
1499                Some(self.get_rule_engine().await),
1500            )
1501            .await
1502            .map_err(|e| e.to_string());
1503            if let Err(error) = &result {
1504                self.transition_to_failed(config.port, error.clone());
1505            } else {
1506                self.transition_to_stopped();
1507            }
1508            result
1509        }
1510    }
1511}
1512
1513fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1514    match update {
1515        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1516        FlowUpdate::WebSocketMessage {
1517            flow_id,
1518            mut message,
1519        } => {
1520            message.content = redact_body(message.content, redaction);
1521            FlowUpdate::WebSocketMessage { flow_id, message }
1522        }
1523        FlowUpdate::HttpBody {
1524            flow_id,
1525            direction,
1526            body,
1527        } => FlowUpdate::HttpBody {
1528            flow_id,
1529            direction,
1530            body: redact_body(body, redaction),
1531        },
1532        FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1533            FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1534        }
1535    }
1536}
1537
1538fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1539    if !redaction.enabled {
1540        return flow;
1541    }
1542    match &mut flow.layer {
1543        Layer::Http(http) => {
1544            redact_http_request(&mut http.request, redaction);
1545            if let Some(response) = &mut http.response {
1546                redact_headers(&mut response.headers, redaction);
1547                response.body = response
1548                    .body
1549                    .take()
1550                    .map(|body| redact_body(body, redaction));
1551            }
1552        }
1553        Layer::WebSocket(ws) => {
1554            redact_http_request(&mut ws.handshake_request, redaction);
1555            redact_headers(&mut ws.handshake_response.headers, redaction);
1556            ws.handshake_response.body = ws
1557                .handshake_response
1558                .body
1559                .take()
1560                .map(|body| redact_body(body, redaction));
1561            for message in &mut ws.messages {
1562                message.content = redact_body(message.content.clone(), redaction);
1563            }
1564        }
1565        _ => {}
1566    }
1567    flow
1568}
1569
1570fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1571    if !redaction.enabled {
1572        return summary;
1573    }
1574    summary.url = redact_url_string(&summary.url, redaction);
1575    summary
1576}
1577
1578fn redact_http_request(
1579    request: &mut relay_core_api::flow::HttpRequest,
1580    redaction: &RedactionPolicy,
1581) {
1582    redact_headers(&mut request.headers, redaction);
1583    redact_query_pairs(&mut request.query, redaction);
1584    request.url = redact_url(&request.url, redaction);
1585    request.body = request.body.take().map(|body| redact_body(body, redaction));
1586}
1587
1588fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1589    if !redaction.enabled {
1590        return;
1591    }
1592    let sensitive = redaction_set(&redaction.sensitive_header_names);
1593    for (name, value) in headers.iter_mut() {
1594        if sensitive.contains(&name.to_ascii_lowercase()) {
1595            *value = "[REDACTED]".to_string();
1596        }
1597    }
1598}
1599
1600fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1601    if !redaction.enabled {
1602        return;
1603    }
1604    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1605    for (name, value) in query.iter_mut() {
1606        if sensitive.contains(&name.to_ascii_lowercase()) {
1607            *value = "[REDACTED]".to_string();
1608        }
1609    }
1610}
1611
1612fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1613    if !redaction.enabled {
1614        return url.clone();
1615    }
1616    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1617    let pairs: Vec<(String, String)> = url
1618        .query_pairs()
1619        .map(|(k, v)| {
1620            let key = k.to_string();
1621            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1622                "[REDACTED]".to_string()
1623            } else {
1624                v.to_string()
1625            };
1626            (key, value)
1627        })
1628        .collect();
1629    let mut next = url.clone();
1630    if pairs.is_empty() {
1631        return next;
1632    }
1633    next.query_pairs_mut().clear();
1634    for (k, v) in pairs {
1635        next.query_pairs_mut().append_pair(&k, &v);
1636    }
1637    next
1638}
1639
1640fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1641    match url::Url::parse(input) {
1642        Ok(url) => redact_url(&url, redaction).to_string(),
1643        Err(_) => input.to_string(),
1644    }
1645}
1646
1647fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1648    if redaction.enabled && redaction.redact_bodies {
1649        body.content = "[REDACTED]".to_string();
1650    }
1651    body
1652}
1653
/// Builds a lookup set containing the ASCII-lowercased form of every entry in `values`.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::new();
    for value in values {
        lowered.insert(value.to_ascii_lowercase());
    }
    lowered
}
1660
/// Settings needed to start the proxy.
///
/// `Debug` is derived so the configuration can be logged and so that
/// `Result<ProxyConfig, _>` supports `unwrap_err`/`expect_err`.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Path of the CA certificate file.
    pub ca_cert_path: std::path::PathBuf,
    /// Path of the CA private key file.
    pub ca_key_path: std::path::PathBuf,
    /// Whether the proxy runs in transparent mode.
    pub transparent: bool,
    /// Port for the UDP TPROXY listener; `None` disables it.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a configuration for `port` with the given CA file paths, transparent
    /// mode off and no UDP TPROXY port.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent: false,
            udp_tproxy_port: None,
        }
    }

    /// Creates a configuration whose CA files are `ca_cert.pem` and `ca_key.pem` inside
    /// `app_data_dir`, creating that directory (and its parents) if it does not exist.
    ///
    /// # Errors
    /// Returns the I/O error text when the directory cannot be created.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let app_data_dir = app_data_dir.into();
        if !app_data_dir.exists() {
            std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
        }

        Ok(Self::new(
            port,
            app_data_dir.join("ca_cert.pem"),
            app_data_dir.join("ca_key.pem"),
        ))
    }

    /// Returns the configuration with transparent mode set to `transparent`.
    pub fn with_transparent(mut self, transparent: bool) -> Self {
        self.transparent = transparent;
        self
    }

    /// Returns the configuration with the UDP TPROXY port set to `udp_tproxy_port`.
    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
        self.udp_tproxy_port = udp_tproxy_port;
        self
    }
}
1711
/// Returns the current wall-clock time as milliseconds since the Unix epoch, or `0`
/// when the system clock reports a time before the epoch.
fn now_unix_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}
1718
1719fn modification_field_names(
1720    mods: Option<&relay_core_api::modification::FlowModification>,
1721) -> Vec<&'static str> {
1722    let Some(mods) = mods else {
1723        return Vec::new();
1724    };
1725
1726    let mut fields = Vec::new();
1727    if mods.method.is_some() {
1728        fields.push("method");
1729    }
1730    if mods.url.is_some() {
1731        fields.push("url");
1732    }
1733    if mods.request_headers.is_some() {
1734        fields.push("request_headers");
1735    }
1736    if mods.request_body.is_some() {
1737        fields.push("request_body");
1738    }
1739    if mods.status_code.is_some() {
1740        fields.push("status_code");
1741    }
1742    if mods.response_headers.is_some() {
1743        fields.push("response_headers");
1744    }
1745    if mods.response_body.is_some() {
1746        fields.push("response_body");
1747    }
1748    if mods.message_content.is_some() {
1749        fields.push("message_content");
1750    }
1751    fields
1752}
1753
1754#[cfg(test)]
1755mod tests {
1756    use super::*;
1757    use chrono::Utc;
1758    use relay_core_api::flow::{
1759        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1760        ResponseTiming, TransportProtocol,
1761    };
1762    use std::sync::atomic::{AtomicU64, Ordering};
1763    use tokio::time::{Duration, sleep};
1764    use url::Url;
1765    use uuid::Uuid;
1766
// Per-process sequence number that keeps generated database names distinct.
static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Returns a `sqlite://…?mode=rwc` URL for a fresh database file under
/// `<cwd>/target/test-dbs`, creating that directory if needed.
///
/// The file name combines the process id, the current time in nanoseconds
/// and a per-process counter.
fn sqlite_url() -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock drift");
    let file_name = format!(
        "relay-core-runtime-test-{}-{}-{}.db",
        std::process::id(),
        since_epoch.as_nanos(),
        TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed)
    );

    let mut db_path = std::env::current_dir().expect("cwd");
    db_path.push("target");
    db_path.push("test-dbs");
    std::fs::create_dir_all(&db_path).expect("create test db dir");
    db_path.push(file_name);

    format!("sqlite://{}?mode=rwc", db_path.display())
}
1787
1788    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1789        let start_time =
1790            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1791        let request_url =
1792            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1793        Flow {
1794            id: Uuid::new_v4(),
1795            start_time,
1796            end_time: Some(start_time),
1797            network: NetworkInfo {
1798                client_ip: "127.0.0.1".to_string(),
1799                client_port: 12000,
1800                server_ip: "127.0.0.1".to_string(),
1801                server_port: 8080,
1802                protocol: TransportProtocol::TCP,
1803                tls: false,
1804                tls_version: None,
1805                sni: None,
1806            },
1807            layer: Layer::Http(HttpLayer {
1808                request: HttpRequest {
1809                    method: method.to_string(),
1810                    url: request_url,
1811                    version: "HTTP/1.1".to_string(),
1812                    headers: vec![],
1813                    cookies: vec![],
1814                    query: vec![],
1815                    body: None,
1816                },
1817                response: Some(HttpResponse {
1818                    status,
1819                    status_text: "OK".to_string(),
1820                    version: "HTTP/1.1".to_string(),
1821                    headers: vec![],
1822                    cookies: vec![],
1823                    body: None,
1824                    timing: ResponseTiming {
1825                        time_to_first_byte: None,
1826                        time_to_last_byte: None,
1827                        connect_time_ms: None,
1828                        ssl_time_ms: None,
1829                    },
1830                }),
1831                error: None,
1832            }),
1833            tags: vec![],
1834            meta: std::collections::HashMap::new(),
1835            resilience_trace: None,
1836            rule_variables: std::collections::HashMap::new(),
1837            matched_rules: vec![],
1838        }
1839    }
1840
1841    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1842        let start_time =
1843            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1844        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1845            .expect("url should parse");
1846        Flow {
1847            id: Uuid::new_v4(),
1848            start_time,
1849            end_time: Some(start_time),
1850            network: NetworkInfo {
1851                client_ip: "127.0.0.1".to_string(),
1852                client_port: 12000,
1853                server_ip: "127.0.0.1".to_string(),
1854                server_port: 8080,
1855                protocol: TransportProtocol::TCP,
1856                tls: false,
1857                tls_version: None,
1858                sni: None,
1859            },
1860            layer: Layer::Http(HttpLayer {
1861                request: HttpRequest {
1862                    method: "GET".to_string(),
1863                    url: request_url,
1864                    version: "HTTP/1.1".to_string(),
1865                    headers: vec![
1866                        (
1867                            "Authorization".to_string(),
1868                            "Bearer secret-token".to_string(),
1869                        ),
1870                        ("X-Normal".to_string(), "visible".to_string()),
1871                    ],
1872                    cookies: vec![],
1873                    query: vec![
1874                        ("token".to_string(), "abc123".to_string()),
1875                        ("ok".to_string(), "1".to_string()),
1876                    ],
1877                    body: Some(BodyData {
1878                        encoding: "utf-8".to_string(),
1879                        content: "secret request body".to_string(),
1880                        size: 19,
1881                    }),
1882                },
1883                response: Some(HttpResponse {
1884                    status: 200,
1885                    status_text: "OK".to_string(),
1886                    version: "HTTP/1.1".to_string(),
1887                    headers: vec![
1888                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1889                        ("X-Response".to_string(), "visible".to_string()),
1890                    ],
1891                    cookies: vec![],
1892                    body: Some(BodyData {
1893                        encoding: "utf-8".to_string(),
1894                        content: "secret response body".to_string(),
1895                        size: 20,
1896                    }),
1897                    timing: ResponseTiming {
1898                        time_to_first_byte: None,
1899                        time_to_last_byte: None,
1900                        connect_time_ms: None,
1901                        ssl_time_ms: None,
1902                    },
1903                }),
1904                error: None,
1905            }),
1906            tags: vec![],
1907            meta: std::collections::HashMap::new(),
1908            resilience_trace: None,
1909            rule_variables: std::collections::HashMap::new(),
1910            matched_rules: vec![],
1911        }
1912    }
1913
1914    #[tokio::test]
1915    async fn set_rules_from_records_audit_event() {
1916        let state = CoreState::new(None).await;
1917
1918        state
1919            .set_rules_from(
1920                AuditActor::Http,
1921                "rule.upsert",
1922                "rule-1".to_string(),
1923                json!({ "route": "/api/v1/rules" }),
1924                Vec::new(),
1925            )
1926            .await
1927            .expect("set rules should succeed");
1928
1929        let events = state.recent_audit_events();
1930        let event = events.last().expect("audit event should exist");
1931        assert_eq!(event.actor, AuditActor::Http);
1932        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1933        assert_eq!(event.outcome, AuditOutcome::Success);
1934        assert_eq!(event.target, "rule-1");
1935        assert_eq!(event.details["operation"], "rule.upsert");
1936        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1937    }
1938
1939    #[tokio::test]
1940    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1941        let state = CoreState::new(None).await;
1942
1943        state
1944            .upsert_rule_from(
1945                AuditActor::Probe,
1946                "rule.upsert",
1947                "rule-1".to_string(),
1948                json!({ "tool": "set_rule" }),
1949                Rule {
1950                    id: "rule-1".to_string(),
1951                    name: "first".to_string(),
1952                    active: true,
1953                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1954                    priority: 1,
1955                    termination: relay_core_lib::rule::RuleTermination::Continue,
1956                    filter: relay_core_lib::rule::Filter::Url(
1957                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1958                    ),
1959                    actions: vec![],
1960                    constraints: None,
1961                },
1962            )
1963            .await
1964            .expect("initial upsert should succeed");
1965
1966        state
1967            .upsert_rule_from(
1968                AuditActor::Probe,
1969                "rule.upsert",
1970                "rule-1".to_string(),
1971                json!({ "tool": "set_rule" }),
1972                Rule {
1973                    id: "rule-1".to_string(),
1974                    name: "second".to_string(),
1975                    active: true,
1976                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1977                    priority: 2,
1978                    termination: relay_core_lib::rule::RuleTermination::Continue,
1979                    filter: relay_core_lib::rule::Filter::Url(
1980                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1981                    ),
1982                    actions: vec![],
1983                    constraints: None,
1984                },
1985            )
1986            .await
1987            .expect("replacement upsert should succeed");
1988
1989        let rules = state.get_rules().await;
1990        assert_eq!(rules.len(), 1);
1991        assert_eq!(rules[0].name, "second");
1992        let event = state
1993            .recent_audit_events()
1994            .last()
1995            .cloned()
1996            .expect("audit event");
1997        assert_eq!(event.details["operation"], "rule.upsert");
1998        assert_eq!(event.details["details"]["tool"], "set_rule");
1999    }
2000
2001    #[tokio::test]
2002    async fn delete_rule_from_returns_false_when_rule_missing() {
2003        let state = CoreState::new(None).await;
2004
2005        let deleted = state
2006            .delete_rule_from(
2007                AuditActor::Http,
2008                "rule.delete",
2009                "missing".to_string(),
2010                json!({ "route": "/api/v1/rules/{id}" }),
2011                "missing",
2012            )
2013            .await
2014            .expect("delete should not fail");
2015
2016        assert!(!deleted);
2017        assert!(state.recent_audit_events().is_empty());
2018    }
2019
2020    #[tokio::test]
2021    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2022        let state = CoreState::new(None).await;
2023
2024        let rule_id = state
2025            .create_mock_response_rule_from(
2026                AuditActor::Http,
2027                "api-mock-1".to_string(),
2028                json!({ "route": "/api/v1/mock", "status": 201 }),
2029                MockResponseRuleConfig {
2030                    rule_id: "api-mock-1".to_string(),
2031                    url_pattern: "example.com".to_string(),
2032                    name: "api-mock:example.com".to_string(),
2033                    status: 201,
2034                    content_type: "application/json".to_string(),
2035                    body: "{\"ok\":true}".to_string(),
2036                },
2037            )
2038            .await
2039            .expect("mock rule should be created");
2040
2041        assert_eq!(rule_id, "api-mock-1");
2042        let rules = state.get_rules().await;
2043        assert_eq!(rules.len(), 1);
2044        assert_eq!(rules[0].id, "api-mock-1");
2045        let event = state
2046            .recent_audit_events()
2047            .last()
2048            .cloned()
2049            .expect("audit event");
2050        assert_eq!(event.details["operation"], "rule.mock_create");
2051        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2052    }
2053
2054    #[tokio::test]
2055    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2056        let state = CoreState::new(None).await;
2057
2058        let rule_id = state
2059            .create_intercept_rule_from(
2060                AuditActor::Http,
2061                "intercept-1".to_string(),
2062                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2063                InterceptRuleConfig {
2064                    rule_id: "intercept-1".to_string(),
2065                    active: true,
2066                    url_pattern: "example.com".to_string(),
2067                    method: None,
2068                    phase: "request".to_string(),
2069                    name: "api-intercept:example.com".to_string(),
2070                    priority: 100,
2071                    termination: relay_core_lib::rule::RuleTermination::Stop,
2072                },
2073            )
2074            .await
2075            .expect("intercept rule should be created");
2076
2077        assert_eq!(rule_id, "intercept-1");
2078        let rules = state.get_rules().await;
2079        assert_eq!(rules.len(), 1);
2080        assert_eq!(rules[0].id, "intercept-1");
2081        assert_eq!(rules[0].name, "api-intercept:example.com");
2082        let event = state
2083            .recent_audit_events()
2084            .last()
2085            .cloned()
2086            .expect("audit event");
2087        assert_eq!(event.details["operation"], "rule.intercept_create");
2088        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2089    }
2090
2091    #[tokio::test]
2092    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2093        let state = CoreState::new(None).await;
2094
2095        state
2096            .upsert_legacy_intercept_rule_from(
2097                AuditActor::Tauri,
2098                "legacy-1".to_string(),
2099                json!({ "command": "set_intercept_rule" }),
2100                InterceptRule {
2101                    id: "legacy-1".to_string(),
2102                    active: true,
2103                    url_pattern: "example.com".to_string(),
2104                    method: None,
2105                    phase: "both".to_string(),
2106                },
2107            )
2108            .await
2109            .expect("initial family upsert should succeed");
2110        assert_eq!(state.get_rules().await.len(), 2);
2111
2112        state
2113            .upsert_legacy_intercept_rule_from(
2114                AuditActor::Tauri,
2115                "legacy-1".to_string(),
2116                json!({ "command": "set_intercept_rule" }),
2117                InterceptRule {
2118                    id: "legacy-1".to_string(),
2119                    active: true,
2120                    url_pattern: "example.org".to_string(),
2121                    method: Some("POST".to_string()),
2122                    phase: "request".to_string(),
2123                },
2124            )
2125            .await
2126            .expect("replacement family upsert should succeed");
2127
2128        let rules = state.get_rules().await;
2129        assert_eq!(rules.len(), 1);
2130        assert_eq!(rules[0].id, "legacy-1");
2131        let event = state
2132            .recent_audit_events()
2133            .last()
2134            .cloned()
2135            .expect("audit event");
2136        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2137        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2138    }
2139
2140    #[tokio::test]
2141    async fn resolve_intercept_failure_records_failed_audit_event() {
2142        let state = CoreState::new(None).await;
2143
2144        let result = state
2145            .resolve_intercept_with_modifications_from(
2146                AuditActor::Probe,
2147                "missing-flow:request".to_string(),
2148                "drop",
2149                None,
2150            )
2151            .await;
2152
2153        assert!(result.is_err());
2154        let events = state.recent_audit_events();
2155        let event = events.last().expect("audit event should exist");
2156        assert_eq!(event.actor, AuditActor::Probe);
2157        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2158        assert_eq!(event.outcome, AuditOutcome::Failed);
2159        assert_eq!(event.details["action"], "drop");
2160        assert!(
2161            event.details["error"]
2162                .as_str()
2163                .unwrap_or_default()
2164                .contains("Interception not found")
2165        );
2166    }
2167
2168    #[tokio::test]
2169    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2170        let state = CoreState::new(None).await;
2171        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2172
2173        state
2174            .prepare_start(8080, shutdown_tx)
2175            .expect("prepare start should succeed");
2176        let lifecycle = state.lifecycle();
2177        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2178        assert_eq!(lifecycle.port, Some(8080));
2179        assert!(lifecycle.started_at_ms.is_none());
2180        assert!(lifecycle.last_error.is_none());
2181
2182        assert_eq!(
2183            state.stop_proxy().expect("stop should succeed"),
2184            ProxyStopResult::Stopping
2185        );
2186        let lifecycle = state.lifecycle();
2187        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2188        assert_eq!(lifecycle.port, Some(8080));
2189        assert!(shutdown_rx.await.is_ok());
2190    }
2191
2192    #[tokio::test]
2193    async fn status_snapshot_derives_runtime_facing_fields() {
2194        let state = CoreState::new(None).await;
2195        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2196        state
2197            .prepare_start(8080, shutdown_tx)
2198            .expect("prepare start should succeed");
2199
2200        let status = state.status_snapshot();
2201        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2202        assert!(status.running);
2203        assert_eq!(status.port, Some(8080));
2204        assert!(status.uptime.is_none());
2205        assert!(status.last_error.is_none());
2206    }
2207
2208    #[tokio::test]
2209    async fn status_report_combines_status_and_metrics() {
2210        let state = CoreState::new(None).await;
2211        let report = state.status_report().await;
2212
2213        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2214        assert!(!report.status.running);
2215        assert_eq!(report.metrics.intercepts_pending, 0);
2216        assert_eq!(report.metrics.ws_pending_messages, 0);
2217        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2218        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2219        assert_eq!(report.metrics.audit_events_total, 0);
2220        assert_eq!(report.metrics.audit_events_failed, 0);
2221        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2222        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2223    }
2224
/// `ProxyConfig::new` followed by both builder setters keeps every value that
/// was passed in.
#[test]
fn proxy_config_new_and_transport_setters_preserve_values() {
    let cert_path = std::path::PathBuf::from("/tmp/ca_cert.pem");
    let key_path = std::path::PathBuf::from("/tmp/ca_key.pem");

    let config = ProxyConfig::new(8080, cert_path.clone(), key_path.clone())
        .with_transparent(true)
        .with_udp_tproxy_port(Some(15000));

    assert_eq!(config.port, 8080);
    assert_eq!(config.ca_cert_path, cert_path);
    assert_eq!(config.ca_key_path, key_path);
    assert!(config.transparent);
    assert_eq!(config.udp_tproxy_port, Some(15000));
}
2247
/// `ProxyConfig::from_app_data_dir` creates the directory, points the CA
/// paths at `ca_cert.pem` / `ca_key.pem` inside it, and leaves the transport
/// options at their defaults.
#[test]
fn proxy_config_from_app_data_dir_creates_default_paths() {
    let unique = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock drift")
        .as_nanos();
    let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));

    let config =
        ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");

    // Record the observation first, then remove the scratch directory before
    // asserting, so neither a passing nor a failing run leaves it behind in
    // the system temp dir. Cleanup is best effort.
    let dir_was_created = dir.exists();
    let _ = std::fs::remove_dir_all(&dir);

    assert!(dir_was_created);
    assert_eq!(config.port, 8899);
    assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
    assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
    assert!(!config.transparent);
    assert!(config.udp_tproxy_port.is_none());
}
2266
2267    #[tokio::test]
2268    async fn intercept_snapshot_maps_pending_counts() {
2269        let state = CoreState::new(None).await;
2270        let snapshot = state.intercept_snapshot().await;
2271
2272        assert_eq!(snapshot.pending_count, 0);
2273        assert_eq!(snapshot.ws_pending_count, 0);
2274    }
2275
2276    #[tokio::test]
2277    async fn audit_snapshot_returns_latest_events_in_order() {
2278        let state = CoreState::new(None).await;
2279        state.record_audit_event(AuditEvent::new(
2280            AuditActor::Runtime,
2281            AuditEventKind::RuleChanged,
2282            "first",
2283            AuditOutcome::Success,
2284            json!({ "index": 1 }),
2285        ));
2286        state.record_audit_event(AuditEvent::new(
2287            AuditActor::Http,
2288            AuditEventKind::PolicyUpdated,
2289            "second",
2290            AuditOutcome::Success,
2291            json!({ "index": 2 }),
2292        ));
2293
2294        let snapshot = state.audit_snapshot(1);
2295
2296        assert_eq!(snapshot.events.len(), 1);
2297        assert_eq!(snapshot.events[0].target, "second");
2298        assert_eq!(snapshot.events[0].details["index"], 2);
2299    }
2300
2301    #[tokio::test]
2302    async fn query_audit_snapshot_filters_in_memory_events() {
2303        let state = CoreState::new(None).await;
2304        state.record_audit_event(AuditEvent::new(
2305            AuditActor::Http,
2306            AuditEventKind::RuleChanged,
2307            "rule-1",
2308            AuditOutcome::Success,
2309            json!({ "idx": 1 }),
2310        ));
2311        state.record_audit_event(AuditEvent::new(
2312            AuditActor::Probe,
2313            AuditEventKind::PolicyUpdated,
2314            "policy",
2315            AuditOutcome::Failed,
2316            json!({ "idx": 2 }),
2317        ));
2318
2319        let snapshot = state
2320            .query_audit_snapshot(CoreAuditQuery {
2321                actor: Some(AuditActor::Probe),
2322                kind: Some(AuditEventKind::PolicyUpdated),
2323                outcome: Some(AuditOutcome::Failed),
2324                limit: 10,
2325                ..Default::default()
2326            })
2327            .await;
2328
2329        assert_eq!(snapshot.events.len(), 1);
2330        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2331        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2332        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2333    }
2334
2335    #[tokio::test]
2336    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2337        let state = CoreState::new(Some(sqlite_url())).await;
2338        state.update_policy_from(
2339            AuditActor::Http,
2340            "policy".to_string(),
2341            ProxyPolicy {
2342                transparent_enabled: true,
2343                ..Default::default()
2344            },
2345        );
2346
2347        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2348        for _ in 0..10 {
2349            snapshot = state
2350                .query_audit_snapshot(CoreAuditQuery {
2351                    actor: Some(AuditActor::Http),
2352                    kind: Some(AuditEventKind::PolicyUpdated),
2353                    limit: 10,
2354                    ..Default::default()
2355                })
2356                .await;
2357            if !snapshot.events.is_empty() {
2358                break;
2359            }
2360            sleep(Duration::from_millis(20)).await;
2361        }
2362
2363        assert!(!snapshot.events.is_empty());
2364        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2365        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2366    }
2367
2368    #[tokio::test]
2369    async fn prepare_start_rejects_second_active_start() {
2370        let state = CoreState::new(None).await;
2371        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2372        state
2373            .prepare_start(8080, shutdown_tx)
2374            .expect("first start should succeed");
2375
2376        let (second_tx, _second_rx) = oneshot::channel();
2377        let error = state
2378            .prepare_start(8081, second_tx)
2379            .expect_err("second active start should be rejected");
2380        assert!(error.contains("already"));
2381    }
2382
/// `update_policy_from` records a successful `PolicyUpdated` audit event
/// whose details include the `transparent_enabled` value.
#[test]
fn update_policy_records_audit_event() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));

    let new_policy = ProxyPolicy {
        transparent_enabled: true,
        ..Default::default()
    };
    core.update_policy_from(AuditActor::Runtime, "policy".to_string(), new_policy);

    let audit_log = core.recent_audit_events();
    let latest = audit_log.last().expect("audit event should exist");
    assert_eq!(latest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(latest.outcome, AuditOutcome::Success);
    assert_eq!(latest.details["transparent_enabled"], true);
}
2403
/// Patching only the redaction section turns on the patched flags, keeps
/// `request_timeout_ms` unchanged, and records a `PolicyUpdated` audit event.
#[test]
fn patch_policy_updates_redaction_without_replacing_other_fields() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let core = rt.block_on(CoreState::new(None));
    let timeout_before = core.policy_snapshot().request_timeout_ms;

    let redaction_patch = relay_core_api::policy::RedactionPolicyPatch {
        enabled: Some(true),
        redact_bodies: Some(true),
        ..Default::default()
    };
    let patch = relay_core_api::policy::ProxyPolicyPatch {
        redaction: Some(redaction_patch),
    };
    core.patch_policy_from(AuditActor::Runtime, "policy.patch".to_string(), patch);

    let patched = core.policy_snapshot();
    assert_eq!(patched.request_timeout_ms, timeout_before);
    assert!(patched.redaction.enabled);
    assert!(patched.redaction.redact_bodies);

    let audit_log = core.recent_audit_events();
    let latest = audit_log.last().expect("audit event should exist");
    assert_eq!(latest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(latest.details["redaction_enabled"], true);
}
2432
2433    #[tokio::test]
2434    async fn metrics_include_audit_and_lagged_event_counters() {
2435        let state = CoreState::new(None).await;
2436
2437        state.update_policy_from(
2438            AuditActor::Runtime,
2439            "policy".to_string(),
2440            ProxyPolicy::default(),
2441        );
2442        let _ = state
2443            .resolve_intercept_with_modifications_from(
2444                AuditActor::Probe,
2445                "missing-flow:request".to_string(),
2446                "drop",
2447                None,
2448            )
2449            .await;
2450
2451        state.record_flow_events_lagged(3);
2452        state.record_audit_events_lagged(5);
2453
2454        let metrics = state.get_metrics().await;
2455        assert_eq!(metrics.audit_events_total, 2);
2456        assert_eq!(metrics.audit_events_failed, 1);
2457        assert_eq!(metrics.flow_events_lagged_total, 3);
2458        assert_eq!(metrics.audit_events_lagged_total, 5);
2459    }
2460
2461    #[tokio::test]
2462    async fn prometheus_metrics_text_contains_observability_fields() {
2463        let state = CoreState::new(None).await;
2464        state.record_flow_events_lagged(2);
2465        state.record_audit_events_lagged(4);
2466
2467        let text = state.get_metrics_prometheus_text().await;
2468        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2469        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2470        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2471        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2472    }
2473
2474    #[tokio::test]
2475    async fn search_flows_uses_store_with_offset_pagination() {
2476        let state = CoreState::new(Some(sqlite_url())).await;
2477        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2478        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2479        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2480
2481        state.upsert_flow(Box::new(flow_a));
2482        state.upsert_flow(Box::new(flow_b));
2483        state.upsert_flow(Box::new(flow_c));
2484
2485        let mut baseline = Vec::new();
2486        for _ in 0..20 {
2487            baseline = state
2488                .search_flows(FlowQuery {
2489                    host: Some("api.example.com".to_string()),
2490                    path_contains: None,
2491                    method: None,
2492                    status_min: None,
2493                    status_max: None,
2494                    has_error: None,
2495                    is_websocket: None,
2496                    limit: Some(3),
2497                    offset: Some(0),
2498                })
2499                .await;
2500            if baseline.len() == 3 {
2501                break;
2502            }
2503            sleep(Duration::from_millis(20)).await;
2504        }
2505
2506        assert_eq!(baseline.len(), 3);
2507        let page = state
2508            .search_flows(FlowQuery {
2509                host: Some("api.example.com".to_string()),
2510                path_contains: None,
2511                method: None,
2512                status_min: None,
2513                status_max: None,
2514                has_error: None,
2515                is_websocket: None,
2516                limit: Some(1),
2517                offset: Some(1),
2518            })
2519            .await;
2520        assert_eq!(page.len(), 1);
2521        assert_eq!(page[0].id, baseline[1].id);
2522    }
2523
2524    #[tokio::test]
2525    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2526        let state = CoreState::new(Some(sqlite_url())).await;
2527        let first_flow = sample_http_flow(
2528            "persist.example.com",
2529            "/first",
2530            "GET",
2531            200,
2532            1_700_000_010_000,
2533        );
2534        let first_id = first_flow.id.to_string();
2535        state.upsert_flow(Box::new(first_flow));
2536        for i in 0..240 {
2537            state.upsert_flow(Box::new(sample_http_flow(
2538                "persist.example.com",
2539                &format!("/{}", i),
2540                "GET",
2541                200,
2542                1_700_000_020_000 + i,
2543            )));
2544        }
2545
2546        sleep(Duration::from_millis(200)).await;
2547
2548        let loaded = state.get_flow(first_id).await;
2549        assert!(loaded.is_some());
2550    }
2551
2552    #[tokio::test]
2553    async fn search_flows_redacts_summary_url_when_enabled() {
2554        let state = CoreState::new(None).await;
2555        state.update_policy_from(
2556            AuditActor::Runtime,
2557            "policy.redaction".to_string(),
2558            ProxyPolicy {
2559                redaction: RedactionPolicy {
2560                    enabled: true,
2561                    sensitive_query_keys: vec!["token".to_string()],
2562                    redact_bodies: false,
2563                    ..Default::default()
2564                },
2565                ..Default::default()
2566            },
2567        );
2568        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2569
2570        let mut items = Vec::new();
2571        for _ in 0..20 {
2572            items = state
2573                .search_flows(FlowQuery {
2574                    host: Some("api.example.com".to_string()),
2575                    path_contains: Some("/private".to_string()),
2576                    ..Default::default()
2577                })
2578                .await;
2579            if !items.is_empty() {
2580                break;
2581            }
2582            sleep(Duration::from_millis(20)).await;
2583        }
2584
2585        assert!(!items.is_empty());
2586        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2587        let token = redacted
2588            .query_pairs()
2589            .find(|(k, _)| k == "token")
2590            .map(|(_, v)| v.to_string());
2591        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2592    }
2593
2594    #[tokio::test]
2595    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2596        let state = CoreState::new(Some(sqlite_url())).await;
2597        state.update_policy_from(
2598            AuditActor::Runtime,
2599            "policy.redaction".to_string(),
2600            ProxyPolicy {
2601                redaction: RedactionPolicy {
2602                    enabled: true,
2603                    sensitive_header_names: vec![
2604                        "authorization".to_string(),
2605                        "set-cookie".to_string(),
2606                    ],
2607                    sensitive_query_keys: vec!["token".to_string()],
2608                    redact_bodies: true,
2609                },
2610                ..Default::default()
2611            },
2612        );
2613
2614        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2615        let flow_id = flow.id.to_string();
2616        state.upsert_flow(Box::new(flow));
2617        sleep(Duration::from_millis(80)).await;
2618
2619        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2620        let Layer::Http(http) = loaded.layer else {
2621            panic!("expected http layer");
2622        };
2623
2624        let auth = http
2625            .request
2626            .headers
2627            .iter()
2628            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2629            .map(|(_, v)| v.as_str());
2630        assert_eq!(auth, Some("[REDACTED]"));
2631
2632        let req_query_token = http
2633            .request
2634            .query
2635            .iter()
2636            .find(|(k, _)| k == "token")
2637            .map(|(_, v)| v.as_str());
2638        assert_eq!(req_query_token, Some("[REDACTED]"));
2639
2640        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2641        assert_eq!(req_body, Some("[REDACTED]"));
2642
2643        let response = http.response.expect("response should exist");
2644        let set_cookie = response
2645            .headers
2646            .iter()
2647            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2648            .map(|(_, v)| v.as_str());
2649        assert_eq!(set_cookie, Some("[REDACTED]"));
2650        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2651        assert_eq!(res_body, Some("[REDACTED]"));
2652    }
2653
2654    #[test]
2655    fn redact_flow_update_masks_http_body_when_enabled() {
2656        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2657        let state = runtime.block_on(CoreState::new(None));
2658        state.update_policy_from(
2659            AuditActor::Runtime,
2660            "policy.redaction".to_string(),
2661            ProxyPolicy {
2662                redaction: RedactionPolicy {
2663                    enabled: true,
2664                    redact_bodies: true,
2665                    ..Default::default()
2666                },
2667                ..Default::default()
2668            },
2669        );
2670
2671        let update = FlowUpdate::HttpBody {
2672            flow_id: "f-1".to_string(),
2673            direction: relay_core_api::flow::Direction::ClientToServer,
2674            body: BodyData {
2675                encoding: "utf-8".to_string(),
2676                content: "super-secret".to_string(),
2677                size: 12,
2678            },
2679        };
2680        let redacted = state.redact_flow_update_for_output(update);
2681        match redacted {
2682            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2683            _ => panic!("expected http body update"),
2684        }
2685    }
2686
2687    #[cfg(feature = "script")]
2688    #[tokio::test]
2689    async fn load_script_from_records_audit_event() {
2690        let state = CoreState::new(None).await;
2691
2692        state
2693            .load_script_from(
2694                AuditActor::Tauri,
2695                "tauri.load_script".to_string(),
2696                "globalThis.onRequestHeaders = (_flow) => {};",
2697            )
2698            .await
2699            .expect("script should load");
2700
2701        let events = state.recent_audit_events();
2702        let event = events.last().expect("audit event should exist");
2703        assert_eq!(event.actor, AuditActor::Tauri);
2704        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2705        assert_eq!(event.outcome, AuditOutcome::Success);
2706        assert_eq!(event.target, "tauri.load_script");
2707    }
2708}