Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66// ── Re-exports for user convenience ──
67// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
68pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
76pub mod rule_engine {
77    pub use relay_core_lib::rule_engine::*;
78}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
81pub struct CoreMetrics {
82    pub flows_total: usize,
83    pub flows_in_memory: usize,
84    pub flows_dropped: usize,
85    pub intercepts_pending: usize,
86    pub ws_pending_messages: usize,
87    pub oldest_intercept_age_ms: Option<u64>,
88    pub oldest_ws_message_age_ms: Option<u64>,
89    pub rule_exec_errors: usize,
90    pub audit_events_total: usize,
91    pub audit_events_failed: usize,
92    pub flow_events_lagged_total: usize,
93    pub audit_events_lagged_total: usize,
94}
95
96impl CoreMetrics {
97    pub fn to_prometheus_text(&self) -> String {
98        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
99        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
100        format!(
101            "relay_core_flows_total {}\n\
102relay_core_flows_in_memory {}\n\
103relay_core_flows_dropped_total {}\n\
104relay_core_intercepts_pending {}\n\
105relay_core_ws_pending_messages {}\n\
106relay_core_oldest_intercept_age_ms {}\n\
107relay_core_oldest_ws_message_age_ms {}\n\
108relay_core_rule_exec_errors_total {}\n\
109relay_core_audit_events_total {}\n\
110relay_core_audit_events_failed_total {}\n\
111relay_core_flow_events_lagged_total {}\n\
112relay_core_audit_events_lagged_total {}\n",
113            self.flows_total,
114            self.flows_in_memory,
115            self.flows_dropped,
116            self.intercepts_pending,
117            self.ws_pending_messages,
118            oldest_intercept_age_ms,
119            oldest_ws_message_age_ms,
120            self.rule_exec_errors,
121            self.audit_events_total,
122            self.audit_events_failed,
123            self.flow_events_lagged_total,
124            self.audit_events_lagged_total,
125        )
126    }
127}
128
129#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
130pub struct CoreStatusSnapshot {
131    pub phase: RuntimeLifecyclePhase,
132    pub running: bool,
133    pub port: Option<u16>,
134    pub uptime: Option<u64>,
135    pub last_error: Option<String>,
136}
137
138#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
139pub struct CoreStatusReport {
140    pub status: CoreStatusSnapshot,
141    pub metrics: CoreMetrics,
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
145pub struct CoreInterceptSnapshot {
146    pub pending_count: usize,
147    pub ws_pending_count: usize,
148}
149
150#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
151pub struct CoreAuditSnapshot {
152    pub events: Vec<AuditEvent>,
153}
154
155#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
156pub struct CoreAuditQuery {
157    pub since_ms: Option<u64>,
158    pub until_ms: Option<u64>,
159    pub actor: Option<AuditActor>,
160    pub kind: Option<AuditEventKind>,
161    pub outcome: Option<AuditOutcome>,
162    pub limit: usize,
163}
164
165impl Default for CoreAuditQuery {
166    fn default() -> Self {
167        Self {
168            since_ms: None,
169            until_ms: None,
170            actor: None,
171            kind: None,
172            outcome: None,
173            limit: 50,
174        }
175    }
176}
177
178#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
179#[serde(rename_all = "snake_case")]
180pub enum RuntimeLifecyclePhase {
181    Created,
182    Starting,
183    Running,
184    Stopping,
185    Stopped,
186    Failed,
187}
188
189impl RuntimeLifecyclePhase {
190    pub fn as_str(&self) -> &'static str {
191        match self {
192            Self::Created => "created",
193            Self::Starting => "starting",
194            Self::Running => "running",
195            Self::Stopping => "stopping",
196            Self::Stopped => "stopped",
197            Self::Failed => "failed",
198        }
199    }
200
201    pub fn is_active(&self) -> bool {
202        matches!(self, Self::Starting | Self::Running | Self::Stopping)
203    }
204}
205
206#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
207pub struct RuntimeLifecycle {
208    pub phase: RuntimeLifecyclePhase,
209    pub port: Option<u16>,
210    pub started_at_ms: Option<u64>,
211    pub last_error: Option<String>,
212}
213
214impl RuntimeLifecycle {
215    pub fn created() -> Self {
216        Self {
217            phase: RuntimeLifecyclePhase::Created,
218            port: None,
219            started_at_ms: None,
220            last_error: None,
221        }
222    }
223
224    pub fn is_active(&self) -> bool {
225        self.phase.is_active()
226    }
227
228    pub fn uptime_seconds(&self) -> Option<u64> {
229        let started_at_ms = self.started_at_ms?;
230        let now_ms = now_unix_ms();
231        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
232    }
233}
234
/// Result of a request to spawn the proxy.
pub enum ProxySpawnResult {
    /// A proxy task was spawned; carries its join handle.
    Started(tokio::task::JoinHandle<()>),
    /// Nothing was spawned because a proxy is already running.
    AlreadyRunning,
}
239
/// Result of `CoreState::stop_proxy`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A shutdown signal was issued and the lifecycle moved to `Stopping`.
    Stopping,
    /// There was no stored shutdown handle, so nothing was stopped.
    NotRunning,
}
245
246impl From<RuntimeLifecycle> for CoreStatusSnapshot {
247    fn from(lifecycle: RuntimeLifecycle) -> Self {
248        Self {
249            running: lifecycle.is_active(),
250            uptime: lifecycle.uptime_seconds(),
251            port: lifecycle.port,
252            last_error: lifecycle.last_error,
253            phase: lifecycle.phase,
254        }
255    }
256}
257
/// Central runtime state: handles to the actor tasks, the optional persistent
/// store, the policy and lifecycle channels, and the runtime counters.
pub struct CoreState {
    /// Mailbox of the flow store actor.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    /// Mailbox of the intercept broker actor.
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    /// Mailbox of the rule store actor.
    rule_store: mpsc::Sender<RuleStoreMessage>,
    /// Persistent store; `None` when no database URL was given or connecting failed.
    store: Option<Store>,
    /// Script interceptor, present only with the `script` feature.
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    /// Holds the current proxy policy and publishes replacements.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    /// Count of flow-store messages that could not be enqueued.
    pub flows_dropped: Arc<AtomicUsize>,
    /// Backs `CoreMetrics::audit_events_total`.
    audit_events_total: Arc<AtomicUsize>,
    /// Backs `CoreMetrics::audit_events_failed`.
    audit_events_failed: Arc<AtomicUsize>,
    /// Flow events reported as skipped by lagging subscribers.
    flow_events_lagged_total: Arc<AtomicUsize>,
    /// Audit events reported as skipped by lagging subscribers.
    audit_events_lagged_total: Arc<AtomicUsize>,
    /// Internal broadcast channel: multiple consumers (e.g. Tauri, Probe) can subscribe.
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    /// Broadcast channel for audit events.
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    /// In-memory history of recent audit events.
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    /// Holds the current lifecycle and publishes changes.
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    /// Shutdown handle installed by `prepare_start` and consumed by `stop_proxy`.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
278
279impl CoreState {
280    pub async fn new(db_url: Option<String>) -> Self {
281        const AUDIT_HISTORY_LIMIT: usize = 200;
282
283        let store = if let Some(url) = db_url {
284            match Store::connect(&url).await {
285                Ok(s) => {
286                    if let Err(e) = s.init().await {
287                        tracing::error!("Failed to init store: {}", e);
288                    }
289                    Some(s)
290                }
291                Err(e) => {
292                    tracing::error!("Failed to connect to store: {}", e);
293                    None
294                }
295            }
296        } else {
297            None
298        };
299
300        let (flow_tx, flow_rx) = mpsc::channel(10000);
301        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
302        tokio::spawn(flow_actor.run());
303
304        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
305        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
306        tokio::spawn(intercept_actor.run());
307
308        let (rule_tx, rule_rx) = mpsc::channel(100);
309        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
310        tokio::spawn(rule_actor.run());
311
312        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
313        let (flow_broadcast_tx, _) = broadcast::channel(1000);
314        let (audit_broadcast_tx, _) = broadcast::channel(256);
315        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
316
317        #[cfg(feature = "script")]
318        let script_interceptor = ScriptInterceptor::new()
319            .await
320            .expect("Failed to initialize ScriptInterceptor");
321        Self {
322            flow_store: flow_tx,
323            intercept_broker: intercept_tx,
324            rule_store: rule_tx,
325            store,
326            #[cfg(feature = "script")]
327            script_interceptor: Arc::new(script_interceptor),
328            policy_tx,
329            flows_dropped: Arc::new(AtomicUsize::new(0)),
330            audit_events_total: Arc::new(AtomicUsize::new(0)),
331            audit_events_failed: Arc::new(AtomicUsize::new(0)),
332            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
333            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
334            flow_broadcast_tx,
335            audit_broadcast_tx,
336            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
337            lifecycle_tx,
338            shutdown_tx: Mutex::new(None),
339        }
340    }
341
342    pub async fn get_metrics(&self) -> CoreMetrics {
343        let (flow_tx, flow_rx) = oneshot::channel();
344        let _ = self
345            .flow_store
346            .send(FlowStoreMessage::GetMetrics(flow_tx))
347            .await;
348        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
349
350        let (int_tx, int_rx) = oneshot::channel();
351        let _ = self
352            .intercept_broker
353            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
354            .await;
355        let (
356            intercepts_pending,
357            ws_pending_messages,
358            oldest_intercept_age_ms,
359            oldest_ws_message_age_ms,
360        ) = int_rx.await.unwrap_or((0, 0, None, None));
361
362        let (rule_tx, rule_rx) = oneshot::channel();
363        let _ = self
364            .rule_store
365            .send(RuleStoreMessage::GetMetrics(rule_tx))
366            .await;
367        let rule_exec_errors = rule_rx.await.unwrap_or(0);
368
369        CoreMetrics {
370            flows_total,
371            flows_in_memory,
372            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
373            intercepts_pending,
374            ws_pending_messages,
375            oldest_intercept_age_ms,
376            oldest_ws_message_age_ms,
377            rule_exec_errors,
378            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
379            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
380            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
381            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
382        }
383    }
384
385    pub async fn get_metrics_prometheus_text(&self) -> String {
386        self.get_metrics().await.to_prometheus_text()
387    }
388
389    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
390        self.lifecycle().into()
391    }
392
393    pub async fn status_report(&self) -> CoreStatusReport {
394        CoreStatusReport {
395            status: self.status_snapshot(),
396            metrics: self.get_metrics().await,
397        }
398    }
399
400    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
401        let metrics = self.get_metrics().await;
402        CoreInterceptSnapshot {
403            pending_count: metrics.intercepts_pending,
404            ws_pending_count: metrics.ws_pending_messages,
405        }
406    }
407
408    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
409        let events = self.recent_audit_events();
410        let start = events.len().saturating_sub(limit);
411        CoreAuditSnapshot {
412            events: events.into_iter().skip(start).collect(),
413        }
414    }
415
416    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
417        let limit = query.limit.clamp(1, 500);
418        if let Some(store) = &self.store {
419            let rows = store
420                .query_audit_events(
421                    query.since_ms,
422                    query.until_ms,
423                    query.actor.as_ref().map(AuditActor::as_str),
424                    query.kind.as_ref().map(AuditEventKind::as_str),
425                    query.outcome.as_ref().map(AuditOutcome::as_str),
426                    limit,
427                )
428                .await
429                .unwrap_or_default();
430
431            let mut events = Vec::with_capacity(rows.len());
432            for row in rows {
433                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
434                    events.push(event);
435                }
436            }
437            return CoreAuditSnapshot { events };
438        }
439
440        let mut events = self.recent_audit_events();
441        events.reverse();
442        let filtered = events
443            .into_iter()
444            .filter(|event| {
445                query
446                    .since_ms
447                    .map(|v| event.timestamp_ms >= v)
448                    .unwrap_or(true)
449            })
450            .filter(|event| {
451                query
452                    .until_ms
453                    .map(|v| event.timestamp_ms <= v)
454                    .unwrap_or(true)
455            })
456            .filter(|event| {
457                query
458                    .actor
459                    .as_ref()
460                    .map(|v| &event.actor == v)
461                    .unwrap_or(true)
462            })
463            .filter(|event| {
464                query
465                    .kind
466                    .as_ref()
467                    .map(|v| &event.kind == v)
468                    .unwrap_or(true)
469            })
470            .filter(|event| {
471                query
472                    .outcome
473                    .as_ref()
474                    .map(|v| &event.outcome == v)
475                    .unwrap_or(true)
476            })
477            .take(limit)
478            .collect();
479        CoreAuditSnapshot { events: filtered }
480    }
481
482    pub async fn get_flow(&self, id: String) -> Option<Flow> {
483        let (tx, rx) = oneshot::channel();
484        if let Err(e) = self
485            .flow_store
486            .send(FlowStoreMessage::GetFlow {
487                id: id.clone(),
488                respond_to: tx,
489            })
490            .await
491        {
492            error!("Failed to send GetFlow request: {}", e);
493            if let Some(store) = &self.store {
494                return store
495                    .load_flow(&id)
496                    .await
497                    .ok()
498                    .flatten()
499                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
500            }
501            return None;
502        }
503        let flow = rx
504            .await
505            .map_err(|e| {
506                error!("Failed to receive Flow response: {}", e);
507                e
508            })
509            .unwrap_or(None);
510        if let Some(flow) = flow {
511            return Some(redact_flow(flow, &self.current_redaction_policy()));
512        }
513        if let Some(store) = &self.store {
514            return store
515                .load_flow(&id)
516                .await
517                .ok()
518                .flatten()
519                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
520                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
521        }
522        None
523    }
524
525    pub async fn get_rules(&self) -> Vec<Rule> {
526        let (tx, rx) = oneshot::channel();
527        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
528            error!("Failed to send GetRules request: {}", e);
529            return Vec::new();
530        }
531        rx.await
532            .map_err(|e| {
533                error!("Failed to receive Rules response: {}", e);
534                e
535            })
536            .unwrap_or_default()
537    }
538
539    pub async fn set_rules(&self, rules: Vec<Rule>) {
540        let _ = self
541            .set_rules_from(
542                AuditActor::Runtime,
543                "rules.replace",
544                "rule_set".to_string(),
545                json!({}),
546                rules,
547            )
548            .await;
549    }
550
551    pub async fn upsert_rule_from(
552        &self,
553        actor: AuditActor,
554        operation: &str,
555        target: String,
556        details: serde_json::Value,
557        rule: Rule,
558    ) -> Result<(), String> {
559        let rule_id = rule.id.clone();
560        let mut rules = self.get_rules().await;
561        rules.retain(|existing| existing.id != rule_id);
562        rules.push(rule);
563        self.set_rules_from(actor, operation, target, details, rules)
564            .await
565    }
566
567    pub async fn delete_rule_from(
568        &self,
569        actor: AuditActor,
570        operation: &str,
571        target: String,
572        details: serde_json::Value,
573        rule_id: &str,
574    ) -> Result<bool, String> {
575        let mut rules = self.get_rules().await;
576        let before = rules.len();
577        rules.retain(|rule| rule.id != rule_id);
578        if rules.len() == before {
579            return Ok(false);
580        }
581        self.set_rules_from(actor, operation, target, details, rules)
582            .await
583            .map(|_| true)
584    }
585
586    pub async fn create_mock_response_rule_from(
587        &self,
588        actor: AuditActor,
589        target: String,
590        details: serde_json::Value,
591        config: MockResponseRuleConfig,
592    ) -> Result<String, String> {
593        let rule_id = config.rule_id.clone();
594        let rule = build_mock_response_rule(config);
595        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
596            .await
597            .map(|_| rule_id)
598    }
599
600    pub async fn create_intercept_rule_from(
601        &self,
602        actor: AuditActor,
603        target: String,
604        details: serde_json::Value,
605        config: InterceptRuleConfig,
606    ) -> Result<String, String> {
607        let rule_id = config.rule_id.clone();
608        let mut rules = self.get_rules().await;
609        rules.extend(build_intercept_rules(config));
610        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
611            .await
612            .map(|_| rule_id)
613    }
614
615    pub async fn upsert_legacy_intercept_rule_from(
616        &self,
617        actor: AuditActor,
618        target: String,
619        details: serde_json::Value,
620        rule: InterceptRule,
621    ) -> Result<(), String> {
622        let rule_id = rule.id.clone();
623        let mut rules = self.get_rules().await;
624        rules.retain(|existing| {
625            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
626        });
627        rules.extend(rule.to_rules());
628        self.set_rules_from(
629            actor,
630            "rule.intercept_legacy_upsert",
631            target,
632            details,
633            rules,
634        )
635        .await
636    }
637
638    pub async fn set_rules_from(
639        &self,
640        actor: AuditActor,
641        operation: &str,
642        target: String,
643        details: serde_json::Value,
644        rules: Vec<Rule>,
645    ) -> Result<(), String> {
646        let rule_count = rules.len();
647        if let Err(e) = self
648            .rule_store
649            .send(RuleStoreMessage::SetRules(rules))
650            .await
651        {
652            error!("Failed to send SetRules request: {}", e);
653            self.record_audit_event(AuditEvent::new(
654                actor,
655                AuditEventKind::RuleChanged,
656                target,
657                AuditOutcome::Failed,
658                json!({
659                    "operation": operation,
660                    "rule_count": rule_count,
661                    "details": details,
662                    "error": e.to_string()
663                }),
664            ));
665            return Err(e.to_string());
666        }
667
668        self.record_audit_event(AuditEvent::new(
669            actor,
670            AuditEventKind::RuleChanged,
671            target,
672            AuditOutcome::Success,
673            json!({
674                "operation": operation,
675                "rule_count": rule_count,
676                "details": details
677            }),
678        ));
679        Ok(())
680    }
681
682    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
683        let mut new_rules = Vec::new();
684        for rule in rules {
685            new_rules.extend(rule.to_rules());
686        }
687        self.set_rules(new_rules).await;
688    }
689
690    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
691        let (tx, rx) = oneshot::channel();
692        if let Err(e) = self
693            .rule_store
694            .send(RuleStoreMessage::GetRuleEngine(tx))
695            .await
696        {
697            error!("Failed to send GetRuleEngine request: {}", e);
698            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
699        }
700        rx.await
701            .map_err(|e| {
702                error!("Failed to receive RuleEngine response: {}", e);
703                e
704            })
705            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
706    }
707
708    pub fn update_policy(&self, policy: ProxyPolicy) {
709        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
710    }
711
712    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
713        let mut policy = self.policy_snapshot();
714        policy.apply_patch(patch);
715        self.update_policy_from(actor, target, policy);
716    }
717
718    pub fn policy_snapshot(&self) -> ProxyPolicy {
719        self.policy_tx.borrow().clone()
720    }
721
722    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
723        let details = json!({
724            "strict_http_semantics": policy.strict_http_semantics,
725            "request_timeout_ms": policy.request_timeout_ms,
726            "max_body_size": policy.max_body_size,
727            "transparent_enabled": policy.transparent_enabled,
728            "redaction_enabled": policy.redaction.enabled,
729            "redact_bodies": policy.redaction.redact_bodies
730        });
731
732        self.policy_tx.send_replace(policy);
733        self.record_audit_event(AuditEvent::new(
734            actor,
735            AuditEventKind::PolicyUpdated,
736            target,
737            AuditOutcome::Success,
738            details,
739        ));
740    }
741
742    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
743        if let Err(e) = self
744            .intercept_broker
745            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
746            .await
747        {
748            error!("Failed to send RegisterIntercept request: {}", e);
749        }
750    }
751
752    pub async fn resolve_intercept(
753        &self,
754        key: String,
755        result: InterceptionResult,
756    ) -> Result<(), String> {
757        let (tx, rx) = oneshot::channel();
758        if let Err(e) = self
759            .intercept_broker
760            .send(InterceptBrokerMessage::ResolveIntercept {
761                key,
762                result,
763                respond_to: tx,
764            })
765            .await
766        {
767            error!("Failed to send ResolveIntercept request: {}", e);
768            return Err(e.to_string());
769        }
770        rx.await.map_err(|_| "Actor dropped".to_string())?
771    }
772
773    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
774        let (tx, rx) = oneshot::channel();
775        if let Err(e) = self
776            .intercept_broker
777            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
778                key,
779                respond_to: tx,
780            })
781            .await
782        {
783            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
784            return None;
785        }
786        rx.await
787            .map_err(|e| {
788                error!("Failed to receive WebSocketMessage response: {}", e);
789                e
790            })
791            .unwrap_or(None)
792    }
793
794    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
795        if let Err(e) = self
796            .intercept_broker
797            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
798            .await
799        {
800            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
801        }
802    }
803
804    pub async fn is_intercept_pending(&self, key: String) -> bool {
805        let (tx, rx) = oneshot::channel();
806        if let Err(e) = self
807            .intercept_broker
808            .send(InterceptBrokerMessage::GetPendingIntercept {
809                key,
810                respond_to: tx,
811            })
812            .await
813        {
814            error!("Failed to send GetPendingIntercept request: {}", e);
815            return false;
816        }
817        rx.await
818            .map_err(|e| {
819                error!("Failed to receive PendingIntercept response: {}", e);
820                e
821            })
822            .unwrap_or(false)
823    }
824
825    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
826        let (tx, rx) = oneshot::channel();
827        if let Err(e) = self
828            .intercept_broker
829            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
830                flow_id,
831                respond_to: tx,
832            })
833            .await
834        {
835            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
836            return false;
837        }
838        rx.await
839            .map_err(|e| {
840                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
841                e
842            })
843            .unwrap_or(false)
844    }
845
846    pub fn upsert_flow(&self, flow: Box<Flow>) {
847        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
848            // Drop flow if channel is full
849            error!("FlowStore dropped flow: {}", e);
850            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
851        }
852    }
853
854    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
855        if let Err(e) = self
856            .flow_store
857            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
858        {
859            error!("FlowStore dropped WS message: {}", e);
860            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
861        }
862    }
863
864    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
865        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
866            flow_id,
867            body,
868            direction,
869        }) {
870            error!("FlowStore dropped HTTP body: {}", e);
871            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
872        }
873    }
874
875    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
876    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
877        self.flow_broadcast_tx.subscribe()
878    }
879
880    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
881        self.audit_broadcast_tx.subscribe()
882    }
883
884    fn current_redaction_policy(&self) -> RedactionPolicy {
885        self.policy_tx.borrow().redaction.clone()
886    }
887
888    pub fn record_flow_events_lagged(&self, skipped: u64) {
889        self.flow_events_lagged_total
890            .fetch_add(skipped as usize, Ordering::Relaxed);
891    }
892
893    pub fn record_audit_events_lagged(&self, skipped: u64) {
894        self.audit_events_lagged_total
895            .fetch_add(skipped as usize, Ordering::Relaxed);
896    }
897
898    pub fn lifecycle(&self) -> RuntimeLifecycle {
899        self.lifecycle_tx.borrow().clone()
900    }
901
902    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
903        self.lifecycle_tx.subscribe()
904    }
905
906    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
907        let current = self.lifecycle();
908        if current.is_active() {
909            return Err(format!(
910                "Proxy is already {} on port {}",
911                current.phase.as_str(),
912                current.port.unwrap_or(port)
913            ));
914        }
915
916        let mut guard = self
917            .shutdown_tx
918            .lock()
919            .map_err(|_| "shutdown state poisoned".to_string())?;
920        *guard = Some(shutdown_tx);
921        drop(guard);
922
923        self.update_lifecycle(RuntimeLifecycle {
924            phase: RuntimeLifecyclePhase::Starting,
925            port: Some(port),
926            started_at_ms: None,
927            last_error: None,
928        });
929        Ok(())
930    }
931
932    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
933        let mut guard = self
934            .shutdown_tx
935            .lock()
936            .map_err(|_| "shutdown state poisoned".to_string())?;
937        let Some(tx) = guard.take() else {
938            return Ok(ProxyStopResult::NotRunning);
939        };
940        drop(guard);
941
942        let current = self.lifecycle();
943        self.update_lifecycle(RuntimeLifecycle {
944            phase: RuntimeLifecyclePhase::Stopping,
945            port: current.port,
946            started_at_ms: current.started_at_ms,
947            last_error: current.last_error,
948        });
949        let _ = tx.send(());
950        Ok(ProxyStopResult::Stopping)
951    }
952
953    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
954        self.audit_history
955            .lock()
956            .map(|events| events.iter().cloned().collect())
957            .unwrap_or_default()
958    }
959
960    /// 搜索内存中的 Flow 列表,返回轻量摘要。
961    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
962        let redaction = self.current_redaction_policy();
963        if let Some(store) = &self.store {
964            return store
965                .query_flow_summaries(&query)
966                .await
967                .unwrap_or_default()
968                .into_iter()
969                .map(|summary| redact_flow_summary(summary, &redaction))
970                .collect();
971        }
972        let (tx, rx) = oneshot::channel();
973        if let Err(e) = self
974            .flow_store
975            .send(FlowStoreMessage::SearchFlows {
976                query,
977                respond_to: tx,
978            })
979            .await
980        {
981            error!("Failed to send SearchFlows request: {}", e);
982            return Vec::new();
983        }
984        rx.await
985            .unwrap_or_default()
986            .into_iter()
987            .map(|summary| redact_flow_summary(summary, &redaction))
988            .collect()
989    }
990
991    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
992        let redaction = self.current_redaction_policy();
993        redact_flow_update(update, &redaction)
994    }
995
996    /// 解除截获并可选地应用修改。
997    ///
998    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
999    /// - 若 `mods` 为 None,直接 Continue;
1000    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1001    ///
1002    /// `key` 格式约定:
1003    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1004    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1005    pub async fn resolve_intercept_with_modifications(
1006        &self,
1007        key: String,
1008        action: &str,
1009        mods: Option<relay_core_api::modification::FlowModification>,
1010    ) -> Result<(), String> {
1011        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1012            .await
1013    }
1014
1015    pub async fn resolve_intercept_with_modifications_from(
1016        &self,
1017        actor: AuditActor,
1018        key: String,
1019        action: &str,
1020        mods: Option<relay_core_api::modification::FlowModification>,
1021    ) -> Result<(), String> {
1022        let modified_fields = modification_field_names(mods.as_ref());
1023        let result = match action {
1024            "drop" => InterceptionResult::Drop,
1025            _ => match mods {
1026                None => InterceptionResult::Continue,
1027                Some(m) => {
1028                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1029                    match parts.as_slice() {
1030                        [flow_id, phase] => {
1031                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1032                                modification::apply_flow_modification(&flow, phase, m)
1033                            } else {
1034                                InterceptionResult::Continue
1035                            }
1036                        }
1037                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1038                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1039                        [_, "ws_msg", _] => {
1040                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1041                                modification::apply_ws_modification(&msg, m)
1042                            } else {
1043                                InterceptionResult::Continue
1044                            }
1045                        }
1046                        _ => InterceptionResult::Continue,
1047                    }
1048                }
1049            },
1050        };
1051        let outcome = self.resolve_intercept(key.clone(), result).await;
1052        let audit_outcome = if outcome.is_ok() {
1053            AuditOutcome::Success
1054        } else {
1055            AuditOutcome::Failed
1056        };
1057        let error_message = outcome.as_ref().err().cloned();
1058
1059        self.record_audit_event(AuditEvent::new(
1060            actor,
1061            AuditEventKind::InterceptResolved,
1062            key,
1063            audit_outcome,
1064            json!({
1065                "action": action,
1066                "has_modifications": !modified_fields.is_empty(),
1067                "modified_fields": modified_fields,
1068                "error": error_message
1069            }),
1070        ));
1071        outcome
1072    }
1073
/// Loads `script` into the script interceptor on behalf of `actor`.
///
/// A `ScriptReloaded` audit event is recorded for `target` with the script
/// size in bytes and, on failure, the error text. The load result is
/// returned with its error converted to a `String`.
#[cfg(feature = "script")]
pub async fn load_script_from(
    &self,
    actor: AuditActor,
    target: String,
    script: &str,
) -> Result<(), String> {
    let result = self
        .script_interceptor
        .load_script(script)
        .await
        .map_err(|e| e.to_string());

    let (outcome, error_message) = match &result {
        Ok(_) => (AuditOutcome::Success, None),
        Err(message) => (AuditOutcome::Failed, Some(message.clone())),
    };

    let details = json!({
        "script_bytes": script.len(),
        "error": error_message
    });
    self.record_audit_event(AuditEvent::new(
        actor,
        AuditEventKind::ScriptReloaded,
        target,
        outcome,
        details,
    ));

    result
}
1107
1108    fn record_audit_event(&self, event: AuditEvent) {
1109        const AUDIT_HISTORY_LIMIT: usize = 200;
1110        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1111        if event.outcome == AuditOutcome::Failed {
1112            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1113        }
1114
1115        let details = event.details.to_string();
1116        tracing::info!(
1117            target: "relay_core_audit",
1118            event_id = %event.id,
1119            actor = %event.actor.as_str(),
1120            kind = %event.kind.as_str(),
1121            target = %event.target,
1122            outcome = %event.outcome.as_str(),
1123            details = %details
1124        );
1125
1126        if let Ok(mut history) = self.audit_history.lock() {
1127            if history.len() >= AUDIT_HISTORY_LIMIT {
1128                history.pop_front();
1129            }
1130            history.push_back(event.clone());
1131        }
1132
1133        if let Some(store) = self.store.clone() {
1134            let event_json = serde_json::to_value(&event).unwrap_or_default();
1135            let event_id = event.id.clone();
1136            let timestamp_ms = event.timestamp_ms;
1137            let actor = event.actor.as_str().to_string();
1138            let kind = event.kind.as_str().to_string();
1139            let target = event.target.clone();
1140            let outcome = event.outcome.as_str().to_string();
1141            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1142                handle.spawn(async move {
1143                    if let Err(e) = store
1144                        .save_audit_event(AuditEventRecord {
1145                            id: &event_id,
1146                            timestamp_ms,
1147                            actor: &actor,
1148                            kind: &kind,
1149                            target: &target,
1150                            outcome: &outcome,
1151                            content: &event_json,
1152                        })
1153                        .await
1154                    {
1155                        tracing::error!("Failed to persist audit event: {}", e);
1156                    }
1157                });
1158            }
1159        }
1160
1161        let _ = self.audit_broadcast_tx.send(event);
1162    }
1163
1164    fn transition_to_running(&self, port: u16) {
1165        self.update_lifecycle(RuntimeLifecycle {
1166            phase: RuntimeLifecyclePhase::Running,
1167            port: Some(port),
1168            started_at_ms: Some(now_unix_ms()),
1169            last_error: None,
1170        });
1171    }
1172
1173    fn transition_to_stopped(&self) {
1174        if let Ok(mut guard) = self.shutdown_tx.lock() {
1175            *guard = None;
1176        }
1177
1178        self.update_lifecycle(RuntimeLifecycle {
1179            phase: RuntimeLifecyclePhase::Stopped,
1180            port: None,
1181            started_at_ms: None,
1182            last_error: None,
1183        });
1184    }
1185
1186    fn transition_to_failed(&self, port: u16, error: String) {
1187        if let Ok(mut guard) = self.shutdown_tx.lock() {
1188            *guard = None;
1189        }
1190
1191        self.update_lifecycle(RuntimeLifecycle {
1192            phase: RuntimeLifecyclePhase::Failed,
1193            port: Some(port),
1194            started_at_ms: None,
1195            last_error: Some(error),
1196        });
1197    }
1198
1199    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1200        tracing::info!(
1201            target: "relay_core_lifecycle",
1202            phase = %lifecycle.phase.as_str(),
1203            port = ?lifecycle.port,
1204            started_at_ms = ?lifecycle.started_at_ms,
1205            last_error = ?lifecycle.last_error
1206        );
1207        self.lifecycle_tx.send_replace(lifecycle);
1208    }
1209
1210    pub fn spawn_proxy(
1211        self: &Arc<Self>,
1212        config: ProxyConfig,
1213        sink: mpsc::Sender<FlowUpdate>,
1214        extra_interceptor: Option<Arc<dyn Interceptor>>,
1215    ) -> Result<ProxySpawnResult, String> {
1216        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1217        match self.prepare_start(config.port, shutdown_tx) {
1218            Ok(()) => {}
1219            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1220            Err(error) => return Err(error),
1221        }
1222        let state = self.clone();
1223        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1224            if let Err(error) = state
1225                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1226                .await
1227            {
1228                error!("Proxy failed: {}", error);
1229            }
1230        })))
1231    }
1232
1233    pub async fn start_proxy(
1234        self: &Arc<Self>,
1235        config: ProxyConfig,
1236        sink: mpsc::Sender<FlowUpdate>,
1237        extra_interceptor: Option<Arc<dyn Interceptor>>,
1238    ) -> Result<(), String> {
1239        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1240        self.prepare_start(config.port, shutdown_tx)?;
1241        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1242            .await
1243    }
1244
1245    async fn run_proxy(
1246        self: &Arc<Self>,
1247        config: ProxyConfig,
1248        sink: mpsc::Sender<FlowUpdate>,
1249        extra_interceptor: Option<Arc<dyn Interceptor>>,
1250        shutdown_rx: oneshot::Receiver<()>,
1251    ) -> Result<(), String> {
1252        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1253        let state = self.clone();
1254
1255        if let Some(parent) = config.ca_cert_path.parent()
1256            && !parent.exists()
1257        {
1258            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1259        }
1260
1261        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1262            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1263        let ca = Arc::new(ca);
1264
1265        #[cfg(feature = "script")]
1266        let script_interceptor = self.script_interceptor.clone();
1267
1268        #[cfg(feature = "script")]
1269        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1270
1271        #[cfg(not(feature = "script"))]
1272        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1273
1274        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1275            self.clone(),
1276        )));
1277
1278        if let Some(interceptor) = extra_interceptor {
1279            interceptors.push(interceptor);
1280        }
1281
1282        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1283        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1284
1285        tokio::spawn(async move {
1286            while let Some(update) = proxy_rx.recv().await {
1287                match update.clone() {
1288                    FlowUpdate::Full(flow) => {
1289                        state.upsert_flow(flow);
1290                    }
1291                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1292                        state.append_ws_message(flow_id, message);
1293                    }
1294                    FlowUpdate::HttpBody {
1295                        flow_id,
1296                        direction,
1297                        body,
1298                    } => {
1299                        state.update_http_body(flow_id, body, direction);
1300                    }
1301                }
1302
1303                let _ = state.flow_broadcast_tx.send(update.clone());
1304
1305                if sink.try_send(update).is_err() {
1306                    relay_core_lib::metrics::inc_flows_dropped();
1307                }
1308            }
1309        });
1310
1311        if let Some(udp_port) = config.udp_tproxy_port {
1312            let udp_proxy_tx = proxy_tx.clone();
1313            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1314            tokio::spawn(async move {
1315                match tokio::net::UdpSocket::bind(udp_addr).await {
1316                    Ok(socket) => {
1317                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1318                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1319                            error!("UDP TPROXY failed: {}", e);
1320                        }
1321                    }
1322                    Err(e) => {
1323                        error!("Failed to bind UDP TPROXY socket: {}", e);
1324                    }
1325                }
1326            });
1327        }
1328
1329        let listener = match TcpListener::bind(addr).await {
1330            Ok(listener) => listener,
1331            Err(e) => {
1332                if let Ok(mut guard) = self.shutdown_tx.lock() {
1333                    *guard = None;
1334                }
1335                let message = format!("Failed to bind to address {}: {}", addr, e);
1336                self.transition_to_failed(config.port, message.clone());
1337                return Err(message);
1338            }
1339        };
1340
1341        self.transition_to_running(config.port);
1342        let shutdown_rx = Some(shutdown_rx);
1343
1344        if config.transparent {
1345            let policy = ProxyPolicy {
1346                transparent_enabled: true,
1347                ..Default::default()
1348            };
1349            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1350            let policy_rx = self.policy_tx.subscribe();
1351
1352            let provider: Arc<dyn OriginalDstProvider> = {
1353                let mut addrs = BTreeSet::new();
1354                if let Ok(local) = listener.local_addr() {
1355                    addrs.insert(local);
1356                }
1357
1358                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1359                {
1360                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1361                }
1362                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1363                {
1364                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1365                        Ok(provider) => Arc::new(provider),
1366                        Err(e) => {
1367                            error!("Failed to initialize macOS PF provider: {}", e);
1368                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1369                        }
1370                    }
1371                }
1372                #[cfg(target_os = "windows")]
1373                {
1374                    let filter =
1375                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1376                            .to_string();
1377                    let port = config.port;
1378                    tokio::spawn(async move {
1379                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1380                            .await;
1381                    });
1382
1383                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1384                }
1385                #[cfg(not(any(
1386                    all(target_os = "linux", feature = "transparent-linux"),
1387                    all(target_os = "macos", feature = "transparent-macos"),
1388                    target_os = "windows"
1389                )))]
1390                {
1391                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1392                }
1393            };
1394
1395            let source = TransparentTcpCaptureSource::new(listener, provider);
1396            let result = relay_core_lib::start_proxy(
1397                source,
1398                proxy_tx,
1399                interceptor,
1400                ca,
1401                policy_rx,
1402                None,
1403                shutdown_rx,
1404            )
1405            .await
1406            .map_err(|e| e.to_string());
1407            if let Err(error) = &result {
1408                self.transition_to_failed(config.port, error.clone());
1409            } else {
1410                self.transition_to_stopped();
1411            }
1412            result
1413        } else {
1414            let source = TcpCaptureSource::new(listener);
1415            let policy = ProxyPolicy::default();
1416            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1417            let policy_rx = self.policy_tx.subscribe();
1418
1419            let result = relay_core_lib::start_proxy(
1420                source,
1421                proxy_tx,
1422                interceptor,
1423                ca,
1424                policy_rx,
1425                None,
1426                shutdown_rx,
1427            )
1428            .await
1429            .map_err(|e| e.to_string());
1430            if let Err(error) = &result {
1431                self.transition_to_failed(config.port, error.clone());
1432            } else {
1433                self.transition_to_stopped();
1434            }
1435            result
1436        }
1437    }
1438}
1439
1440fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1441    match update {
1442        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1443        FlowUpdate::WebSocketMessage {
1444            flow_id,
1445            mut message,
1446        } => {
1447            message.content = redact_body(message.content, redaction);
1448            FlowUpdate::WebSocketMessage { flow_id, message }
1449        }
1450        FlowUpdate::HttpBody {
1451            flow_id,
1452            direction,
1453            body,
1454        } => FlowUpdate::HttpBody {
1455            flow_id,
1456            direction,
1457            body: redact_body(body, redaction),
1458        },
1459    }
1460}
1461
1462fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1463    if !redaction.enabled {
1464        return flow;
1465    }
1466    match &mut flow.layer {
1467        Layer::Http(http) => {
1468            redact_http_request(&mut http.request, redaction);
1469            if let Some(response) = &mut http.response {
1470                redact_headers(&mut response.headers, redaction);
1471                response.body = response
1472                    .body
1473                    .take()
1474                    .map(|body| redact_body(body, redaction));
1475            }
1476        }
1477        Layer::WebSocket(ws) => {
1478            redact_http_request(&mut ws.handshake_request, redaction);
1479            redact_headers(&mut ws.handshake_response.headers, redaction);
1480            ws.handshake_response.body = ws
1481                .handshake_response
1482                .body
1483                .take()
1484                .map(|body| redact_body(body, redaction));
1485            for message in &mut ws.messages {
1486                message.content = redact_body(message.content.clone(), redaction);
1487            }
1488        }
1489        _ => {}
1490    }
1491    flow
1492}
1493
1494fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1495    if !redaction.enabled {
1496        return summary;
1497    }
1498    summary.url = redact_url_string(&summary.url, redaction);
1499    summary
1500}
1501
1502fn redact_http_request(
1503    request: &mut relay_core_api::flow::HttpRequest,
1504    redaction: &RedactionPolicy,
1505) {
1506    redact_headers(&mut request.headers, redaction);
1507    redact_query_pairs(&mut request.query, redaction);
1508    request.url = redact_url(&request.url, redaction);
1509    request.body = request.body.take().map(|body| redact_body(body, redaction));
1510}
1511
1512fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1513    if !redaction.enabled {
1514        return;
1515    }
1516    let sensitive = redaction_set(&redaction.sensitive_header_names);
1517    for (name, value) in headers.iter_mut() {
1518        if sensitive.contains(&name.to_ascii_lowercase()) {
1519            *value = "[REDACTED]".to_string();
1520        }
1521    }
1522}
1523
1524fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1525    if !redaction.enabled {
1526        return;
1527    }
1528    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1529    for (name, value) in query.iter_mut() {
1530        if sensitive.contains(&name.to_ascii_lowercase()) {
1531            *value = "[REDACTED]".to_string();
1532        }
1533    }
1534}
1535
1536fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1537    if !redaction.enabled {
1538        return url.clone();
1539    }
1540    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1541    let pairs: Vec<(String, String)> = url
1542        .query_pairs()
1543        .map(|(k, v)| {
1544            let key = k.to_string();
1545            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1546                "[REDACTED]".to_string()
1547            } else {
1548                v.to_string()
1549            };
1550            (key, value)
1551        })
1552        .collect();
1553    let mut next = url.clone();
1554    if pairs.is_empty() {
1555        return next;
1556    }
1557    next.query_pairs_mut().clear();
1558    for (k, v) in pairs {
1559        next.query_pairs_mut().append_pair(&k, &v);
1560    }
1561    next
1562}
1563
1564fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1565    match url::Url::parse(input) {
1566        Ok(url) => redact_url(&url, redaction).to_string(),
1567        Err(_) => input.to_string(),
1568    }
1569}
1570
1571fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1572    if redaction.enabled && redaction.redact_bodies {
1573        body.content = "[REDACTED]".to_string();
1574    }
1575    body
1576}
1577
/// Builds a lookup set from `values`, lowercasing each entry (ASCII only) so
/// callers can match names case-insensitively.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut lowered = HashSet::with_capacity(values.len());
    for entry in values {
        lowered.insert(entry.to_ascii_lowercase());
    }
    lowered
}
1584
/// Settings needed to start a proxy run.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Path of the CA certificate file.
    pub ca_cert_path: std::path::PathBuf,
    /// Path of the CA private key file.
    pub ca_key_path: std::path::PathBuf,
    /// Whether to run in transparent mode.
    pub transparent: bool,
    /// UDP port for the TPROXY listener; `None` disables it.
    pub udp_tproxy_port: Option<u16>,
}

