Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod paths;
64pub mod rule;
65pub mod services;
66
67// ── Re-exports for user convenience ──
68// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
69pub use paths::CaPaths;
70pub use relay_core_api::{flow, policy};
71pub use relay_core_lib::InterceptionResult;
72pub use relay_core_lib::rule as lib_rule;
73
74use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
75use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
76use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
77
78pub mod rule_engine {
79    pub use relay_core_lib::rule_engine::*;
80}
81
82#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
83pub struct CoreMetrics {
84    pub flows_total: usize,
85    pub flows_in_memory: usize,
86    pub flows_dropped: usize,
87    pub intercepts_pending: usize,
88    pub ws_pending_messages: usize,
89    pub oldest_intercept_age_ms: Option<u64>,
90    pub oldest_ws_message_age_ms: Option<u64>,
91    pub rule_exec_errors: usize,
92    pub audit_events_total: usize,
93    pub audit_events_failed: usize,
94    pub flow_events_lagged_total: usize,
95    pub audit_events_lagged_total: usize,
96    /// O4: Total bodies degraded (budget exceeded)
97    pub proxy_body_degraded_total: usize,
98    /// O4: Total HTTP requests processed through streaming pipeline
99    pub proxy_http_request_total: usize,
100    /// O4: Total sandbox rejections
101    pub proxy_sandbox_reject_total: usize,
102    /// O4: Total invalid method rejections (strict_http_semantics)
103    pub proxy_invalid_method_total: usize,
104    /// O4: Total invalid status code rejections (strict_http_semantics)
105    pub proxy_invalid_status_total: usize,
106    /// O4: Total idempotent request retries
107    pub proxy_retry_total: usize,
108    /// O4: Total bodies processed in tap (streaming) mode
109    pub proxy_stream_mode_tap_total: usize,
110    /// O4: Total bodies degraded from tap to pass-through
111    pub proxy_stream_mode_degrade_total: usize,
112}
113
114impl CoreMetrics {
115    pub fn to_prometheus_text(&self) -> String {
116        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
117        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
118        format!(
119            "relay_core_flows_total {}\n\
120relay_core_flows_in_memory {}\n\
121relay_core_flows_dropped_total {}\n\
122relay_core_intercepts_pending {}\n\
123relay_core_ws_pending_messages {}\n\
124relay_core_oldest_intercept_age_ms {}\n\
125relay_core_oldest_ws_message_age_ms {}\n\
126relay_core_rule_exec_errors_total {}\n\
127relay_core_audit_events_total {}\n\
128relay_core_audit_events_failed_total {}\n\
129relay_core_flow_events_lagged_total {}\n\
130relay_core_audit_events_lagged_total {}\n\
131relay_core_proxy_body_degraded_total {}\n\
132relay_core_proxy_http_request_total {}\n\
133relay_core_proxy_sandbox_reject_total {}\n\
134relay_core_proxy_invalid_method_total {}\n\
135relay_core_proxy_invalid_status_total {}\n\
136relay_core_proxy_retry_total {}\n\
137relay_core_proxy_stream_mode_tap_total {}\n\
138relay_core_proxy_stream_mode_degrade_total {}\n",
139            self.flows_total,
140            self.flows_in_memory,
141            self.flows_dropped,
142            self.intercepts_pending,
143            self.ws_pending_messages,
144            oldest_intercept_age_ms,
145            oldest_ws_message_age_ms,
146            self.rule_exec_errors,
147            self.audit_events_total,
148            self.audit_events_failed,
149            self.flow_events_lagged_total,
150            self.audit_events_lagged_total,
151            self.proxy_body_degraded_total,
152            self.proxy_http_request_total,
153            self.proxy_sandbox_reject_total,
154            self.proxy_invalid_method_total,
155            self.proxy_invalid_status_total,
156            self.proxy_retry_total,
157            self.proxy_stream_mode_tap_total,
158            self.proxy_stream_mode_degrade_total,
159        )
160    }
161}
162
163#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
164pub struct CoreStatusSnapshot {
165    pub phase: RuntimeLifecyclePhase,
166    pub running: bool,
167    pub port: Option<u16>,
168    pub uptime: Option<u64>,
169    pub last_error: Option<String>,
170}
171
172#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
173pub struct CoreStatusReport {
174    pub status: CoreStatusSnapshot,
175    pub metrics: CoreMetrics,
176}
177
178#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
179pub struct CoreInterceptSnapshot {
180    pub pending_count: usize,
181    pub ws_pending_count: usize,
182}
183
184#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
185pub struct CoreAuditSnapshot {
186    pub events: Vec<AuditEvent>,
187}
188
189#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
190pub struct CoreAuditQuery {
191    pub since_ms: Option<u64>,
192    pub until_ms: Option<u64>,
193    pub actor: Option<AuditActor>,
194    pub kind: Option<AuditEventKind>,
195    pub outcome: Option<AuditOutcome>,
196    pub limit: usize,
197}
198
199impl Default for CoreAuditQuery {
200    fn default() -> Self {
201        Self {
202            since_ms: None,
203            until_ms: None,
204            actor: None,
205            kind: None,
206            outcome: None,
207            limit: 50,
208        }
209    }
210}
211
212#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
213#[serde(rename_all = "snake_case")]
214pub enum RuntimeLifecyclePhase {
215    Created,
216    Starting,
217    Running,
218    Stopping,
219    Stopped,
220    Failed,
221}
222
223impl RuntimeLifecyclePhase {
224    pub fn as_str(&self) -> &'static str {
225        match self {
226            Self::Created => "created",
227            Self::Starting => "starting",
228            Self::Running => "running",
229            Self::Stopping => "stopping",
230            Self::Stopped => "stopped",
231            Self::Failed => "failed",
232        }
233    }
234
235    pub fn is_active(&self) -> bool {
236        matches!(self, Self::Starting | Self::Running | Self::Stopping)
237    }
238}
239
240#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
241pub struct RuntimeLifecycle {
242    pub phase: RuntimeLifecyclePhase,
243    pub port: Option<u16>,
244    pub started_at_ms: Option<u64>,
245    pub last_error: Option<String>,
246}
247
248impl RuntimeLifecycle {
249    pub fn created() -> Self {
250        Self {
251            phase: RuntimeLifecyclePhase::Created,
252            port: None,
253            started_at_ms: None,
254            last_error: None,
255        }
256    }
257
258    pub fn is_active(&self) -> bool {
259        self.phase.is_active()
260    }
261
262    pub fn uptime_seconds(&self) -> Option<u64> {
263        let started_at_ms = self.started_at_ms?;
264        let now_ms = now_unix_ms();
265        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
266    }
267}
268
269pub enum ProxySpawnResult {
270    Started(tokio::task::JoinHandle<()>),
271    AlreadyRunning,
272}
273
274#[derive(Debug, Clone, Copy, PartialEq, Eq)]
275pub enum ProxyStopResult {
276    Stopping,
277    NotRunning,
278}
279
280impl From<RuntimeLifecycle> for CoreStatusSnapshot {
281    fn from(lifecycle: RuntimeLifecycle) -> Self {
282        Self {
283            running: lifecycle.is_active(),
284            uptime: lifecycle.uptime_seconds(),
285            port: lifecycle.port,
286            last_error: lifecycle.last_error,
287            phase: lifecycle.phase,
288        }
289    }
290}
291
292pub struct CoreState {
293    flow_store: mpsc::Sender<FlowStoreMessage>,
294    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
295    rule_store: mpsc::Sender<RuleStoreMessage>,
296    store: Option<Store>,
297    #[cfg(feature = "script")]
298    pub script_interceptor: Arc<ScriptInterceptor>,
299    pub policy_tx: watch::Sender<ProxyPolicy>,
300    pub flows_dropped: Arc<AtomicUsize>,
301    audit_events_total: Arc<AtomicUsize>,
302    audit_events_failed: Arc<AtomicUsize>,
303    flow_events_lagged_total: Arc<AtomicUsize>,
304    audit_events_lagged_total: Arc<AtomicUsize>,
305    /// 内部广播 channel:Tauri、Probe 等多个消费者均可订阅
306    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
307    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
308    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
309    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
310    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
311}
312
313impl CoreState {
314    pub async fn new(db_url: Option<String>) -> Self {
315        const AUDIT_HISTORY_LIMIT: usize = 200;
316
317        let store = if let Some(url) = db_url {
318            match Store::connect(&url).await {
319                Ok(s) => {
320                    if let Err(e) = s.init().await {
321                        tracing::error!("Failed to init store: {}", e);
322                    }
323                    Some(s)
324                }
325                Err(e) => {
326                    tracing::error!("Failed to connect to store: {}", e);
327                    None
328                }
329            }
330        } else {
331            None
332        };
333
334        let (flow_tx, flow_rx) = mpsc::channel(10000);
335        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
336        tokio::spawn(flow_actor.run());
337
338        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
339        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
340        tokio::spawn(intercept_actor.run());
341
342        let (rule_tx, rule_rx) = mpsc::channel(100);
343        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
344        tokio::spawn(rule_actor.run());
345
346        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
347        let (flow_broadcast_tx, _) = broadcast::channel(1000);
348        let (audit_broadcast_tx, _) = broadcast::channel(256);
349        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
350
351        #[cfg(feature = "script")]
352        let script_interceptor = ScriptInterceptor::new()
353            .await
354            .expect("Failed to initialize ScriptInterceptor");
355        Self {
356            flow_store: flow_tx,
357            intercept_broker: intercept_tx,
358            rule_store: rule_tx,
359            store,
360            #[cfg(feature = "script")]
361            script_interceptor: Arc::new(script_interceptor),
362            policy_tx,
363            flows_dropped: Arc::new(AtomicUsize::new(0)),
364            audit_events_total: Arc::new(AtomicUsize::new(0)),
365            audit_events_failed: Arc::new(AtomicUsize::new(0)),
366            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
367            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
368            flow_broadcast_tx,
369            audit_broadcast_tx,
370            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
371            lifecycle_tx,
372            shutdown_tx: Mutex::new(None),
373        }
374    }
375
376    pub async fn get_metrics(&self) -> CoreMetrics {
377        let (flow_tx, flow_rx) = oneshot::channel();
378        let _ = self
379            .flow_store
380            .send(FlowStoreMessage::GetMetrics(flow_tx))
381            .await;
382        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
383
384        let (int_tx, int_rx) = oneshot::channel();
385        let _ = self
386            .intercept_broker
387            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
388            .await;
389        let (
390            intercepts_pending,
391            ws_pending_messages,
392            oldest_intercept_age_ms,
393            oldest_ws_message_age_ms,
394        ) = int_rx.await.unwrap_or((0, 0, None, None));
395
396        let (rule_tx, rule_rx) = oneshot::channel();
397        let _ = self
398            .rule_store
399            .send(RuleStoreMessage::GetMetrics(rule_tx))
400            .await;
401        let rule_exec_errors = rule_rx.await.unwrap_or(0);
402
403        CoreMetrics {
404            flows_total,
405            flows_in_memory,
406            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
407            intercepts_pending,
408            ws_pending_messages,
409            oldest_intercept_age_ms,
410            oldest_ws_message_age_ms,
411            rule_exec_errors,
412            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
413            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
414            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
415            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
416            proxy_body_degraded_total: relay_core_lib::metrics::get_proxy_body_degraded(),
417            proxy_http_request_total: relay_core_lib::metrics::get_proxy_http_request(),
418            proxy_sandbox_reject_total: relay_core_lib::metrics::get_proxy_sandbox_reject(),
419            proxy_invalid_method_total: relay_core_lib::metrics::get_proxy_invalid_method(),
420            proxy_invalid_status_total: relay_core_lib::metrics::get_proxy_invalid_status(),
421            proxy_retry_total: relay_core_lib::metrics::get_proxy_retry(),
422            proxy_stream_mode_tap_total: relay_core_lib::metrics::get_proxy_stream_mode_tap(),
423            proxy_stream_mode_degrade_total: relay_core_lib::metrics::get_proxy_stream_mode_degrade(
424            ),
425        }
426    }
427
428    pub async fn get_metrics_prometheus_text(&self) -> String {
429        let mut text = self.get_metrics().await.to_prometheus_text();
430        #[cfg(feature = "script")]
431        {
432            text.push_str(&self.script_interceptor.metrics.prometheus_lines());
433        }
434        text
435    }
436
437    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
438        self.lifecycle().into()
439    }
440
441    pub async fn status_report(&self) -> CoreStatusReport {
442        CoreStatusReport {
443            status: self.status_snapshot(),
444            metrics: self.get_metrics().await,
445        }
446    }
447
448    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
449        let metrics = self.get_metrics().await;
450        CoreInterceptSnapshot {
451            pending_count: metrics.intercepts_pending,
452            ws_pending_count: metrics.ws_pending_messages,
453        }
454    }
455
456    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
457        let events = self.recent_audit_events();
458        let start = events.len().saturating_sub(limit);
459        CoreAuditSnapshot {
460            events: events.into_iter().skip(start).collect(),
461        }
462    }
463
464    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
465        let limit = query.limit.clamp(1, 500);
466        if let Some(store) = &self.store {
467            let rows = store
468                .query_audit_events(
469                    query.since_ms,
470                    query.until_ms,
471                    query.actor.as_ref().map(AuditActor::as_str),
472                    query.kind.as_ref().map(AuditEventKind::as_str),
473                    query.outcome.as_ref().map(AuditOutcome::as_str),
474                    limit,
475                )
476                .await
477                .unwrap_or_default();
478
479            let mut events = Vec::with_capacity(rows.len());
480            for row in rows {
481                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
482                    events.push(event);
483                }
484            }
485            return CoreAuditSnapshot { events };
486        }
487
488        let mut events = self.recent_audit_events();
489        events.reverse();
490        let filtered = events
491            .into_iter()
492            .filter(|event| {
493                query
494                    .since_ms
495                    .map(|v| event.timestamp_ms >= v)
496                    .unwrap_or(true)
497            })
498            .filter(|event| {
499                query
500                    .until_ms
501                    .map(|v| event.timestamp_ms <= v)
502                    .unwrap_or(true)
503            })
504            .filter(|event| {
505                query
506                    .actor
507                    .as_ref()
508                    .map(|v| &event.actor == v)
509                    .unwrap_or(true)
510            })
511            .filter(|event| {
512                query
513                    .kind
514                    .as_ref()
515                    .map(|v| &event.kind == v)
516                    .unwrap_or(true)
517            })
518            .filter(|event| {
519                query
520                    .outcome
521                    .as_ref()
522                    .map(|v| &event.outcome == v)
523                    .unwrap_or(true)
524            })
525            .take(limit)
526            .collect();
527        CoreAuditSnapshot { events: filtered }
528    }
529
530    pub async fn get_flow(&self, id: String) -> Option<Flow> {
531        let (tx, rx) = oneshot::channel();
532        if let Err(e) = self
533            .flow_store
534            .send(FlowStoreMessage::GetFlow {
535                id: id.clone(),
536                respond_to: tx,
537            })
538            .await
539        {
540            error!("Failed to send GetFlow request: {}", e);
541            if let Some(store) = &self.store {
542                return store
543                    .load_flow(&id)
544                    .await
545                    .ok()
546                    .flatten()
547                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
548            }
549            return None;
550        }
551        let flow = rx
552            .await
553            .map_err(|e| {
554                error!("Failed to receive Flow response: {}", e);
555                e
556            })
557            .unwrap_or(None);
558        if let Some(flow) = flow {
559            return Some(redact_flow(flow, &self.current_redaction_policy()));
560        }
561        if let Some(store) = &self.store {
562            return store
563                .load_flow(&id)
564                .await
565                .ok()
566                .flatten()
567                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
568                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
569        }
570        None
571    }
572
573    pub async fn get_rules(&self) -> Vec<Rule> {
574        let (tx, rx) = oneshot::channel();
575        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
576            error!("Failed to send GetRules request: {}", e);
577            return Vec::new();
578        }
579        rx.await
580            .map_err(|e| {
581                error!("Failed to receive Rules response: {}", e);
582                e
583            })
584            .unwrap_or_default()
585    }
586
587    pub async fn set_rules(&self, rules: Vec<Rule>) {
588        let _ = self
589            .set_rules_from(
590                AuditActor::Runtime,
591                "rules.replace",
592                "rule_set".to_string(),
593                json!({}),
594                rules,
595            )
596            .await;
597    }
598
599    pub async fn upsert_rule_from(
600        &self,
601        actor: AuditActor,
602        operation: &str,
603        target: String,
604        details: serde_json::Value,
605        rule: Rule,
606    ) -> Result<(), String> {
607        let rule_id = rule.id.clone();
608        let mut rules = self.get_rules().await;
609        rules.retain(|existing| existing.id != rule_id);
610        rules.push(rule);
611        self.set_rules_from(actor, operation, target, details, rules)
612            .await
613    }
614
615    pub async fn delete_rule_from(
616        &self,
617        actor: AuditActor,
618        operation: &str,
619        target: String,
620        details: serde_json::Value,
621        rule_id: &str,
622    ) -> Result<bool, String> {
623        let mut rules = self.get_rules().await;
624        let before = rules.len();
625        rules.retain(|rule| rule.id != rule_id);
626        if rules.len() == before {
627            return Ok(false);
628        }
629        self.set_rules_from(actor, operation, target, details, rules)
630            .await
631            .map(|_| true)
632    }
633
634    pub async fn create_mock_response_rule_from(
635        &self,
636        actor: AuditActor,
637        target: String,
638        details: serde_json::Value,
639        config: MockResponseRuleConfig,
640    ) -> Result<String, String> {
641        let rule_id = config.rule_id.clone();
642        let rule = build_mock_response_rule(config);
643        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
644            .await
645            .map(|_| rule_id)
646    }
647
648    pub async fn create_intercept_rule_from(
649        &self,
650        actor: AuditActor,
651        target: String,
652        details: serde_json::Value,
653        config: InterceptRuleConfig,
654    ) -> Result<String, String> {
655        let rule_id = config.rule_id.clone();
656        let mut rules = self.get_rules().await;
657        rules.extend(build_intercept_rules(config));
658        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
659            .await
660            .map(|_| rule_id)
661    }
662
663    pub async fn upsert_legacy_intercept_rule_from(
664        &self,
665        actor: AuditActor,
666        target: String,
667        details: serde_json::Value,
668        rule: InterceptRule,
669    ) -> Result<(), String> {
670        let rule_id = rule.id.clone();
671        let mut rules = self.get_rules().await;
672        rules.retain(|existing| {
673            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
674        });
675        rules.extend(rule.to_rules());
676        self.set_rules_from(
677            actor,
678            "rule.intercept_legacy_upsert",
679            target,
680            details,
681            rules,
682        )
683        .await
684    }
685
686    pub async fn set_rules_from(
687        &self,
688        actor: AuditActor,
689        operation: &str,
690        target: String,
691        details: serde_json::Value,
692        rules: Vec<Rule>,
693    ) -> Result<(), String> {
694        let rule_count = rules.len();
695        if let Err(e) = self
696            .rule_store
697            .send(RuleStoreMessage::SetRules(rules))
698            .await
699        {
700            error!("Failed to send SetRules request: {}", e);
701            self.record_audit_event(AuditEvent::new(
702                actor,
703                AuditEventKind::RuleChanged,
704                target,
705                AuditOutcome::Failed,
706                json!({
707                    "operation": operation,
708                    "rule_count": rule_count,
709                    "details": details,
710                    "error": e.to_string()
711                }),
712            ));
713            return Err(e.to_string());
714        }
715
716        self.record_audit_event(AuditEvent::new(
717            actor,
718            AuditEventKind::RuleChanged,
719            target,
720            AuditOutcome::Success,
721            json!({
722                "operation": operation,
723                "rule_count": rule_count,
724                "details": details
725            }),
726        ));
727        Ok(())
728    }
729
730    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
731        let mut new_rules = Vec::new();
732        for rule in rules {
733            new_rules.extend(rule.to_rules());
734        }
735        self.set_rules(new_rules).await;
736    }
737
738    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
739        let (tx, rx) = oneshot::channel();
740        if let Err(e) = self
741            .rule_store
742            .send(RuleStoreMessage::GetRuleEngine(tx))
743            .await
744        {
745            error!("Failed to send GetRuleEngine request: {}", e);
746            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
747        }
748        rx.await
749            .map_err(|e| {
750                error!("Failed to receive RuleEngine response: {}", e);
751                e
752            })
753            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
754    }
755
756    pub fn update_policy(&self, policy: ProxyPolicy) {
757        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
758    }
759
760    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
761        let mut policy = self.policy_snapshot();
762        policy.apply_patch(patch);
763        self.update_policy_from(actor, target, policy);
764    }
765
766    pub fn policy_snapshot(&self) -> ProxyPolicy {
767        self.policy_tx.borrow().clone()
768    }
769
770    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
771        let details = json!({
772            "strict_http_semantics": policy.strict_http_semantics,
773            "request_timeout_ms": policy.request_timeout_ms,
774            "max_body_size": policy.max_body_size,
775            "transparent_enabled": policy.transparent_enabled,
776            "redaction_enabled": policy.redaction.enabled,
777            "redact_bodies": policy.redaction.redact_bodies
778        });
779
780        self.policy_tx.send_replace(policy);
781        self.record_audit_event(AuditEvent::new(
782            actor,
783            AuditEventKind::PolicyUpdated,
784            target,
785            AuditOutcome::Success,
786            details,
787        ));
788    }
789
790    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
791        if let Err(e) = self
792            .intercept_broker
793            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
794            .await
795        {
796            error!("Failed to send RegisterIntercept request: {}", e);
797        }
798    }
799
800    pub async fn resolve_intercept(
801        &self,
802        key: String,
803        result: InterceptionResult,
804    ) -> Result<(), String> {
805        let (tx, rx) = oneshot::channel();
806        if let Err(e) = self
807            .intercept_broker
808            .send(InterceptBrokerMessage::ResolveIntercept {
809                key,
810                result,
811                respond_to: tx,
812            })
813            .await
814        {
815            error!("Failed to send ResolveIntercept request: {}", e);
816            return Err(e.to_string());
817        }
818        rx.await.map_err(|_| "Actor dropped".to_string())?
819    }
820
821    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
822        let (tx, rx) = oneshot::channel();
823        if let Err(e) = self
824            .intercept_broker
825            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
826                key,
827                respond_to: tx,
828            })
829            .await
830        {
831            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
832            return None;
833        }
834        rx.await
835            .map_err(|e| {
836                error!("Failed to receive WebSocketMessage response: {}", e);
837                e
838            })
839            .unwrap_or(None)
840    }
841
842    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
843        if let Err(e) = self
844            .intercept_broker
845            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
846            .await
847        {
848            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
849        }
850    }
851
852    pub async fn is_intercept_pending(&self, key: String) -> bool {
853        let (tx, rx) = oneshot::channel();
854        if let Err(e) = self
855            .intercept_broker
856            .send(InterceptBrokerMessage::GetPendingIntercept {
857                key,
858                respond_to: tx,
859            })
860            .await
861        {
862            error!("Failed to send GetPendingIntercept request: {}", e);
863            return false;
864        }
865        rx.await
866            .map_err(|e| {
867                error!("Failed to receive PendingIntercept response: {}", e);
868                e
869            })
870            .unwrap_or(false)
871    }
872
873    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
874        let (tx, rx) = oneshot::channel();
875        if let Err(e) = self
876            .intercept_broker
877            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
878                flow_id,
879                respond_to: tx,
880            })
881            .await
882        {
883            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
884            return false;
885        }
886        rx.await
887            .map_err(|e| {
888                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
889                e
890            })
891            .unwrap_or(false)
892    }
893
894    pub fn upsert_flow(&self, flow: Box<Flow>) {
895        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
896            // Drop flow if channel is full
897            error!("FlowStore dropped flow: {}", e);
898            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
899        }
900    }
901
902    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
903        if let Err(e) = self
904            .flow_store
905            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
906        {
907            error!("FlowStore dropped WS message: {}", e);
908            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
909        }
910    }
911
912    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
913        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
914            flow_id,
915            body,
916            direction,
917        }) {
918            error!("FlowStore dropped HTTP body: {}", e);
919            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
920        }
921    }
922
923    /// P1: Tag a flow as budget-exceeded (body too large for full rule inspection)
924    pub fn tag_flow_budget_exceeded(&self, flow_id: String, direction: Direction) {
925        if let Err(e) = self
926            .flow_store
927            .try_send(FlowStoreMessage::TagBudgetExceeded { flow_id, direction })
928        {
929            error!("FlowStore dropped budget-exceeded tag: {}", e);
930            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
931        }
932    }
933
934    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
935    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
936        self.flow_broadcast_tx.subscribe()
937    }
938
939    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
940        self.audit_broadcast_tx.subscribe()
941    }
942
943    fn current_redaction_policy(&self) -> RedactionPolicy {
944        self.policy_tx.borrow().redaction.clone()
945    }
946
947    pub fn record_flow_events_lagged(&self, skipped: u64) {
948        self.flow_events_lagged_total
949            .fetch_add(skipped as usize, Ordering::Relaxed);
950    }
951
952    pub fn record_audit_events_lagged(&self, skipped: u64) {
953        self.audit_events_lagged_total
954            .fetch_add(skipped as usize, Ordering::Relaxed);
955    }
956
957    pub fn lifecycle(&self) -> RuntimeLifecycle {
958        self.lifecycle_tx.borrow().clone()
959    }
960
961    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
962        self.lifecycle_tx.subscribe()
963    }
964
965    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
966        let current = self.lifecycle();
967        if current.is_active() {
968            return Err(format!(
969                "Proxy is already {} on port {}",
970                current.phase.as_str(),
971                current.port.unwrap_or(port)
972            ));
973        }
974
975        let mut guard = self
976            .shutdown_tx
977            .lock()
978            .map_err(|_| "shutdown state poisoned".to_string())?;
979        *guard = Some(shutdown_tx);
980        drop(guard);
981
982        self.update_lifecycle(RuntimeLifecycle {
983            phase: RuntimeLifecyclePhase::Starting,
984            port: Some(port),
985            started_at_ms: None,
986            last_error: None,
987        });
988        Ok(())
989    }
990
991    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
992        let mut guard = self
993            .shutdown_tx
994            .lock()
995            .map_err(|_| "shutdown state poisoned".to_string())?;
996        let Some(tx) = guard.take() else {
997            return Ok(ProxyStopResult::NotRunning);
998        };
999        drop(guard);
1000
1001        let current = self.lifecycle();
1002        self.update_lifecycle(RuntimeLifecycle {
1003            phase: RuntimeLifecyclePhase::Stopping,
1004            port: current.port,
1005            started_at_ms: current.started_at_ms,
1006            last_error: current.last_error,
1007        });
1008        let _ = tx.send(());
1009        Ok(ProxyStopResult::Stopping)
1010    }
1011
1012    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
1013        self.audit_history
1014            .lock()
1015            .map(|events| events.iter().cloned().collect())
1016            .unwrap_or_default()
1017    }
1018
1019    /// 搜索内存中的 Flow 列表,返回轻量摘要。
1020    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
1021        let redaction = self.current_redaction_policy();
1022        if let Some(store) = &self.store {
1023            return store
1024                .query_flow_summaries(&query)
1025                .await
1026                .unwrap_or_default()
1027                .into_iter()
1028                .map(|summary| redact_flow_summary(summary, &redaction))
1029                .collect();
1030        }
1031        let (tx, rx) = oneshot::channel();
1032        if let Err(e) = self
1033            .flow_store
1034            .send(FlowStoreMessage::SearchFlows {
1035                query,
1036                respond_to: tx,
1037            })
1038            .await
1039        {
1040            error!("Failed to send SearchFlows request: {}", e);
1041            return Vec::new();
1042        }
1043        rx.await
1044            .unwrap_or_default()
1045            .into_iter()
1046            .map(|summary| redact_flow_summary(summary, &redaction))
1047            .collect()
1048    }
1049
1050    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
1051        let redaction = self.current_redaction_policy();
1052        redact_flow_update(update, &redaction)
1053    }
1054
1055    /// 解除截获并可选地应用修改。
1056    ///
1057    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
1058    /// - 若 `mods` 为 None,直接 Continue;
1059    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1060    ///
1061    /// `key` 格式约定:
1062    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1063    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1064    pub async fn resolve_intercept_with_modifications(
1065        &self,
1066        key: String,
1067        action: &str,
1068        mods: Option<relay_core_api::modification::FlowModification>,
1069    ) -> Result<(), String> {
1070        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1071            .await
1072    }
1073
1074    pub async fn resolve_intercept_with_modifications_from(
1075        &self,
1076        actor: AuditActor,
1077        key: String,
1078        action: &str,
1079        mods: Option<relay_core_api::modification::FlowModification>,
1080    ) -> Result<(), String> {
1081        let modified_fields = modification_field_names(mods.as_ref());
1082        let result = match action {
1083            "drop" => InterceptionResult::Drop,
1084            _ => match mods {
1085                None => InterceptionResult::Continue,
1086                Some(m) => {
1087                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1088                    match parts.as_slice() {
1089                        [flow_id, phase] => {
1090                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1091                                modification::apply_flow_modification(&flow, phase, m)
1092                            } else {
1093                                InterceptionResult::Continue
1094                            }
1095                        }
1096                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1097                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1098                        [_, "ws_msg", _] => {
1099                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1100                                modification::apply_ws_modification(&msg, m)
1101                            } else {
1102                                InterceptionResult::Continue
1103                            }
1104                        }
1105                        _ => InterceptionResult::Continue,
1106                    }
1107                }
1108            },
1109        };
1110        let outcome = self.resolve_intercept(key.clone(), result).await;
1111        let audit_outcome = if outcome.is_ok() {
1112            AuditOutcome::Success
1113        } else {
1114            AuditOutcome::Failed
1115        };
1116        let error_message = outcome.as_ref().err().cloned();
1117
1118        self.record_audit_event(AuditEvent::new(
1119            actor,
1120            AuditEventKind::InterceptResolved,
1121            key,
1122            audit_outcome,
1123            json!({
1124                "action": action,
1125                "has_modifications": !modified_fields.is_empty(),
1126                "modified_fields": modified_fields,
1127                "error": error_message
1128            }),
1129        ));
1130        outcome
1131    }
1132
1133    #[cfg(feature = "script")]
1134    pub async fn load_script_from(
1135        &self,
1136        actor: AuditActor,
1137        target: String,
1138        script: &str,
1139    ) -> Result<(), String> {
1140        let result = self
1141            .script_interceptor
1142            .load_script(script)
1143            .await
1144            .map_err(|e| e.to_string());
1145
1146        let outcome = if result.is_ok() {
1147            AuditOutcome::Success
1148        } else {
1149            AuditOutcome::Failed
1150        };
1151        let error_message = result.as_ref().err().cloned();
1152
1153        self.record_audit_event(AuditEvent::new(
1154            actor,
1155            AuditEventKind::ScriptReloaded,
1156            target,
1157            outcome,
1158            json!({
1159                "script_bytes": script.len(),
1160                "error": error_message
1161            }),
1162        ));
1163
1164        result
1165    }
1166
1167    /// S5: Set the env var whitelist for relay.env() in user scripts.
1168    #[cfg(feature = "script")]
1169    pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1170        self.script_interceptor.set_env_allow(env_allow).await;
1171    }
1172
1173    fn record_audit_event(&self, event: AuditEvent) {
1174        const AUDIT_HISTORY_LIMIT: usize = 200;
1175        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1176        if event.outcome == AuditOutcome::Failed {
1177            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1178        }
1179
1180        let details = event.details.to_string();
1181        tracing::info!(
1182            target: "relay_core_audit",
1183            event_id = %event.id,
1184            actor = %event.actor.as_str(),
1185            kind = %event.kind.as_str(),
1186            target = %event.target,
1187            outcome = %event.outcome.as_str(),
1188            details = %details
1189        );
1190
1191        if let Ok(mut history) = self.audit_history.lock() {
1192            if history.len() >= AUDIT_HISTORY_LIMIT {
1193                history.pop_front();
1194            }
1195            history.push_back(event.clone());
1196        }
1197
1198        if let Some(store) = self.store.clone() {
1199            let event_json = serde_json::to_value(&event).unwrap_or_default();
1200            let event_id = event.id.clone();
1201            let timestamp_ms = event.timestamp_ms;
1202            let actor = event.actor.as_str().to_string();
1203            let kind = event.kind.as_str().to_string();
1204            let target = event.target.clone();
1205            let outcome = event.outcome.as_str().to_string();
1206            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1207                handle.spawn(async move {
1208                    if let Err(e) = store
1209                        .save_audit_event(AuditEventRecord {
1210                            id: &event_id,
1211                            timestamp_ms,
1212                            actor: &actor,
1213                            kind: &kind,
1214                            target: &target,
1215                            outcome: &outcome,
1216                            content: &event_json,
1217                        })
1218                        .await
1219                    {
1220                        tracing::error!("Failed to persist audit event: {}", e);
1221                    }
1222                });
1223            }
1224        }
1225
1226        let _ = self.audit_broadcast_tx.send(event);
1227    }
1228
1229    fn transition_to_running(&self, port: u16) {
1230        self.update_lifecycle(RuntimeLifecycle {
1231            phase: RuntimeLifecyclePhase::Running,
1232            port: Some(port),
1233            started_at_ms: Some(now_unix_ms()),
1234            last_error: None,
1235        });
1236    }
1237
1238    fn transition_to_stopped(&self) {
1239        if let Ok(mut guard) = self.shutdown_tx.lock() {
1240            *guard = None;
1241        }
1242
1243        self.update_lifecycle(RuntimeLifecycle {
1244            phase: RuntimeLifecyclePhase::Stopped,
1245            port: None,
1246            started_at_ms: None,
1247            last_error: None,
1248        });
1249    }
1250
1251    fn transition_to_failed(&self, port: u16, error: String) {
1252        if let Ok(mut guard) = self.shutdown_tx.lock() {
1253            *guard = None;
1254        }
1255
1256        self.update_lifecycle(RuntimeLifecycle {
1257            phase: RuntimeLifecyclePhase::Failed,
1258            port: Some(port),
1259            started_at_ms: None,
1260            last_error: Some(error),
1261        });
1262    }
1263
1264    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1265        tracing::info!(
1266            target: "relay_core_lifecycle",
1267            phase = %lifecycle.phase.as_str(),
1268            port = ?lifecycle.port,
1269            started_at_ms = ?lifecycle.started_at_ms,
1270            last_error = ?lifecycle.last_error
1271        );
1272        self.lifecycle_tx.send_replace(lifecycle);
1273    }
1274
1275    pub fn spawn_proxy(
1276        self: &Arc<Self>,
1277        config: ProxyConfig,
1278        sink: mpsc::Sender<FlowUpdate>,
1279        extra_interceptor: Option<Arc<dyn Interceptor>>,
1280    ) -> Result<ProxySpawnResult, String> {
1281        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1282        match self.prepare_start(config.port, shutdown_tx) {
1283            Ok(()) => {}
1284            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1285            Err(error) => return Err(error),
1286        }
1287        let state = self.clone();
1288        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1289            if let Err(error) = state
1290                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1291                .await
1292            {
1293                error!("Proxy failed: {}", error);
1294            }
1295        })))
1296    }
1297
1298    pub async fn start_proxy(
1299        self: &Arc<Self>,
1300        config: ProxyConfig,
1301        sink: mpsc::Sender<FlowUpdate>,
1302        extra_interceptor: Option<Arc<dyn Interceptor>>,
1303    ) -> Result<(), String> {
1304        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1305        self.prepare_start(config.port, shutdown_tx)?;
1306        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1307            .await
1308    }
1309
1310    async fn run_proxy(
1311        self: &Arc<Self>,
1312        config: ProxyConfig,
1313        sink: mpsc::Sender<FlowUpdate>,
1314        extra_interceptor: Option<Arc<dyn Interceptor>>,
1315        shutdown_rx: oneshot::Receiver<()>,
1316    ) -> Result<(), String> {
1317        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1318        let state = self.clone();
1319
1320        if let Some(parent) = config.ca_cert_path.parent()
1321            && !parent.exists()
1322        {
1323            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1324        }
1325
1326        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1327            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1328        let ca = Arc::new(ca);
1329
1330        #[cfg(feature = "script")]
1331        let script_interceptor = self.script_interceptor.clone();
1332
1333        #[cfg(feature = "script")]
1334        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1335
1336        #[cfg(not(feature = "script"))]
1337        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1338
1339        interceptors.push(Arc::new(interceptors::rule::RuleInterceptor::new(
1340            self.clone(),
1341        )));
1342
1343        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1344            self.clone(),
1345        )));
1346
1347        if let Some(interceptor) = extra_interceptor {
1348            interceptors.push(interceptor);
1349        }
1350
1351        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1352        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1353
1354        tokio::spawn(async move {
1355            while let Some(update) = proxy_rx.recv().await {
1356                match update.clone() {
1357                    FlowUpdate::Full(flow) => {
1358                        state.upsert_flow(flow);
1359                    }
1360                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1361                        state.append_ws_message(flow_id, message);
1362                    }
1363                    FlowUpdate::HttpBody {
1364                        flow_id,
1365                        direction,
1366                        body,
1367                    } => {
1368                        state.update_http_body(flow_id, body, direction);
1369                    }
1370                    FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1371                        // P1: Tag the flow as budget-exceeded and update resilience trace
1372                        state.tag_flow_budget_exceeded(flow_id, direction);
1373                    }
1374                }
1375
1376                let _ = state.flow_broadcast_tx.send(update.clone());
1377
1378                if sink.try_send(update).is_err() {
1379                    relay_core_lib::metrics::inc_flows_dropped();
1380                }
1381            }
1382        });
1383
1384        if let Some(udp_port) = config.udp_tproxy_port {
1385            let udp_proxy_tx = proxy_tx.clone();
1386            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1387            tokio::spawn(async move {
1388                match tokio::net::UdpSocket::bind(udp_addr).await {
1389                    Ok(socket) => {
1390                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1391                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1392                            error!("UDP TPROXY failed: {}", e);
1393                        }
1394                    }
1395                    Err(e) => {
1396                        error!("Failed to bind UDP TPROXY socket: {}", e);
1397                    }
1398                }
1399            });
1400        }
1401
1402        let listener = match TcpListener::bind(addr).await {
1403            Ok(listener) => listener,
1404            Err(e) => {
1405                if let Ok(mut guard) = self.shutdown_tx.lock() {
1406                    *guard = None;
1407                }
1408                let message = format!("Failed to bind to address {}: {}", addr, e);
1409                self.transition_to_failed(config.port, message.clone());
1410                return Err(message);
1411            }
1412        };
1413
1414        self.transition_to_running(config.port);
1415        let shutdown_rx = Some(shutdown_rx);
1416
1417        if config.transparent {
1418            let policy = ProxyPolicy {
1419                transparent_enabled: true,
1420                ..Default::default()
1421            };
1422            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1423            let policy_rx = self.policy_tx.subscribe();
1424
1425            let provider: Arc<dyn OriginalDstProvider> = {
1426                let mut addrs = BTreeSet::new();
1427                if let Ok(local) = listener.local_addr() {
1428                    addrs.insert(local);
1429                }
1430
1431                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1432                {
1433                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1434                }
1435                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1436                {
1437                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1438                        Ok(provider) => Arc::new(provider),
1439                        Err(e) => {
1440                            error!("Failed to initialize macOS PF provider: {}", e);
1441                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1442                        }
1443                    }
1444                }
1445                #[cfg(target_os = "windows")]
1446                {
1447                    let filter =
1448                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1449                            .to_string();
1450                    let port = config.port;
1451                    tokio::spawn(async move {
1452                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1453                            .await;
1454                    });
1455
1456                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1457                }
1458                #[cfg(not(any(
1459                    all(target_os = "linux", feature = "transparent-linux"),
1460                    all(target_os = "macos", feature = "transparent-macos"),
1461                    target_os = "windows"
1462                )))]
1463                {
1464                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1465                }
1466            };
1467
1468            let source = TransparentTcpCaptureSource::new(listener, provider);
1469            let result = relay_core_lib::start_proxy(
1470                source,
1471                proxy_tx,
1472                interceptor,
1473                ca,
1474                policy_rx,
1475                None,
1476                shutdown_rx,
1477                Some(self.get_rule_engine().await),
1478            )
1479            .await
1480            .map_err(|e| e.to_string());
1481            if let Err(error) = &result {
1482                self.transition_to_failed(config.port, error.clone());
1483            } else {
1484                self.transition_to_stopped();
1485            }
1486            result
1487        } else {
1488            let source = TcpCaptureSource::new(listener);
1489            let policy = ProxyPolicy::default();
1490            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1491            let policy_rx = self.policy_tx.subscribe();
1492
1493            let result = relay_core_lib::start_proxy(
1494                source,
1495                proxy_tx,
1496                interceptor,
1497                ca,
1498                policy_rx,
1499                None,
1500                shutdown_rx,
1501                Some(self.get_rule_engine().await),
1502            )
1503            .await
1504            .map_err(|e| e.to_string());
1505            if let Err(error) = &result {
1506                self.transition_to_failed(config.port, error.clone());
1507            } else {
1508                self.transition_to_stopped();
1509            }
1510            result
1511        }
1512    }
1513}
1514
1515fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1516    match update {
1517        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1518        FlowUpdate::WebSocketMessage {
1519            flow_id,
1520            mut message,
1521        } => {
1522            message.content = redact_body(message.content, redaction);
1523            FlowUpdate::WebSocketMessage { flow_id, message }
1524        }
1525        FlowUpdate::HttpBody {
1526            flow_id,
1527            direction,
1528            body,
1529        } => FlowUpdate::HttpBody {
1530            flow_id,
1531            direction,
1532            body: redact_body(body, redaction),
1533        },
1534        FlowUpdate::BodyBudgetExceeded { flow_id, direction } => {
1535            FlowUpdate::BodyBudgetExceeded { flow_id, direction }
1536        }
1537    }
1538}
1539
1540fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1541    if !redaction.enabled {
1542        return flow;
1543    }
1544    match &mut flow.layer {
1545        Layer::Http(http) => {
1546            redact_http_request(&mut http.request, redaction);
1547            if let Some(response) = &mut http.response {
1548                redact_headers(&mut response.headers, redaction);
1549                response.body = response
1550                    .body
1551                    .take()
1552                    .map(|body| redact_body(body, redaction));
1553            }
1554        }
1555        Layer::WebSocket(ws) => {
1556            redact_http_request(&mut ws.handshake_request, redaction);
1557            redact_headers(&mut ws.handshake_response.headers, redaction);
1558            ws.handshake_response.body = ws
1559                .handshake_response
1560                .body
1561                .take()
1562                .map(|body| redact_body(body, redaction));
1563            for message in &mut ws.messages {
1564                message.content = redact_body(message.content.clone(), redaction);
1565            }
1566        }
1567        _ => {}
1568    }
1569    flow
1570}
1571
1572fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1573    if !redaction.enabled {
1574        return summary;
1575    }
1576    summary.url = redact_url_string(&summary.url, redaction);
1577    summary
1578}
1579
1580fn redact_http_request(
1581    request: &mut relay_core_api::flow::HttpRequest,
1582    redaction: &RedactionPolicy,
1583) {
1584    redact_headers(&mut request.headers, redaction);
1585    redact_query_pairs(&mut request.query, redaction);
1586    request.url = redact_url(&request.url, redaction);
1587    request.body = request.body.take().map(|body| redact_body(body, redaction));
1588}
1589
1590fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1591    if !redaction.enabled {
1592        return;
1593    }
1594    let sensitive = redaction_set(&redaction.sensitive_header_names);
1595    for (name, value) in headers.iter_mut() {
1596        if sensitive.contains(&name.to_ascii_lowercase()) {
1597            *value = "[REDACTED]".to_string();
1598        }
1599    }
1600}
1601
1602fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1603    if !redaction.enabled {
1604        return;
1605    }
1606    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1607    for (name, value) in query.iter_mut() {
1608        if sensitive.contains(&name.to_ascii_lowercase()) {
1609            *value = "[REDACTED]".to_string();
1610        }
1611    }
1612}
1613
1614fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1615    if !redaction.enabled {
1616        return url.clone();
1617    }
1618    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1619    let pairs: Vec<(String, String)> = url
1620        .query_pairs()
1621        .map(|(k, v)| {
1622            let key = k.to_string();
1623            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1624                "[REDACTED]".to_string()
1625            } else {
1626                v.to_string()
1627            };
1628            (key, value)
1629        })
1630        .collect();
1631    let mut next = url.clone();
1632    if pairs.is_empty() {
1633        return next;
1634    }
1635    next.query_pairs_mut().clear();
1636    for (k, v) in pairs {
1637        next.query_pairs_mut().append_pair(&k, &v);
1638    }
1639    next
1640}
1641
1642fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1643    match url::Url::parse(input) {
1644        Ok(url) => redact_url(&url, redaction).to_string(),
1645        Err(_) => input.to_string(),
1646    }
1647}
1648
1649fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1650    if redaction.enabled && redaction.redact_bodies {
1651        body.content = "[REDACTED]".to_string();
1652    }
1653    body
1654}
1655
1656fn redaction_set(values: &[String]) -> HashSet<String> {
1657    values
1658        .iter()
1659        .map(|value| value.to_ascii_lowercase())
1660        .collect()
1661}
1662
1663#[derive(Clone)]
1664pub struct ProxyConfig {
1665    pub port: u16,
1666    pub ca_cert_path: std::path::PathBuf,
1667    pub ca_key_path: std::path::PathBuf,
1668    pub transparent: bool,
1669    pub udp_tproxy_port: Option<u16>,
1670}
1671
1672impl ProxyConfig {
1673    pub fn new(
1674        port: u16,
1675        ca_cert_path: std::path::PathBuf,
1676        ca_key_path: std::path::PathBuf,
1677    ) -> Self {
1678        Self {
1679            port,
1680            ca_cert_path,
1681            ca_key_path,
1682            transparent: false,
1683            udp_tproxy_port: None,
1684        }
1685    }
1686
1687    pub fn from_app_data_dir(
1688        app_data_dir: impl Into<std::path::PathBuf>,
1689        port: u16,
1690    ) -> Result<Self, String> {
1691        let app_data_dir = app_data_dir.into();
1692        if !app_data_dir.exists() {
1693            std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1694        }
1695
1696        Ok(Self::new(
1697            port,
1698            app_data_dir.join("ca_cert.pem"),
1699            app_data_dir.join("ca_key.pem"),
1700        ))
1701    }
1702
1703    pub fn with_transparent(mut self, transparent: bool) -> Self {
1704        self.transparent = transparent;
1705        self
1706    }
1707
1708    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1709        self.udp_tproxy_port = udp_tproxy_port;
1710        self
1711    }
1712}
1713
1714fn now_unix_ms() -> u64 {
1715    SystemTime::now()
1716        .duration_since(UNIX_EPOCH)
1717        .unwrap_or_default()
1718        .as_millis() as u64
1719}
1720
1721fn modification_field_names(
1722    mods: Option<&relay_core_api::modification::FlowModification>,
1723) -> Vec<&'static str> {
1724    let Some(mods) = mods else {
1725        return Vec::new();
1726    };
1727
1728    let mut fields = Vec::new();
1729    if mods.method.is_some() {
1730        fields.push("method");
1731    }
1732    if mods.url.is_some() {
1733        fields.push("url");
1734    }
1735    if mods.request_headers.is_some() {
1736        fields.push("request_headers");
1737    }
1738    if mods.request_body.is_some() {
1739        fields.push("request_body");
1740    }
1741    if mods.status_code.is_some() {
1742        fields.push("status_code");
1743    }
1744    if mods.response_headers.is_some() {
1745        fields.push("response_headers");
1746    }
1747    if mods.response_body.is_some() {
1748        fields.push("response_body");
1749    }
1750    if mods.message_content.is_some() {
1751        fields.push("message_content");
1752    }
1753    fields
1754}
1755
1756#[cfg(test)]
1757mod tests {
1758    use super::*;
1759    use chrono::Utc;
1760    use relay_core_api::flow::{
1761        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1762        ResponseTiming, TransportProtocol,
1763    };
1764    use std::sync::atomic::{AtomicU64, Ordering};
1765    use tokio::time::{Duration, sleep};
1766    use url::Url;
1767    use uuid::Uuid;
1768
1769    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1770
1771    fn sqlite_url() -> String {
1772        let nanos = SystemTime::now()
1773            .duration_since(UNIX_EPOCH)
1774            .expect("clock drift")
1775            .as_nanos();
1776        let pid = std::process::id();
1777        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1778        let db_dir = std::env::current_dir()
1779            .expect("cwd")
1780            .join("target")
1781            .join("test-dbs");
1782        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1783        let db_path = db_dir.join(format!(
1784            "relay-core-runtime-test-{}-{}-{}.db",
1785            pid, nanos, seq
1786        ));
1787        format!("sqlite://{}?mode=rwc", db_path.display())
1788    }
1789
1790    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1791        let start_time =
1792            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1793        let request_url =
1794            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1795        Flow {
1796            id: Uuid::new_v4(),
1797            start_time,
1798            end_time: Some(start_time),
1799            network: NetworkInfo {
1800                client_ip: "127.0.0.1".to_string(),
1801                client_port: 12000,
1802                server_ip: "127.0.0.1".to_string(),
1803                server_port: 8080,
1804                protocol: TransportProtocol::TCP,
1805                tls: false,
1806                tls_version: None,
1807                sni: None,
1808            },
1809            layer: Layer::Http(HttpLayer {
1810                request: HttpRequest {
1811                    method: method.to_string(),
1812                    url: request_url,
1813                    version: "HTTP/1.1".to_string(),
1814                    headers: vec![],
1815                    cookies: vec![],
1816                    query: vec![],
1817                    body: None,
1818                },
1819                response: Some(HttpResponse {
1820                    status,
1821                    status_text: "OK".to_string(),
1822                    version: "HTTP/1.1".to_string(),
1823                    headers: vec![],
1824                    cookies: vec![],
1825                    body: None,
1826                    timing: ResponseTiming {
1827                        time_to_first_byte: None,
1828                        time_to_last_byte: None,
1829                        connect_time_ms: None,
1830                        ssl_time_ms: None,
1831                    },
1832                }),
1833                error: None,
1834            }),
1835            tags: vec![],
1836            meta: std::collections::HashMap::new(),
1837            resilience_trace: None,
1838            rule_variables: std::collections::HashMap::new(),
1839            matched_rules: vec![],
1840        }
1841    }
1842
1843    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1844        let start_time =
1845            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1846        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1847            .expect("url should parse");
1848        Flow {
1849            id: Uuid::new_v4(),
1850            start_time,
1851            end_time: Some(start_time),
1852            network: NetworkInfo {
1853                client_ip: "127.0.0.1".to_string(),
1854                client_port: 12000,
1855                server_ip: "127.0.0.1".to_string(),
1856                server_port: 8080,
1857                protocol: TransportProtocol::TCP,
1858                tls: false,
1859                tls_version: None,
1860                sni: None,
1861            },
1862            layer: Layer::Http(HttpLayer {
1863                request: HttpRequest {
1864                    method: "GET".to_string(),
1865                    url: request_url,
1866                    version: "HTTP/1.1".to_string(),
1867                    headers: vec![
1868                        (
1869                            "Authorization".to_string(),
1870                            "Bearer secret-token".to_string(),
1871                        ),
1872                        ("X-Normal".to_string(), "visible".to_string()),
1873                    ],
1874                    cookies: vec![],
1875                    query: vec![
1876                        ("token".to_string(), "abc123".to_string()),
1877                        ("ok".to_string(), "1".to_string()),
1878                    ],
1879                    body: Some(BodyData {
1880                        encoding: "utf-8".to_string(),
1881                        content: "secret request body".to_string(),
1882                        size: 19,
1883                    }),
1884                },
1885                response: Some(HttpResponse {
1886                    status: 200,
1887                    status_text: "OK".to_string(),
1888                    version: "HTTP/1.1".to_string(),
1889                    headers: vec![
1890                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1891                        ("X-Response".to_string(), "visible".to_string()),
1892                    ],
1893                    cookies: vec![],
1894                    body: Some(BodyData {
1895                        encoding: "utf-8".to_string(),
1896                        content: "secret response body".to_string(),
1897                        size: 20,
1898                    }),
1899                    timing: ResponseTiming {
1900                        time_to_first_byte: None,
1901                        time_to_last_byte: None,
1902                        connect_time_ms: None,
1903                        ssl_time_ms: None,
1904                    },
1905                }),
1906                error: None,
1907            }),
1908            tags: vec![],
1909            meta: std::collections::HashMap::new(),
1910            resilience_trace: None,
1911            rule_variables: std::collections::HashMap::new(),
1912            matched_rules: vec![],
1913        }
1914    }
1915
1916    #[tokio::test]
1917    async fn set_rules_from_records_audit_event() {
1918        let state = CoreState::new(None).await;
1919
1920        state
1921            .set_rules_from(
1922                AuditActor::Http,
1923                "rule.upsert",
1924                "rule-1".to_string(),
1925                json!({ "route": "/api/v1/rules" }),
1926                Vec::new(),
1927            )
1928            .await
1929            .expect("set rules should succeed");
1930
1931        let events = state.recent_audit_events();
1932        let event = events.last().expect("audit event should exist");
1933        assert_eq!(event.actor, AuditActor::Http);
1934        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1935        assert_eq!(event.outcome, AuditOutcome::Success);
1936        assert_eq!(event.target, "rule-1");
1937        assert_eq!(event.details["operation"], "rule.upsert");
1938        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1939    }
1940
1941    #[tokio::test]
1942    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1943        let state = CoreState::new(None).await;
1944
1945        state
1946            .upsert_rule_from(
1947                AuditActor::Probe,
1948                "rule.upsert",
1949                "rule-1".to_string(),
1950                json!({ "tool": "set_rule" }),
1951                Rule {
1952                    id: "rule-1".to_string(),
1953                    name: "first".to_string(),
1954                    active: true,
1955                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1956                    priority: 1,
1957                    termination: relay_core_lib::rule::RuleTermination::Continue,
1958                    filter: relay_core_lib::rule::Filter::Url(
1959                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1960                    ),
1961                    actions: vec![],
1962                    constraints: None,
1963                },
1964            )
1965            .await
1966            .expect("initial upsert should succeed");
1967
1968        state
1969            .upsert_rule_from(
1970                AuditActor::Probe,
1971                "rule.upsert",
1972                "rule-1".to_string(),
1973                json!({ "tool": "set_rule" }),
1974                Rule {
1975                    id: "rule-1".to_string(),
1976                    name: "second".to_string(),
1977                    active: true,
1978                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1979                    priority: 2,
1980                    termination: relay_core_lib::rule::RuleTermination::Continue,
1981                    filter: relay_core_lib::rule::Filter::Url(
1982                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1983                    ),
1984                    actions: vec![],
1985                    constraints: None,
1986                },
1987            )
1988            .await
1989            .expect("replacement upsert should succeed");
1990
1991        let rules = state.get_rules().await;
1992        assert_eq!(rules.len(), 1);
1993        assert_eq!(rules[0].name, "second");
1994        let event = state
1995            .recent_audit_events()
1996            .last()
1997            .cloned()
1998            .expect("audit event");
1999        assert_eq!(event.details["operation"], "rule.upsert");
2000        assert_eq!(event.details["details"]["tool"], "set_rule");
2001    }
2002
2003    #[tokio::test]
2004    async fn delete_rule_from_returns_false_when_rule_missing() {
2005        let state = CoreState::new(None).await;
2006
2007        let deleted = state
2008            .delete_rule_from(
2009                AuditActor::Http,
2010                "rule.delete",
2011                "missing".to_string(),
2012                json!({ "route": "/api/v1/rules/{id}" }),
2013                "missing",
2014            )
2015            .await
2016            .expect("delete should not fail");
2017
2018        assert!(!deleted);
2019        assert!(state.recent_audit_events().is_empty());
2020    }
2021
2022    #[tokio::test]
2023    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
2024        let state = CoreState::new(None).await;
2025
2026        let rule_id = state
2027            .create_mock_response_rule_from(
2028                AuditActor::Http,
2029                "api-mock-1".to_string(),
2030                json!({ "route": "/api/v1/mock", "status": 201 }),
2031                MockResponseRuleConfig {
2032                    rule_id: "api-mock-1".to_string(),
2033                    url_pattern: "example.com".to_string(),
2034                    name: "api-mock:example.com".to_string(),
2035                    status: 201,
2036                    content_type: "application/json".to_string(),
2037                    body: "{\"ok\":true}".to_string(),
2038                },
2039            )
2040            .await
2041            .expect("mock rule should be created");
2042
2043        assert_eq!(rule_id, "api-mock-1");
2044        let rules = state.get_rules().await;
2045        assert_eq!(rules.len(), 1);
2046        assert_eq!(rules[0].id, "api-mock-1");
2047        let event = state
2048            .recent_audit_events()
2049            .last()
2050            .cloned()
2051            .expect("audit event");
2052        assert_eq!(event.details["operation"], "rule.mock_create");
2053        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
2054    }
2055
2056    #[tokio::test]
2057    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
2058        let state = CoreState::new(None).await;
2059
2060        let rule_id = state
2061            .create_intercept_rule_from(
2062                AuditActor::Http,
2063                "intercept-1".to_string(),
2064                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
2065                InterceptRuleConfig {
2066                    rule_id: "intercept-1".to_string(),
2067                    active: true,
2068                    url_pattern: "example.com".to_string(),
2069                    method: None,
2070                    phase: "request".to_string(),
2071                    name: "api-intercept:example.com".to_string(),
2072                    priority: 100,
2073                    termination: relay_core_lib::rule::RuleTermination::Stop,
2074                },
2075            )
2076            .await
2077            .expect("intercept rule should be created");
2078
2079        assert_eq!(rule_id, "intercept-1");
2080        let rules = state.get_rules().await;
2081        assert_eq!(rules.len(), 1);
2082        assert_eq!(rules[0].id, "intercept-1");
2083        assert_eq!(rules[0].name, "api-intercept:example.com");
2084        let event = state
2085            .recent_audit_events()
2086            .last()
2087            .cloned()
2088            .expect("audit event");
2089        assert_eq!(event.details["operation"], "rule.intercept_create");
2090        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2091    }
2092
2093    #[tokio::test]
2094    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2095        let state = CoreState::new(None).await;
2096
2097        state
2098            .upsert_legacy_intercept_rule_from(
2099                AuditActor::Tauri,
2100                "legacy-1".to_string(),
2101                json!({ "command": "set_intercept_rule" }),
2102                InterceptRule {
2103                    id: "legacy-1".to_string(),
2104                    active: true,
2105                    url_pattern: "example.com".to_string(),
2106                    method: None,
2107                    phase: "both".to_string(),
2108                },
2109            )
2110            .await
2111            .expect("initial family upsert should succeed");
2112        assert_eq!(state.get_rules().await.len(), 2);
2113
2114        state
2115            .upsert_legacy_intercept_rule_from(
2116                AuditActor::Tauri,
2117                "legacy-1".to_string(),
2118                json!({ "command": "set_intercept_rule" }),
2119                InterceptRule {
2120                    id: "legacy-1".to_string(),
2121                    active: true,
2122                    url_pattern: "example.org".to_string(),
2123                    method: Some("POST".to_string()),
2124                    phase: "request".to_string(),
2125                },
2126            )
2127            .await
2128            .expect("replacement family upsert should succeed");
2129
2130        let rules = state.get_rules().await;
2131        assert_eq!(rules.len(), 1);
2132        assert_eq!(rules[0].id, "legacy-1");
2133        let event = state
2134            .recent_audit_events()
2135            .last()
2136            .cloned()
2137            .expect("audit event");
2138        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2139        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2140    }
2141
2142    #[tokio::test]
2143    async fn resolve_intercept_failure_records_failed_audit_event() {
2144        let state = CoreState::new(None).await;
2145
2146        let result = state
2147            .resolve_intercept_with_modifications_from(
2148                AuditActor::Probe,
2149                "missing-flow:request".to_string(),
2150                "drop",
2151                None,
2152            )
2153            .await;
2154
2155        assert!(result.is_err());
2156        let events = state.recent_audit_events();
2157        let event = events.last().expect("audit event should exist");
2158        assert_eq!(event.actor, AuditActor::Probe);
2159        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2160        assert_eq!(event.outcome, AuditOutcome::Failed);
2161        assert_eq!(event.details["action"], "drop");
2162        assert!(
2163            event.details["error"]
2164                .as_str()
2165                .unwrap_or_default()
2166                .contains("Interception not found")
2167        );
2168    }
2169
2170    #[tokio::test]
2171    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2172        let state = CoreState::new(None).await;
2173        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2174
2175        state
2176            .prepare_start(8080, shutdown_tx)
2177            .expect("prepare start should succeed");
2178        let lifecycle = state.lifecycle();
2179        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2180        assert_eq!(lifecycle.port, Some(8080));
2181        assert!(lifecycle.started_at_ms.is_none());
2182        assert!(lifecycle.last_error.is_none());
2183
2184        assert_eq!(
2185            state.stop_proxy().expect("stop should succeed"),
2186            ProxyStopResult::Stopping
2187        );
2188        let lifecycle = state.lifecycle();
2189        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2190        assert_eq!(lifecycle.port, Some(8080));
2191        assert!(shutdown_rx.await.is_ok());
2192    }
2193
2194    #[tokio::test]
2195    async fn status_snapshot_derives_runtime_facing_fields() {
2196        let state = CoreState::new(None).await;
2197        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2198        state
2199            .prepare_start(8080, shutdown_tx)
2200            .expect("prepare start should succeed");
2201
2202        let status = state.status_snapshot();
2203        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2204        assert!(status.running);
2205        assert_eq!(status.port, Some(8080));
2206        assert!(status.uptime.is_none());
2207        assert!(status.last_error.is_none());
2208    }
2209
2210    #[tokio::test]
2211    async fn status_report_combines_status_and_metrics() {
2212        let state = CoreState::new(None).await;
2213        let report = state.status_report().await;
2214
2215        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2216        assert!(!report.status.running);
2217        assert_eq!(report.metrics.intercepts_pending, 0);
2218        assert_eq!(report.metrics.ws_pending_messages, 0);
2219        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2220        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2221        assert_eq!(report.metrics.audit_events_total, 0);
2222        assert_eq!(report.metrics.audit_events_failed, 0);
2223        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2224        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2225    }
2226
2227    #[test]
2228    fn proxy_config_new_and_transport_setters_preserve_values() {
2229        let config = ProxyConfig::new(
2230            8080,
2231            std::path::PathBuf::from("/tmp/ca_cert.pem"),
2232            std::path::PathBuf::from("/tmp/ca_key.pem"),
2233        )
2234        .with_transparent(true)
2235        .with_udp_tproxy_port(Some(15000));
2236
2237        assert_eq!(config.port, 8080);
2238        assert_eq!(
2239            config.ca_cert_path,
2240            std::path::PathBuf::from("/tmp/ca_cert.pem")
2241        );
2242        assert_eq!(
2243            config.ca_key_path,
2244            std::path::PathBuf::from("/tmp/ca_key.pem")
2245        );
2246        assert!(config.transparent);
2247        assert_eq!(config.udp_tproxy_port, Some(15000));
2248    }
2249
2250    #[test]
2251    fn proxy_config_from_app_data_dir_creates_default_paths() {
2252        let unique = SystemTime::now()
2253            .duration_since(UNIX_EPOCH)
2254            .expect("clock drift")
2255            .as_nanos();
2256        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2257
2258        let config =
2259            ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2260
2261        assert!(dir.exists());
2262        assert_eq!(config.port, 8899);
2263        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2264        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2265        assert!(!config.transparent);
2266        assert!(config.udp_tproxy_port.is_none());
2267    }
2268
2269    #[tokio::test]
2270    async fn intercept_snapshot_maps_pending_counts() {
2271        let state = CoreState::new(None).await;
2272        let snapshot = state.intercept_snapshot().await;
2273
2274        assert_eq!(snapshot.pending_count, 0);
2275        assert_eq!(snapshot.ws_pending_count, 0);
2276    }
2277
2278    #[tokio::test]
2279    async fn audit_snapshot_returns_latest_events_in_order() {
2280        let state = CoreState::new(None).await;
2281        state.record_audit_event(AuditEvent::new(
2282            AuditActor::Runtime,
2283            AuditEventKind::RuleChanged,
2284            "first",
2285            AuditOutcome::Success,
2286            json!({ "index": 1 }),
2287        ));
2288        state.record_audit_event(AuditEvent::new(
2289            AuditActor::Http,
2290            AuditEventKind::PolicyUpdated,
2291            "second",
2292            AuditOutcome::Success,
2293            json!({ "index": 2 }),
2294        ));
2295
2296        let snapshot = state.audit_snapshot(1);
2297
2298        assert_eq!(snapshot.events.len(), 1);
2299        assert_eq!(snapshot.events[0].target, "second");
2300        assert_eq!(snapshot.events[0].details["index"], 2);
2301    }
2302
2303    #[tokio::test]
2304    async fn query_audit_snapshot_filters_in_memory_events() {
2305        let state = CoreState::new(None).await;
2306        state.record_audit_event(AuditEvent::new(
2307            AuditActor::Http,
2308            AuditEventKind::RuleChanged,
2309            "rule-1",
2310            AuditOutcome::Success,
2311            json!({ "idx": 1 }),
2312        ));
2313        state.record_audit_event(AuditEvent::new(
2314            AuditActor::Probe,
2315            AuditEventKind::PolicyUpdated,
2316            "policy",
2317            AuditOutcome::Failed,
2318            json!({ "idx": 2 }),
2319        ));
2320
2321        let snapshot = state
2322            .query_audit_snapshot(CoreAuditQuery {
2323                actor: Some(AuditActor::Probe),
2324                kind: Some(AuditEventKind::PolicyUpdated),
2325                outcome: Some(AuditOutcome::Failed),
2326                limit: 10,
2327                ..Default::default()
2328            })
2329            .await;
2330
2331        assert_eq!(snapshot.events.len(), 1);
2332        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2333        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2334        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2335    }
2336
2337    #[tokio::test]
2338    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2339        let state = CoreState::new(Some(sqlite_url())).await;
2340        state.update_policy_from(
2341            AuditActor::Http,
2342            "policy".to_string(),
2343            ProxyPolicy {
2344                transparent_enabled: true,
2345                ..Default::default()
2346            },
2347        );
2348
2349        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2350        for _ in 0..10 {
2351            snapshot = state
2352                .query_audit_snapshot(CoreAuditQuery {
2353                    actor: Some(AuditActor::Http),
2354                    kind: Some(AuditEventKind::PolicyUpdated),
2355                    limit: 10,
2356                    ..Default::default()
2357                })
2358                .await;
2359            if !snapshot.events.is_empty() {
2360                break;
2361            }
2362            sleep(Duration::from_millis(20)).await;
2363        }
2364
2365        assert!(!snapshot.events.is_empty());
2366        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2367        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2368    }
2369
2370    #[tokio::test]
2371    async fn prepare_start_rejects_second_active_start() {
2372        let state = CoreState::new(None).await;
2373        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2374        state
2375            .prepare_start(8080, shutdown_tx)
2376            .expect("first start should succeed");
2377
2378        let (second_tx, _second_rx) = oneshot::channel();
2379        let error = state
2380            .prepare_start(8081, second_tx)
2381            .expect_err("second active start should be rejected");
2382        assert!(error.contains("already"));
2383    }
2384
2385    #[test]
2386    fn update_policy_records_audit_event() {
2387        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2388        let state = runtime.block_on(CoreState::new(None));
2389
2390        state.update_policy_from(
2391            AuditActor::Runtime,
2392            "policy".to_string(),
2393            ProxyPolicy {
2394                transparent_enabled: true,
2395                ..Default::default()
2396            },
2397        );
2398
2399        let events = state.recent_audit_events();
2400        let event = events.last().expect("audit event should exist");
2401        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2402        assert_eq!(event.outcome, AuditOutcome::Success);
2403        assert_eq!(event.details["transparent_enabled"], true);
2404    }
2405
2406    #[test]
2407    fn patch_policy_updates_redaction_without_replacing_other_fields() {
2408        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2409        let state = runtime.block_on(CoreState::new(None));
2410        let original_timeout = state.policy_snapshot().request_timeout_ms;
2411
2412        state.patch_policy_from(
2413            AuditActor::Runtime,
2414            "policy.patch".to_string(),
2415            relay_core_api::policy::ProxyPolicyPatch {
2416                redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2417                    enabled: Some(true),
2418                    redact_bodies: Some(true),
2419                    ..Default::default()
2420                }),
2421                upstream: None,
2422            },
2423        );
2424
2425        let policy = state.policy_snapshot();
2426        assert_eq!(policy.request_timeout_ms, original_timeout);
2427        assert!(policy.redaction.enabled);
2428        assert!(policy.redaction.redact_bodies);
2429
2430        let events = state.recent_audit_events();
2431        let event = events.last().expect("audit event should exist");
2432        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2433        assert_eq!(event.details["redaction_enabled"], true);
2434    }
2435
2436    #[tokio::test]
2437    async fn metrics_include_audit_and_lagged_event_counters() {
2438        let state = CoreState::new(None).await;
2439
2440        state.update_policy_from(
2441            AuditActor::Runtime,
2442            "policy".to_string(),
2443            ProxyPolicy::default(),
2444        );
2445        let _ = state
2446            .resolve_intercept_with_modifications_from(
2447                AuditActor::Probe,
2448                "missing-flow:request".to_string(),
2449                "drop",
2450                None,
2451            )
2452            .await;
2453
2454        state.record_flow_events_lagged(3);
2455        state.record_audit_events_lagged(5);
2456
2457        let metrics = state.get_metrics().await;
2458        assert_eq!(metrics.audit_events_total, 2);
2459        assert_eq!(metrics.audit_events_failed, 1);
2460        assert_eq!(metrics.flow_events_lagged_total, 3);
2461        assert_eq!(metrics.audit_events_lagged_total, 5);
2462    }
2463
2464    #[tokio::test]
2465    async fn prometheus_metrics_text_contains_observability_fields() {
2466        let state = CoreState::new(None).await;
2467        state.record_flow_events_lagged(2);
2468        state.record_audit_events_lagged(4);
2469
2470        let text = state.get_metrics_prometheus_text().await;
2471        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2472        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2473        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2474        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2475    }
2476
2477    #[tokio::test]
2478    async fn search_flows_uses_store_with_offset_pagination() {
2479        let state = CoreState::new(Some(sqlite_url())).await;
2480        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2481        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2482        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2483
2484        state.upsert_flow(Box::new(flow_a));
2485        state.upsert_flow(Box::new(flow_b));
2486        state.upsert_flow(Box::new(flow_c));
2487
2488        let mut baseline = Vec::new();
2489        for _ in 0..20 {
2490            baseline = state
2491                .search_flows(FlowQuery {
2492                    host: Some("api.example.com".to_string()),
2493                    path_contains: None,
2494                    method: None,
2495                    status_min: None,
2496                    status_max: None,
2497                    has_error: None,
2498                    is_websocket: None,
2499                    limit: Some(3),
2500                    offset: Some(0),
2501                })
2502                .await;
2503            if baseline.len() == 3 {
2504                break;
2505            }
2506            sleep(Duration::from_millis(20)).await;
2507        }
2508
2509        assert_eq!(baseline.len(), 3);
2510        let page = state
2511            .search_flows(FlowQuery {
2512                host: Some("api.example.com".to_string()),
2513                path_contains: None,
2514                method: None,
2515                status_min: None,
2516                status_max: None,
2517                has_error: None,
2518                is_websocket: None,
2519                limit: Some(1),
2520                offset: Some(1),
2521            })
2522            .await;
2523        assert_eq!(page.len(), 1);
2524        assert_eq!(page[0].id, baseline[1].id);
2525    }
2526
2527    #[tokio::test]
2528    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2529        let state = CoreState::new(Some(sqlite_url())).await;
2530        let first_flow = sample_http_flow(
2531            "persist.example.com",
2532            "/first",
2533            "GET",
2534            200,
2535            1_700_000_010_000,
2536        );
2537        let first_id = first_flow.id.to_string();
2538        state.upsert_flow(Box::new(first_flow));
2539        for i in 0..240 {
2540            state.upsert_flow(Box::new(sample_http_flow(
2541                "persist.example.com",
2542                &format!("/{}", i),
2543                "GET",
2544                200,
2545                1_700_000_020_000 + i,
2546            )));
2547        }
2548
2549        sleep(Duration::from_millis(200)).await;
2550
2551        let loaded = state.get_flow(first_id).await;
2552        assert!(loaded.is_some());
2553    }
2554
2555    #[tokio::test]
2556    async fn search_flows_redacts_summary_url_when_enabled() {
2557        let state = CoreState::new(None).await;
2558        state.update_policy_from(
2559            AuditActor::Runtime,
2560            "policy.redaction".to_string(),
2561            ProxyPolicy {
2562                redaction: RedactionPolicy {
2563                    enabled: true,
2564                    sensitive_query_keys: vec!["token".to_string()],
2565                    redact_bodies: false,
2566                    ..Default::default()
2567                },
2568                ..Default::default()
2569            },
2570        );
2571        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2572
2573        let mut items = Vec::new();
2574        for _ in 0..20 {
2575            items = state
2576                .search_flows(FlowQuery {
2577                    host: Some("api.example.com".to_string()),
2578                    path_contains: Some("/private".to_string()),
2579                    ..Default::default()
2580                })
2581                .await;
2582            if !items.is_empty() {
2583                break;
2584            }
2585            sleep(Duration::from_millis(20)).await;
2586        }
2587
2588        assert!(!items.is_empty());
2589        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2590        let token = redacted
2591            .query_pairs()
2592            .find(|(k, _)| k == "token")
2593            .map(|(_, v)| v.to_string());
2594        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2595    }
2596
2597    #[tokio::test]
2598    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2599        let state = CoreState::new(Some(sqlite_url())).await;
2600        state.update_policy_from(
2601            AuditActor::Runtime,
2602            "policy.redaction".to_string(),
2603            ProxyPolicy {
2604                redaction: RedactionPolicy {
2605                    enabled: true,
2606                    sensitive_header_names: vec![
2607                        "authorization".to_string(),
2608                        "set-cookie".to_string(),
2609                    ],
2610                    sensitive_query_keys: vec!["token".to_string()],
2611                    redact_bodies: true,
2612                },
2613                ..Default::default()
2614            },
2615        );
2616
2617        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2618        let flow_id = flow.id.to_string();
2619        state.upsert_flow(Box::new(flow));
2620        sleep(Duration::from_millis(80)).await;
2621
2622        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2623        let Layer::Http(http) = loaded.layer else {
2624            panic!("expected http layer");
2625        };
2626
2627        let auth = http
2628            .request
2629            .headers
2630            .iter()
2631            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2632            .map(|(_, v)| v.as_str());
2633        assert_eq!(auth, Some("[REDACTED]"));
2634
2635        let req_query_token = http
2636            .request
2637            .query
2638            .iter()
2639            .find(|(k, _)| k == "token")
2640            .map(|(_, v)| v.as_str());
2641        assert_eq!(req_query_token, Some("[REDACTED]"));
2642
2643        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2644        assert_eq!(req_body, Some("[REDACTED]"));
2645
2646        let response = http.response.expect("response should exist");
2647        let set_cookie = response
2648            .headers
2649            .iter()
2650            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2651            .map(|(_, v)| v.as_str());
2652        assert_eq!(set_cookie, Some("[REDACTED]"));
2653        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2654        assert_eq!(res_body, Some("[REDACTED]"));
2655    }
2656
2657    #[test]
2658    fn redact_flow_update_masks_http_body_when_enabled() {
2659        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2660        let state = runtime.block_on(CoreState::new(None));
2661        state.update_policy_from(
2662            AuditActor::Runtime,
2663            "policy.redaction".to_string(),
2664            ProxyPolicy {
2665                redaction: RedactionPolicy {
2666                    enabled: true,
2667                    redact_bodies: true,
2668                    ..Default::default()
2669                },
2670                ..Default::default()
2671            },
2672        );
2673
2674        let update = FlowUpdate::HttpBody {
2675            flow_id: "f-1".to_string(),
2676            direction: relay_core_api::flow::Direction::ClientToServer,
2677            body: BodyData {
2678                encoding: "utf-8".to_string(),
2679                content: "super-secret".to_string(),
2680                size: 12,
2681            },
2682        };
2683        let redacted = state.redact_flow_update_for_output(update);
2684        match redacted {
2685            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2686            _ => panic!("expected http body update"),
2687        }
2688    }
2689
2690    #[cfg(feature = "script")]
2691    #[tokio::test]
2692    async fn load_script_from_records_audit_event() {
2693        let state = CoreState::new(None).await;
2694
2695        state
2696            .load_script_from(
2697                AuditActor::Tauri,
2698                "tauri.load_script".to_string(),
2699                "globalThis.onRequestHeaders = (_flow) => {};",
2700            )
2701            .await
2702            .expect("script should load");
2703
2704        let events = state.recent_audit_events();
2705        let event = events.last().expect("audit event should exist");
2706        assert_eq!(event.actor, AuditActor::Tauri);
2707        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2708        assert_eq!(event.outcome, AuditOutcome::Success);
2709        assert_eq!(event.target, "tauri.load_script");
2710    }
2711}