impl ProxyConfig {
    /// Creates a configuration with transparent mode off and no UDP TPROXY port.
    pub fn new(
        port: u16,
        ca_cert_path: std::path::PathBuf,
        ca_key_path: std::path::PathBuf,
    ) -> Self {
        Self {
            port,
            ca_cert_path,
            ca_key_path,
            transparent: false,
            udp_tproxy_port: None,
        }
    }

    /// Creates a configuration whose CA files live in `app_data_dir`
    /// (`ca_cert.pem` and `ca_key.pem`), creating the directory if needed.
    ///
    /// # Errors
    ///
    /// Returns the I/O error text when the directory cannot be created.
    pub fn from_app_data_dir(
        app_data_dir: impl Into<std::path::PathBuf>,
        port: u16,
    ) -> Result<Self, String> {
        let app_data_dir = app_data_dir.into();
        // `create_dir_all` succeeds when the directory already exists, so a
        // separate (and racy) `exists()` pre-check is unnecessary.
        std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;

        Ok(Self::new(
            port,
            app_data_dir.join("ca_cert.pem"),
            app_data_dir.join("ca_key.pem"),
        ))
    }

    /// Returns the configuration with transparent mode set to `transparent`.
    pub fn with_transparent(mut self, transparent: bool) -> Self {
        self.transparent = transparent;
        self
    }

    /// Returns the configuration with the UDP TPROXY port set to `udp_tproxy_port`.
    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
        self.udp_tproxy_port = udp_tproxy_port;
        self
    }
}
1635
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or 0 when the system clock reads earlier than the epoch.
fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1642
1643fn modification_field_names(
1644    mods: Option<&relay_core_api::modification::FlowModification>,
1645) -> Vec<&'static str> {
1646    let Some(mods) = mods else {
1647        return Vec::new();
1648    };
1649
1650    let mut fields = Vec::new();
1651    if mods.method.is_some() {
1652        fields.push("method");
1653    }
1654    if mods.url.is_some() {
1655        fields.push("url");
1656    }
1657    if mods.request_headers.is_some() {
1658        fields.push("request_headers");
1659    }
1660    if mods.request_body.is_some() {
1661        fields.push("request_body");
1662    }
1663    if mods.status_code.is_some() {
1664        fields.push("status_code");
1665    }
1666    if mods.response_headers.is_some() {
1667        fields.push("response_headers");
1668    }
1669    if mods.response_body.is_some() {
1670        fields.push("response_body");
1671    }
1672    if mods.message_content.is_some() {
1673        fields.push("message_content");
1674    }
1675    fields
1676}
1677
1678#[cfg(test)]
1679mod tests {
1680    use super::*;
1681    use chrono::Utc;
1682    use relay_core_api::flow::{
1683        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1684        ResponseTiming, TransportProtocol,
1685    };
1686    use std::sync::atomic::{AtomicU64, Ordering};
1687    use tokio::time::{Duration, sleep};
1688    use url::Url;
1689    use uuid::Uuid;
1690
1691    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1692
1693    fn sqlite_url() -> String {
1694        let nanos = SystemTime::now()
1695            .duration_since(UNIX_EPOCH)
1696            .expect("clock drift")
1697            .as_nanos();
1698        let pid = std::process::id();
1699        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1700        let db_dir = std::env::current_dir()
1701            .expect("cwd")
1702            .join("target")
1703            .join("test-dbs");
1704        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1705        let db_path = db_dir.join(format!(
1706            "relay-core-runtime-test-{}-{}-{}.db",
1707            pid, nanos, seq
1708        ));
1709        format!("sqlite://{}?mode=rwc", db_path.display())
1710    }
1711
1712    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1713        let start_time =
1714            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1715        let request_url =
1716            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1717        Flow {
1718            id: Uuid::new_v4(),
1719            start_time,
1720            end_time: Some(start_time),
1721            network: NetworkInfo {
1722                client_ip: "127.0.0.1".to_string(),
1723                client_port: 12000,
1724                server_ip: "127.0.0.1".to_string(),
1725                server_port: 8080,
1726                protocol: TransportProtocol::TCP,
1727                tls: false,
1728                tls_version: None,
1729                sni: None,
1730            },
1731            layer: Layer::Http(HttpLayer {
1732                request: HttpRequest {
1733                    method: method.to_string(),
1734                    url: request_url,
1735                    version: "HTTP/1.1".to_string(),
1736                    headers: vec![],
1737                    cookies: vec![],
1738                    query: vec![],
1739                    body: None,
1740                },
1741                response: Some(HttpResponse {
1742                    status,
1743                    status_text: "OK".to_string(),
1744                    version: "HTTP/1.1".to_string(),
1745                    headers: vec![],
1746                    cookies: vec![],
1747                    body: None,
1748                    timing: ResponseTiming {
1749                        time_to_first_byte: None,
1750                        time_to_last_byte: None,
1751                        connect_time_ms: None,
1752                        ssl_time_ms: None,
1753                    },
1754                }),
1755                error: None,
1756            }),
1757            tags: vec![],
1758            meta: std::collections::HashMap::new(),
1759        }
1760    }
1761
1762    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1763        let start_time =
1764            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1765        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1766            .expect("url should parse");
1767        Flow {
1768            id: Uuid::new_v4(),
1769            start_time,
1770            end_time: Some(start_time),
1771            network: NetworkInfo {
1772                client_ip: "127.0.0.1".to_string(),
1773                client_port: 12000,
1774                server_ip: "127.0.0.1".to_string(),
1775                server_port: 8080,
1776                protocol: TransportProtocol::TCP,
1777                tls: false,
1778                tls_version: None,
1779                sni: None,
1780            },
1781            layer: Layer::Http(HttpLayer {
1782                request: HttpRequest {
1783                    method: "GET".to_string(),
1784                    url: request_url,
1785                    version: "HTTP/1.1".to_string(),
1786                    headers: vec![
1787                        (
1788                            "Authorization".to_string(),
1789                            "Bearer secret-token".to_string(),
1790                        ),
1791                        ("X-Normal".to_string(), "visible".to_string()),
1792                    ],
1793                    cookies: vec![],
1794                    query: vec![
1795                        ("token".to_string(), "abc123".to_string()),
1796                        ("ok".to_string(), "1".to_string()),
1797                    ],
1798                    body: Some(BodyData {
1799                        encoding: "utf-8".to_string(),
1800                        content: "secret request body".to_string(),
1801                        size: 19,
1802                    }),
1803                },
1804                response: Some(HttpResponse {
1805                    status: 200,
1806                    status_text: "OK".to_string(),
1807                    version: "HTTP/1.1".to_string(),
1808                    headers: vec![
1809                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1810                        ("X-Response".to_string(), "visible".to_string()),
1811                    ],
1812                    cookies: vec![],
1813                    body: Some(BodyData {
1814                        encoding: "utf-8".to_string(),
1815                        content: "secret response body".to_string(),
1816                        size: 20,
1817                    }),
1818                    timing: ResponseTiming {
1819                        time_to_first_byte: None,
1820                        time_to_last_byte: None,
1821                        connect_time_ms: None,
1822                        ssl_time_ms: None,
1823                    },
1824                }),
1825                error: None,
1826            }),
1827            tags: vec![],
1828            meta: std::collections::HashMap::new(),
1829        }
1830    }
1831
1832    #[tokio::test]
1833    async fn set_rules_from_records_audit_event() {
1834        let state = CoreState::new(None).await;
1835
1836        state
1837            .set_rules_from(
1838                AuditActor::Http,
1839                "rule.upsert",
1840                "rule-1".to_string(),
1841                json!({ "route": "/api/v1/rules" }),
1842                Vec::new(),
1843            )
1844            .await
1845            .expect("set rules should succeed");
1846
1847        let events = state.recent_audit_events();
1848        let event = events.last().expect("audit event should exist");
1849        assert_eq!(event.actor, AuditActor::Http);
1850        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1851        assert_eq!(event.outcome, AuditOutcome::Success);
1852        assert_eq!(event.target, "rule-1");
1853        assert_eq!(event.details["operation"], "rule.upsert");
1854        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1855    }
1856
1857    #[tokio::test]
1858    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1859        let state = CoreState::new(None).await;
1860
1861        state
1862            .upsert_rule_from(
1863                AuditActor::Probe,
1864                "rule.upsert",
1865                "rule-1".to_string(),
1866                json!({ "tool": "set_rule" }),
1867                Rule {
1868                    id: "rule-1".to_string(),
1869                    name: "first".to_string(),
1870                    active: true,
1871                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1872                    priority: 1,
1873                    termination: relay_core_lib::rule::RuleTermination::Continue,
1874                    filter: relay_core_lib::rule::Filter::Url(
1875                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1876                    ),
1877                    actions: vec![],
1878                    constraints: None,
1879                },
1880            )
1881            .await
1882            .expect("initial upsert should succeed");
1883
1884        state
1885            .upsert_rule_from(
1886                AuditActor::Probe,
1887                "rule.upsert",
1888                "rule-1".to_string(),
1889                json!({ "tool": "set_rule" }),
1890                Rule {
1891                    id: "rule-1".to_string(),
1892                    name: "second".to_string(),
1893                    active: true,
1894                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1895                    priority: 2,
1896                    termination: relay_core_lib::rule::RuleTermination::Continue,
1897                    filter: relay_core_lib::rule::Filter::Url(
1898                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1899                    ),
1900                    actions: vec![],
1901                    constraints: None,
1902                },
1903            )
1904            .await
1905            .expect("replacement upsert should succeed");
1906
1907        let rules = state.get_rules().await;
1908        assert_eq!(rules.len(), 1);
1909        assert_eq!(rules[0].name, "second");
1910        let event = state
1911            .recent_audit_events()
1912            .last()
1913            .cloned()
1914            .expect("audit event");
1915        assert_eq!(event.details["operation"], "rule.upsert");
1916        assert_eq!(event.details["details"]["tool"], "set_rule");
1917    }
1918
1919    #[tokio::test]
1920    async fn delete_rule_from_returns_false_when_rule_missing() {
1921        let state = CoreState::new(None).await;
1922
1923        let deleted = state
1924            .delete_rule_from(
1925                AuditActor::Http,
1926                "rule.delete",
1927                "missing".to_string(),
1928                json!({ "route": "/api/v1/rules/{id}" }),
1929                "missing",
1930            )
1931            .await
1932            .expect("delete should not fail");
1933
1934        assert!(!deleted);
1935        assert!(state.recent_audit_events().is_empty());
1936    }
1937
1938    #[tokio::test]
1939    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
1940        let state = CoreState::new(None).await;
1941
1942        let rule_id = state
1943            .create_mock_response_rule_from(
1944                AuditActor::Http,
1945                "api-mock-1".to_string(),
1946                json!({ "route": "/api/v1/mock", "status": 201 }),
1947                MockResponseRuleConfig {
1948                    rule_id: "api-mock-1".to_string(),
1949                    url_pattern: "example.com".to_string(),
1950                    name: "api-mock:example.com".to_string(),
1951                    status: 201,
1952                    content_type: "application/json".to_string(),
1953                    body: "{\"ok\":true}".to_string(),
1954                },
1955            )
1956            .await
1957            .expect("mock rule should be created");
1958
1959        assert_eq!(rule_id, "api-mock-1");
1960        let rules = state.get_rules().await;
1961        assert_eq!(rules.len(), 1);
1962        assert_eq!(rules[0].id, "api-mock-1");
1963        let event = state
1964            .recent_audit_events()
1965            .last()
1966            .cloned()
1967            .expect("audit event");
1968        assert_eq!(event.details["operation"], "rule.mock_create");
1969        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
1970    }
1971
1972    #[tokio::test]
1973    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
1974        let state = CoreState::new(None).await;
1975
1976        let rule_id = state
1977            .create_intercept_rule_from(
1978                AuditActor::Http,
1979                "intercept-1".to_string(),
1980                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
1981                InterceptRuleConfig {
1982                    rule_id: "intercept-1".to_string(),
1983                    active: true,
1984                    url_pattern: "example.com".to_string(),
1985                    method: None,
1986                    phase: "request".to_string(),
1987                    name: "api-intercept:example.com".to_string(),
1988                    priority: 100,
1989                    termination: relay_core_lib::rule::RuleTermination::Stop,
1990                },
1991            )
1992            .await
1993            .expect("intercept rule should be created");
1994
1995        assert_eq!(rule_id, "intercept-1");
1996        let rules = state.get_rules().await;
1997        assert_eq!(rules.len(), 1);
1998        assert_eq!(rules[0].id, "intercept-1");
1999        assert_eq!(rules[0].name, "api-intercept:example.com");
2000        let event = state
2001            .recent_audit_events()
2002            .last()
2003            .cloned()
2004            .expect("audit event");
2005        assert_eq!(event.details["operation"], "rule.intercept_create");
2006        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2007    }
2008
2009    #[tokio::test]
2010    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2011        let state = CoreState::new(None).await;
2012
2013        state
2014            .upsert_legacy_intercept_rule_from(
2015                AuditActor::Tauri,
2016                "legacy-1".to_string(),
2017                json!({ "command": "set_intercept_rule" }),
2018                InterceptRule {
2019                    id: "legacy-1".to_string(),
2020                    active: true,
2021                    url_pattern: "example.com".to_string(),
2022                    method: None,
2023                    phase: "both".to_string(),
2024                },
2025            )
2026            .await
2027            .expect("initial family upsert should succeed");
2028        assert_eq!(state.get_rules().await.len(), 2);
2029
2030        state
2031            .upsert_legacy_intercept_rule_from(
2032                AuditActor::Tauri,
2033                "legacy-1".to_string(),
2034                json!({ "command": "set_intercept_rule" }),
2035                InterceptRule {
2036                    id: "legacy-1".to_string(),
2037                    active: true,
2038                    url_pattern: "example.org".to_string(),
2039                    method: Some("POST".to_string()),
2040                    phase: "request".to_string(),
2041                },
2042            )
2043            .await
2044            .expect("replacement family upsert should succeed");
2045
2046        let rules = state.get_rules().await;
2047        assert_eq!(rules.len(), 1);
2048        assert_eq!(rules[0].id, "legacy-1");
2049        let event = state
2050            .recent_audit_events()
2051            .last()
2052            .cloned()
2053            .expect("audit event");
2054        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2055        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2056    }
2057
2058    #[tokio::test]
2059    async fn resolve_intercept_failure_records_failed_audit_event() {
2060        let state = CoreState::new(None).await;
2061
2062        let result = state
2063            .resolve_intercept_with_modifications_from(
2064                AuditActor::Probe,
2065                "missing-flow:request".to_string(),
2066                "drop",
2067                None,
2068            )
2069            .await;
2070
2071        assert!(result.is_err());
2072        let events = state.recent_audit_events();
2073        let event = events.last().expect("audit event should exist");
2074        assert_eq!(event.actor, AuditActor::Probe);
2075        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2076        assert_eq!(event.outcome, AuditOutcome::Failed);
2077        assert_eq!(event.details["action"], "drop");
2078        assert!(
2079            event.details["error"]
2080                .as_str()
2081                .unwrap_or_default()
2082                .contains("Interception not found")
2083        );
2084    }
2085
2086    #[tokio::test]
2087    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2088        let state = CoreState::new(None).await;
2089        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2090
2091        state
2092            .prepare_start(8080, shutdown_tx)
2093            .expect("prepare start should succeed");
2094        let lifecycle = state.lifecycle();
2095        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2096        assert_eq!(lifecycle.port, Some(8080));
2097        assert!(lifecycle.started_at_ms.is_none());
2098        assert!(lifecycle.last_error.is_none());
2099
2100        assert_eq!(
2101            state.stop_proxy().expect("stop should succeed"),
2102            ProxyStopResult::Stopping
2103        );
2104        let lifecycle = state.lifecycle();
2105        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2106        assert_eq!(lifecycle.port, Some(8080));
2107        assert!(shutdown_rx.await.is_ok());
2108    }
2109
2110    #[tokio::test]
2111    async fn status_snapshot_derives_runtime_facing_fields() {
2112        let state = CoreState::new(None).await;
2113        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2114        state
2115            .prepare_start(8080, shutdown_tx)
2116            .expect("prepare start should succeed");
2117
2118        let status = state.status_snapshot();
2119        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2120        assert!(status.running);
2121        assert_eq!(status.port, Some(8080));
2122        assert!(status.uptime.is_none());
2123        assert!(status.last_error.is_none());
2124    }
2125
2126    #[tokio::test]
2127    async fn status_report_combines_status_and_metrics() {
2128        let state = CoreState::new(None).await;
2129        let report = state.status_report().await;
2130
2131        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2132        assert!(!report.status.running);
2133        assert_eq!(report.metrics.intercepts_pending, 0);
2134        assert_eq!(report.metrics.ws_pending_messages, 0);
2135        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2136        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2137        assert_eq!(report.metrics.audit_events_total, 0);
2138        assert_eq!(report.metrics.audit_events_failed, 0);
2139        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2140        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2141    }
2142
2143    #[test]
2144    fn proxy_config_new_and_transport_setters_preserve_values() {
2145        let config = ProxyConfig::new(
2146            8080,
2147            std::path::PathBuf::from("/tmp/ca_cert.pem"),
2148            std::path::PathBuf::from("/tmp/ca_key.pem"),
2149        )
2150        .with_transparent(true)
2151        .with_udp_tproxy_port(Some(15000));
2152
2153        assert_eq!(config.port, 8080);
2154        assert_eq!(
2155            config.ca_cert_path,
2156            std::path::PathBuf::from("/tmp/ca_cert.pem")
2157        );
2158        assert_eq!(
2159            config.ca_key_path,
2160            std::path::PathBuf::from("/tmp/ca_key.pem")
2161        );
2162        assert!(config.transparent);
2163        assert_eq!(config.udp_tproxy_port, Some(15000));
2164    }
2165
2166    #[test]
2167    fn proxy_config_from_app_data_dir_creates_default_paths() {
2168        let unique = SystemTime::now()
2169            .duration_since(UNIX_EPOCH)
2170            .expect("clock drift")
2171            .as_nanos();
2172        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2173
2174        let config =
2175            ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2176
2177        assert!(dir.exists());
2178        assert_eq!(config.port, 8899);
2179        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2180        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2181        assert!(!config.transparent);
2182        assert!(config.udp_tproxy_port.is_none());
2183    }
2184
2185    #[tokio::test]
2186    async fn intercept_snapshot_maps_pending_counts() {
2187        let state = CoreState::new(None).await;
2188        let snapshot = state.intercept_snapshot().await;
2189
2190        assert_eq!(snapshot.pending_count, 0);
2191        assert_eq!(snapshot.ws_pending_count, 0);
2192    }
2193
2194    #[tokio::test]
2195    async fn audit_snapshot_returns_latest_events_in_order() {
2196        let state = CoreState::new(None).await;
2197        state.record_audit_event(AuditEvent::new(
2198            AuditActor::Runtime,
2199            AuditEventKind::RuleChanged,
2200            "first",
2201            AuditOutcome::Success,
2202            json!({ "index": 1 }),
2203        ));
2204        state.record_audit_event(AuditEvent::new(
2205            AuditActor::Http,
2206            AuditEventKind::PolicyUpdated,
2207            "second",
2208            AuditOutcome::Success,
2209            json!({ "index": 2 }),
2210        ));
2211
2212        let snapshot = state.audit_snapshot(1);
2213
2214        assert_eq!(snapshot.events.len(), 1);
2215        assert_eq!(snapshot.events[0].target, "second");
2216        assert_eq!(snapshot.events[0].details["index"], 2);
2217    }
2218
2219    #[tokio::test]
2220    async fn query_audit_snapshot_filters_in_memory_events() {
2221        let state = CoreState::new(None).await;
2222        state.record_audit_event(AuditEvent::new(
2223            AuditActor::Http,
2224            AuditEventKind::RuleChanged,
2225            "rule-1",
2226            AuditOutcome::Success,
2227            json!({ "idx": 1 }),
2228        ));
2229        state.record_audit_event(AuditEvent::new(
2230            AuditActor::Probe,
2231            AuditEventKind::PolicyUpdated,
2232            "policy",
2233            AuditOutcome::Failed,
2234            json!({ "idx": 2 }),
2235        ));
2236
2237        let snapshot = state
2238            .query_audit_snapshot(CoreAuditQuery {
2239                actor: Some(AuditActor::Probe),
2240                kind: Some(AuditEventKind::PolicyUpdated),
2241                outcome: Some(AuditOutcome::Failed),
2242                limit: 10,
2243                ..Default::default()
2244            })
2245            .await;
2246
2247        assert_eq!(snapshot.events.len(), 1);
2248        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2249        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2250        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2251    }
2252
2253    #[tokio::test]
2254    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2255        let state = CoreState::new(Some(sqlite_url())).await;
2256        state.update_policy_from(
2257            AuditActor::Http,
2258            "policy".to_string(),
2259            ProxyPolicy {
2260                transparent_enabled: true,
2261                ..Default::default()
2262            },
2263        );
2264
2265        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2266        for _ in 0..10 {
2267            snapshot = state
2268                .query_audit_snapshot(CoreAuditQuery {
2269                    actor: Some(AuditActor::Http),
2270                    kind: Some(AuditEventKind::PolicyUpdated),
2271                    limit: 10,
2272                    ..Default::default()
2273                })
2274                .await;
2275            if !snapshot.events.is_empty() {
2276                break;
2277            }
2278            sleep(Duration::from_millis(20)).await;
2279        }
2280
2281        assert!(!snapshot.events.is_empty());
2282        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2283        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2284    }
2285
2286    #[tokio::test]
2287    async fn prepare_start_rejects_second_active_start() {
2288        let state = CoreState::new(None).await;
2289        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2290        state
2291            .prepare_start(8080, shutdown_tx)
2292            .expect("first start should succeed");
2293
2294        let (second_tx, _second_rx) = oneshot::channel();
2295        let error = state
2296            .prepare_start(8081, second_tx)
2297            .expect_err("second active start should be rejected");
2298        assert!(error.contains("already"));
2299    }
2300
2301    #[test]
2302    fn update_policy_records_audit_event() {
2303        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2304        let state = runtime.block_on(CoreState::new(None));
2305
2306        state.update_policy_from(
2307            AuditActor::Runtime,
2308            "policy".to_string(),
2309            ProxyPolicy {
2310                transparent_enabled: true,
2311                ..Default::default()
2312            },
2313        );
2314
2315        let events = state.recent_audit_events();
2316        let event = events.last().expect("audit event should exist");
2317        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2318        assert_eq!(event.outcome, AuditOutcome::Success);
2319        assert_eq!(event.details["transparent_enabled"], true);
2320    }
2321
2322    #[test]
2323    fn patch_policy_updates_redaction_without_replacing_other_fields() {
2324        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2325        let state = runtime.block_on(CoreState::new(None));
2326        let original_timeout = state.policy_snapshot().request_timeout_ms;
2327
2328        state.patch_policy_from(
2329            AuditActor::Runtime,
2330            "policy.patch".to_string(),
2331            relay_core_api::policy::ProxyPolicyPatch {
2332                redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2333                    enabled: Some(true),
2334                    redact_bodies: Some(true),
2335                    ..Default::default()
2336                }),
2337            },
2338        );
2339
2340        let policy = state.policy_snapshot();
2341        assert_eq!(policy.request_timeout_ms, original_timeout);
2342        assert!(policy.redaction.enabled);
2343        assert!(policy.redaction.redact_bodies);
2344
2345        let events = state.recent_audit_events();
2346        let event = events.last().expect("audit event should exist");
2347        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2348        assert_eq!(event.details["redaction_enabled"], true);
2349    }
2350
2351    #[tokio::test]
2352    async fn metrics_include_audit_and_lagged_event_counters() {
2353        let state = CoreState::new(None).await;
2354
2355        state.update_policy_from(
2356            AuditActor::Runtime,
2357            "policy".to_string(),
2358            ProxyPolicy::default(),
2359        );
2360        let _ = state
2361            .resolve_intercept_with_modifications_from(
2362                AuditActor::Probe,
2363                "missing-flow:request".to_string(),
2364                "drop",
2365                None,
2366            )
2367            .await;
2368
2369        state.record_flow_events_lagged(3);
2370        state.record_audit_events_lagged(5);
2371
2372        let metrics = state.get_metrics().await;
2373        assert_eq!(metrics.audit_events_total, 2);
2374        assert_eq!(metrics.audit_events_failed, 1);
2375        assert_eq!(metrics.flow_events_lagged_total, 3);
2376        assert_eq!(metrics.audit_events_lagged_total, 5);
2377    }
2378
2379    #[tokio::test]
2380    async fn prometheus_metrics_text_contains_observability_fields() {
2381        let state = CoreState::new(None).await;
2382        state.record_flow_events_lagged(2);
2383        state.record_audit_events_lagged(4);
2384
2385        let text = state.get_metrics_prometheus_text().await;
2386        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2387        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2388        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2389        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2390    }
2391
2392    #[tokio::test]
2393    async fn search_flows_uses_store_with_offset_pagination() {
2394        let state = CoreState::new(Some(sqlite_url())).await;
2395        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2396        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2397        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2398
2399        state.upsert_flow(Box::new(flow_a));
2400        state.upsert_flow(Box::new(flow_b));
2401        state.upsert_flow(Box::new(flow_c));
2402
2403        let mut baseline = Vec::new();
2404        for _ in 0..20 {
2405            baseline = state
2406                .search_flows(FlowQuery {
2407                    host: Some("api.example.com".to_string()),
2408                    path_contains: None,
2409                    method: None,
2410                    status_min: None,
2411                    status_max: None,
2412                    has_error: None,
2413                    is_websocket: None,
2414                    limit: Some(3),
2415                    offset: Some(0),
2416                })
2417                .await;
2418            if baseline.len() == 3 {
2419                break;
2420            }
2421            sleep(Duration::from_millis(20)).await;
2422        }
2423
2424        assert_eq!(baseline.len(), 3);
2425        let page = state
2426            .search_flows(FlowQuery {
2427                host: Some("api.example.com".to_string()),
2428                path_contains: None,
2429                method: None,
2430                status_min: None,
2431                status_max: None,
2432                has_error: None,
2433                is_websocket: None,
2434                limit: Some(1),
2435                offset: Some(1),
2436            })
2437            .await;
2438        assert_eq!(page.len(), 1);
2439        assert_eq!(page[0].id, baseline[1].id);
2440    }
2441
2442    #[tokio::test]
2443    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2444        let state = CoreState::new(Some(sqlite_url())).await;
2445        let first_flow = sample_http_flow(
2446            "persist.example.com",
2447            "/first",
2448            "GET",
2449            200,
2450            1_700_000_010_000,
2451        );
2452        let first_id = first_flow.id.to_string();
2453        state.upsert_flow(Box::new(first_flow));
2454        for i in 0..240 {
2455            state.upsert_flow(Box::new(sample_http_flow(
2456                "persist.example.com",
2457                &format!("/{}", i),
2458                "GET",
2459                200,
2460                1_700_000_020_000 + i,
2461            )));
2462        }
2463
2464        sleep(Duration::from_millis(200)).await;
2465
2466        let loaded = state.get_flow(first_id).await;
2467        assert!(loaded.is_some());
2468    }
2469
2470    #[tokio::test]
2471    async fn search_flows_redacts_summary_url_when_enabled() {
2472        let state = CoreState::new(None).await;
2473        state.update_policy_from(
2474            AuditActor::Runtime,
2475            "policy.redaction".to_string(),
2476            ProxyPolicy {
2477                redaction: RedactionPolicy {
2478                    enabled: true,
2479                    sensitive_query_keys: vec!["token".to_string()],
2480                    redact_bodies: false,
2481                    ..Default::default()
2482                },
2483                ..Default::default()
2484            },
2485        );
2486        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2487
2488        let mut items = Vec::new();
2489        for _ in 0..20 {
2490            items = state
2491                .search_flows(FlowQuery {
2492                    host: Some("api.example.com".to_string()),
2493                    path_contains: Some("/private".to_string()),
2494                    ..Default::default()
2495                })
2496                .await;
2497            if !items.is_empty() {
2498                break;
2499            }
2500            sleep(Duration::from_millis(20)).await;
2501        }
2502
2503        assert!(!items.is_empty());
2504        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2505        let token = redacted
2506            .query_pairs()
2507            .find(|(k, _)| k == "token")
2508            .map(|(_, v)| v.to_string());
2509        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2510    }
2511
2512    #[tokio::test]
2513    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2514        let state = CoreState::new(Some(sqlite_url())).await;
2515        state.update_policy_from(
2516            AuditActor::Runtime,
2517            "policy.redaction".to_string(),
2518            ProxyPolicy {
2519                redaction: RedactionPolicy {
2520                    enabled: true,
2521                    sensitive_header_names: vec![
2522                        "authorization".to_string(),
2523                        "set-cookie".to_string(),
2524                    ],
2525                    sensitive_query_keys: vec!["token".to_string()],
2526                    redact_bodies: true,
2527                },
2528                ..Default::default()
2529            },
2530        );
2531
2532        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2533        let flow_id = flow.id.to_string();
2534        state.upsert_flow(Box::new(flow));
2535        sleep(Duration::from_millis(80)).await;
2536
2537        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2538        let Layer::Http(http) = loaded.layer else {
2539            panic!("expected http layer");
2540        };
2541
2542        let auth = http
2543            .request
2544            .headers
2545            .iter()
2546            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2547            .map(|(_, v)| v.as_str());
2548        assert_eq!(auth, Some("[REDACTED]"));
2549
2550        let req_query_token = http
2551            .request
2552            .query
2553            .iter()
2554            .find(|(k, _)| k == "token")
2555            .map(|(_, v)| v.as_str());
2556        assert_eq!(req_query_token, Some("[REDACTED]"));
2557
2558        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2559        assert_eq!(req_body, Some("[REDACTED]"));
2560
2561        let response = http.response.expect("response should exist");
2562        let set_cookie = response
2563            .headers
2564            .iter()
2565            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2566            .map(|(_, v)| v.as_str());
2567        assert_eq!(set_cookie, Some("[REDACTED]"));
2568        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2569        assert_eq!(res_body, Some("[REDACTED]"));
2570    }
2571
2572    #[test]
2573    fn redact_flow_update_masks_http_body_when_enabled() {
2574        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2575        let state = runtime.block_on(CoreState::new(None));
2576        state.update_policy_from(
2577            AuditActor::Runtime,
2578            "policy.redaction".to_string(),
2579            ProxyPolicy {
2580                redaction: RedactionPolicy {
2581                    enabled: true,
2582                    redact_bodies: true,
2583                    ..Default::default()
2584                },
2585                ..Default::default()
2586            },
2587        );
2588
2589        let update = FlowUpdate::HttpBody {
2590            flow_id: "f-1".to_string(),
2591            direction: relay_core_api::flow::Direction::ClientToServer,
2592            body: BodyData {
2593                encoding: "utf-8".to_string(),
2594                content: "super-secret".to_string(),
2595                size: 12,
2596            },
2597        };
2598        let redacted = state.redact_flow_update_for_output(update);
2599        match redacted {
2600            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2601            _ => panic!("expected http body update"),
2602        }
2603    }
2604
2605    #[cfg(feature = "script")]
2606    #[tokio::test]
2607    async fn load_script_from_records_audit_event() {
2608        let state = CoreState::new(None).await;
2609
2610        state
2611            .load_script_from(
2612                AuditActor::Tauri,
2613                "tauri.load_script".to_string(),
2614                "globalThis.onRequestHeaders = (_flow) => {};",
2615            )
2616            .await
2617            .expect("script should load");
2618
2619        let events = state.recent_audit_events();
2620        let event = events.last().expect("audit event should exist");
2621        assert_eq!(event.actor, AuditActor::Tauri);
2622        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2623        assert_eq!(event.outcome, AuditOutcome::Success);
2624        assert_eq!(event.target, "tauri.load_script");
2625    }
2626}