Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use relay_core_api::flow::{BodyData, Direction, Flow, FlowUpdate, Layer, WebSocketMessage};
24use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
25#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
26use relay_core_lib::capture::LinuxOriginalDstProvider;
27#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
28use relay_core_lib::capture::MacOsOriginalDstProvider;
29#[cfg(target_os = "windows")]
30use relay_core_lib::capture::WindowsOriginalDstProvider;
31use relay_core_lib::capture::udp::UdpProxy;
32use relay_core_lib::capture::{OriginalDstProvider, TcpCaptureSource, TransparentTcpCaptureSource};
33use relay_core_lib::interceptor::{CompositeInterceptor, Interceptor};
34use relay_core_lib::tls::CertificateAuthority;
35#[cfg(feature = "script")]
36use relay_core_script::ScriptInterceptor;
37use std::collections::{BTreeSet, HashSet, VecDeque};
38use std::net::SocketAddr;
39use std::sync::Arc;
40use std::sync::Mutex;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tokio::net::TcpListener;
44use tokio::sync::{broadcast, mpsc, oneshot, watch};
45
46use tracing::error;
47
48use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
49use crate::rule::{
50    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
51    build_mock_response_rule,
52};
53use relay_core_api::modification::{FlowQuery, FlowSummary};
54use relay_core_lib::rule::Rule;
55use relay_core_lib::rule::engine::RuleEngine;
56use relay_core_storage::store::{AuditEventRecord, Store};
57use serde_json::json;
58
59pub mod actors;
60pub mod audit;
61pub mod interceptors;
62pub mod modification;
63pub mod rule;
64pub mod services;
65
66// ── Re-exports for user convenience ──
67// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
68pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
/// Re-exports every public item of `relay_core_lib::rule_engine` under this crate's
/// `rule_engine` path.
pub mod rule_engine {
    pub use relay_core_lib::rule_engine::*;
}
79
/// Snapshot of runtime counters and gauges, assembled by `CoreState::get_metrics` and
/// rendered as text by `CoreMetrics::to_prometheus_text`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreMetrics {
    /// First value of the flow-store actor's metrics reply (`0` when no reply arrives).
    pub flows_total: usize,
    /// Second value of the flow-store actor's metrics reply (`0` when no reply arrives).
    pub flows_in_memory: usize,
    /// Number of flow-store messages that `try_send` rejected and were therefore dropped.
    pub flows_dropped: usize,
    /// Pending intercept count as reported by the intercept broker.
    pub intercepts_pending: usize,
    /// Pending WebSocket message count as reported by the intercept broker.
    pub ws_pending_messages: usize,
    /// Reported by the intercept broker; `None` is rendered as `0` in the Prometheus text.
    pub oldest_intercept_age_ms: Option<u64>,
    /// Reported by the intercept broker; `None` is rendered as `0` in the Prometheus text.
    pub oldest_ws_message_age_ms: Option<u64>,
    /// Value of the rule-store actor's metrics reply (`0` when no reply arrives).
    pub rule_exec_errors: usize,
    /// Value of the runtime's `audit_events_total` counter at snapshot time.
    pub audit_events_total: usize,
    /// Value of the runtime's `audit_events_failed` counter at snapshot time.
    pub audit_events_failed: usize,
    /// Sum of the `skipped` values passed to `CoreState::record_flow_events_lagged`.
    pub flow_events_lagged_total: usize,
    /// Sum of the `skipped` values passed to `CoreState::record_audit_events_lagged`.
    pub audit_events_lagged_total: usize,
}
95
96impl CoreMetrics {
97    pub fn to_prometheus_text(&self) -> String {
98        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
99        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
100        format!(
101            "relay_core_flows_total {}\n\
102relay_core_flows_in_memory {}\n\
103relay_core_flows_dropped_total {}\n\
104relay_core_intercepts_pending {}\n\
105relay_core_ws_pending_messages {}\n\
106relay_core_oldest_intercept_age_ms {}\n\
107relay_core_oldest_ws_message_age_ms {}\n\
108relay_core_rule_exec_errors_total {}\n\
109relay_core_audit_events_total {}\n\
110relay_core_audit_events_failed_total {}\n\
111relay_core_flow_events_lagged_total {}\n\
112relay_core_audit_events_lagged_total {}\n",
113            self.flows_total,
114            self.flows_in_memory,
115            self.flows_dropped,
116            self.intercepts_pending,
117            self.ws_pending_messages,
118            oldest_intercept_age_ms,
119            oldest_ws_message_age_ms,
120            self.rule_exec_errors,
121            self.audit_events_total,
122            self.audit_events_failed,
123            self.flow_events_lagged_total,
124            self.audit_events_lagged_total,
125        )
126    }
127}
128
/// Flattened view of a `RuntimeLifecycle`, produced by the
/// `From<RuntimeLifecycle>` conversion.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusSnapshot {
    /// Lifecycle phase at the time of the snapshot.
    pub phase: RuntimeLifecyclePhase,
    /// `true` while the phase is `Starting`, `Running` or `Stopping`.
    pub running: bool,
    /// Port copied from the lifecycle, if one was recorded.
    pub port: Option<u16>,
    /// Whole seconds since `started_at_ms`, or `None` when no start time is recorded.
    pub uptime: Option<u64>,
    /// Last error message copied from the lifecycle, if any.
    pub last_error: Option<String>,
}
137
/// Lifecycle status together with a metrics snapshot, as returned by
/// `CoreState::status_report`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreStatusReport {
    /// Lifecycle-derived status.
    pub status: CoreStatusSnapshot,
    /// Metrics gathered for the same report.
    pub metrics: CoreMetrics,
}
143
/// Pending-intercept counts, as returned by `CoreState::intercept_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreInterceptSnapshot {
    /// Copied from `CoreMetrics::intercepts_pending`.
    pub pending_count: usize,
    /// Copied from `CoreMetrics::ws_pending_messages`.
    pub ws_pending_count: usize,
}
149
/// A list of audit events returned by the audit snapshot/query methods on `CoreState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct CoreAuditSnapshot {
    /// The selected audit events; ordering depends on the method that built the snapshot.
    pub events: Vec<AuditEvent>,
}
154
/// Filter for `CoreState::query_audit_snapshot`. Every `None` field means "no constraint".
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct CoreAuditQuery {
    /// Inclusive lower bound on the event's `timestamp_ms`.
    pub since_ms: Option<u64>,
    /// Inclusive upper bound on the event's `timestamp_ms`.
    pub until_ms: Option<u64>,
    /// Only events recorded for this actor.
    pub actor: Option<AuditActor>,
    /// Only events of this kind.
    pub kind: Option<AuditEventKind>,
    /// Only events with this outcome.
    pub outcome: Option<AuditOutcome>,
    /// Maximum number of events to return; the query clamps it to `1..=500`.
    pub limit: usize,
}
164
165impl Default for CoreAuditQuery {
166    fn default() -> Self {
167        Self {
168            since_ms: None,
169            until_ms: None,
170            actor: None,
171            kind: None,
172            outcome: None,
173            limit: 50,
174        }
175    }
176}
177
/// Phase of the proxy runtime. Serialized in `snake_case`, matching
/// `RuntimeLifecyclePhase::as_str`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeLifecyclePhase {
    /// Initial phase used by `RuntimeLifecycle::created`.
    Created,
    /// Set by `CoreState::prepare_start` once a start has been accepted.
    Starting,
    /// Counted as active by `is_active`.
    Running,
    /// Set by `CoreState::stop_proxy` after a shutdown sender was taken.
    Stopping,
    /// Not counted as active by `is_active`.
    Stopped,
    /// Not counted as active by `is_active`.
    Failed,
}
188
189impl RuntimeLifecyclePhase {
190    pub fn as_str(&self) -> &'static str {
191        match self {
192            Self::Created => "created",
193            Self::Starting => "starting",
194            Self::Running => "running",
195            Self::Stopping => "stopping",
196            Self::Stopped => "stopped",
197            Self::Failed => "failed",
198        }
199    }
200
201    pub fn is_active(&self) -> bool {
202        matches!(self, Self::Starting | Self::Running | Self::Stopping)
203    }
204}
205
/// Current lifecycle state of the proxy runtime, published through a `watch` channel on
/// `CoreState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RuntimeLifecycle {
    /// Current phase.
    pub phase: RuntimeLifecyclePhase,
    /// Port associated with the proxy, once a start has been requested.
    pub port: Option<u16>,
    /// Start timestamp in milliseconds; `uptime_seconds` compares it against `now_unix_ms()`.
    pub started_at_ms: Option<u64>,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
}
213
214impl RuntimeLifecycle {
215    pub fn created() -> Self {
216        Self {
217            phase: RuntimeLifecyclePhase::Created,
218            port: None,
219            started_at_ms: None,
220            last_error: None,
221        }
222    }
223
224    pub fn is_active(&self) -> bool {
225        self.phase.is_active()
226    }
227
228    pub fn uptime_seconds(&self) -> Option<u64> {
229        let started_at_ms = self.started_at_ms?;
230        let now_ms = now_unix_ms();
231        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
232    }
233}
234
/// Outcome of a request to spawn the proxy.
pub enum ProxySpawnResult {
    /// A new task was spawned; carries its join handle.
    Started(tokio::task::JoinHandle<()>),
    /// No new task was spawned because a proxy was already running.
    AlreadyRunning,
}
239
/// Outcome of `CoreState::stop_proxy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyStopResult {
    /// A stored shutdown sender was found and the lifecycle moved to `Stopping`.
    Stopping,
    /// No shutdown sender was stored, so there was nothing to stop.
    NotRunning,
}
245
246impl From<RuntimeLifecycle> for CoreStatusSnapshot {
247    fn from(lifecycle: RuntimeLifecycle) -> Self {
248        Self {
249            running: lifecycle.is_active(),
250            uptime: lifecycle.uptime_seconds(),
251            port: lifecycle.port,
252            last_error: lifecycle.last_error,
253            phase: lifecycle.phase,
254        }
255    }
256}
257
/// Shared runtime state: handles to the background actors, the optional store, policy and
/// lifecycle channels, and the counters reported through `CoreMetrics`.
pub struct CoreState {
    /// Mailbox of the `FlowStoreActor` task spawned in `CoreState::new`.
    flow_store: mpsc::Sender<FlowStoreMessage>,
    /// Mailbox of the `InterceptBrokerActor` task spawned in `CoreState::new`.
    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
    /// Mailbox of the `RuleStoreActor` task spawned in `CoreState::new`.
    rule_store: mpsc::Sender<RuleStoreMessage>,
    /// Optional store; `None` when no URL was given or the connection failed.
    store: Option<Store>,
    /// Script interceptor, only present with the `script` feature.
    #[cfg(feature = "script")]
    pub script_interceptor: Arc<ScriptInterceptor>,
    /// Watch channel holding the current `ProxyPolicy`.
    pub policy_tx: watch::Sender<ProxyPolicy>,
    /// Incremented each time a `try_send` to the flow store fails.
    pub flows_dropped: Arc<AtomicUsize>,
    /// Reported as `CoreMetrics::audit_events_total`; updated outside the code shown here.
    audit_events_total: Arc<AtomicUsize>,
    /// Reported as `CoreMetrics::audit_events_failed`; updated outside the code shown here.
    audit_events_failed: Arc<AtomicUsize>,
    /// Accumulates the values passed to `record_flow_events_lagged`.
    flow_events_lagged_total: Arc<AtomicUsize>,
    /// Accumulates the values passed to `record_audit_events_lagged`.
    audit_events_lagged_total: Arc<AtomicUsize>,
    /// Internal broadcast channel: multiple consumers (Tauri, Probe, ...) can all subscribe.
    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
    /// Broadcast channel handed out by `subscribe_audit_events`.
    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
    /// In-memory audit event buffer, preallocated for 200 entries in `CoreState::new`.
    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
    /// Watch channel holding the current `RuntimeLifecycle`.
    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
    /// Shutdown signal sender: stored by `prepare_start`, taken by `stop_proxy`.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
278
279impl CoreState {
    /// Builds the runtime state and spawns its three background actors.
    ///
    /// When `db_url` is `Some`, a `Store` connection is attempted. A connection failure is
    /// logged and the runtime continues without a store; an `init` failure is logged but the
    /// connected store is still used.
    ///
    /// Calls `tokio::spawn`, so it must run inside a Tokio runtime.
    ///
    /// # Panics
    ///
    /// With the `script` feature enabled, panics if `ScriptInterceptor::new()` returns an error.
    pub async fn new(db_url: Option<String>) -> Self {
        // Initial capacity reserved for the in-memory audit history buffer.
        const AUDIT_HISTORY_LIMIT: usize = 200;

        // Store setup is best-effort: failures are logged, never returned to the caller.
        let store = if let Some(url) = db_url {
            match Store::connect(&url).await {
                Ok(s) => {
                    if let Err(e) = s.init().await {
                        tracing::error!("Failed to init store: {}", e);
                    }
                    // The store is kept even when `init` failed.
                    Some(s)
                }
                Err(e) => {
                    tracing::error!("Failed to connect to store: {}", e);
                    None
                }
            }
        } else {
            None
        };

        // Flow store actor: bounded mailbox of 10000 messages, shares the optional store.
        let (flow_tx, flow_rx) = mpsc::channel(10000);
        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
        tokio::spawn(flow_actor.run());

        // Intercept broker actor: bounded mailbox of 1000 messages.
        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
        tokio::spawn(intercept_actor.run());

        // Rule store actor: bounded mailbox of 100 messages, shares the optional store.
        let (rule_tx, rule_rx) = mpsc::channel(100);
        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
        tokio::spawn(rule_actor.run());

        // Only the sender halves are kept; receivers are created later via `subscribe()`.
        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
        let (flow_broadcast_tx, _) = broadcast::channel(1000);
        let (audit_broadcast_tx, _) = broadcast::channel(256);
        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());

        #[cfg(feature = "script")]
        let script_interceptor = ScriptInterceptor::new()
            .await
            .expect("Failed to initialize ScriptInterceptor");
        Self {
            flow_store: flow_tx,
            intercept_broker: intercept_tx,
            rule_store: rule_tx,
            store,
            #[cfg(feature = "script")]
            script_interceptor: Arc::new(script_interceptor),
            policy_tx,
            flows_dropped: Arc::new(AtomicUsize::new(0)),
            audit_events_total: Arc::new(AtomicUsize::new(0)),
            audit_events_failed: Arc::new(AtomicUsize::new(0)),
            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
            flow_broadcast_tx,
            audit_broadcast_tx,
            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
            lifecycle_tx,
            // No shutdown sender until `prepare_start` stores one.
            shutdown_tx: Mutex::new(None),
        }
    }
341
342    pub async fn get_metrics(&self) -> CoreMetrics {
343        let (flow_tx, flow_rx) = oneshot::channel();
344        let _ = self
345            .flow_store
346            .send(FlowStoreMessage::GetMetrics(flow_tx))
347            .await;
348        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
349
350        let (int_tx, int_rx) = oneshot::channel();
351        let _ = self
352            .intercept_broker
353            .send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx })
354            .await;
355        let (
356            intercepts_pending,
357            ws_pending_messages,
358            oldest_intercept_age_ms,
359            oldest_ws_message_age_ms,
360        ) = int_rx.await.unwrap_or((0, 0, None, None));
361
362        let (rule_tx, rule_rx) = oneshot::channel();
363        let _ = self
364            .rule_store
365            .send(RuleStoreMessage::GetMetrics(rule_tx))
366            .await;
367        let rule_exec_errors = rule_rx.await.unwrap_or(0);
368
369        CoreMetrics {
370            flows_total,
371            flows_in_memory,
372            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
373            intercepts_pending,
374            ws_pending_messages,
375            oldest_intercept_age_ms,
376            oldest_ws_message_age_ms,
377            rule_exec_errors,
378            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
379            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
380            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
381            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
382        }
383    }
384
385    pub async fn get_metrics_prometheus_text(&self) -> String {
386        let mut text = self.get_metrics().await.to_prometheus_text();
387        #[cfg(feature = "script")]
388        {
389            text.push_str(&self.script_interceptor.metrics.prometheus_lines());
390        }
391        text
392    }
393
394    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
395        self.lifecycle().into()
396    }
397
398    pub async fn status_report(&self) -> CoreStatusReport {
399        CoreStatusReport {
400            status: self.status_snapshot(),
401            metrics: self.get_metrics().await,
402        }
403    }
404
405    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
406        let metrics = self.get_metrics().await;
407        CoreInterceptSnapshot {
408            pending_count: metrics.intercepts_pending,
409            ws_pending_count: metrics.ws_pending_messages,
410        }
411    }
412
413    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
414        let events = self.recent_audit_events();
415        let start = events.len().saturating_sub(limit);
416        CoreAuditSnapshot {
417            events: events.into_iter().skip(start).collect(),
418        }
419    }
420
421    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
422        let limit = query.limit.clamp(1, 500);
423        if let Some(store) = &self.store {
424            let rows = store
425                .query_audit_events(
426                    query.since_ms,
427                    query.until_ms,
428                    query.actor.as_ref().map(AuditActor::as_str),
429                    query.kind.as_ref().map(AuditEventKind::as_str),
430                    query.outcome.as_ref().map(AuditOutcome::as_str),
431                    limit,
432                )
433                .await
434                .unwrap_or_default();
435
436            let mut events = Vec::with_capacity(rows.len());
437            for row in rows {
438                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
439                    events.push(event);
440                }
441            }
442            return CoreAuditSnapshot { events };
443        }
444
445        let mut events = self.recent_audit_events();
446        events.reverse();
447        let filtered = events
448            .into_iter()
449            .filter(|event| {
450                query
451                    .since_ms
452                    .map(|v| event.timestamp_ms >= v)
453                    .unwrap_or(true)
454            })
455            .filter(|event| {
456                query
457                    .until_ms
458                    .map(|v| event.timestamp_ms <= v)
459                    .unwrap_or(true)
460            })
461            .filter(|event| {
462                query
463                    .actor
464                    .as_ref()
465                    .map(|v| &event.actor == v)
466                    .unwrap_or(true)
467            })
468            .filter(|event| {
469                query
470                    .kind
471                    .as_ref()
472                    .map(|v| &event.kind == v)
473                    .unwrap_or(true)
474            })
475            .filter(|event| {
476                query
477                    .outcome
478                    .as_ref()
479                    .map(|v| &event.outcome == v)
480                    .unwrap_or(true)
481            })
482            .take(limit)
483            .collect();
484        CoreAuditSnapshot { events: filtered }
485    }
486
487    pub async fn get_flow(&self, id: String) -> Option<Flow> {
488        let (tx, rx) = oneshot::channel();
489        if let Err(e) = self
490            .flow_store
491            .send(FlowStoreMessage::GetFlow {
492                id: id.clone(),
493                respond_to: tx,
494            })
495            .await
496        {
497            error!("Failed to send GetFlow request: {}", e);
498            if let Some(store) = &self.store {
499                return store
500                    .load_flow(&id)
501                    .await
502                    .ok()
503                    .flatten()
504                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
505            }
506            return None;
507        }
508        let flow = rx
509            .await
510            .map_err(|e| {
511                error!("Failed to receive Flow response: {}", e);
512                e
513            })
514            .unwrap_or(None);
515        if let Some(flow) = flow {
516            return Some(redact_flow(flow, &self.current_redaction_policy()));
517        }
518        if let Some(store) = &self.store {
519            return store
520                .load_flow(&id)
521                .await
522                .ok()
523                .flatten()
524                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
525                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
526        }
527        None
528    }
529
530    pub async fn get_rules(&self) -> Vec<Rule> {
531        let (tx, rx) = oneshot::channel();
532        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
533            error!("Failed to send GetRules request: {}", e);
534            return Vec::new();
535        }
536        rx.await
537            .map_err(|e| {
538                error!("Failed to receive Rules response: {}", e);
539                e
540            })
541            .unwrap_or_default()
542    }
543
544    pub async fn set_rules(&self, rules: Vec<Rule>) {
545        let _ = self
546            .set_rules_from(
547                AuditActor::Runtime,
548                "rules.replace",
549                "rule_set".to_string(),
550                json!({}),
551                rules,
552            )
553            .await;
554    }
555
556    pub async fn upsert_rule_from(
557        &self,
558        actor: AuditActor,
559        operation: &str,
560        target: String,
561        details: serde_json::Value,
562        rule: Rule,
563    ) -> Result<(), String> {
564        let rule_id = rule.id.clone();
565        let mut rules = self.get_rules().await;
566        rules.retain(|existing| existing.id != rule_id);
567        rules.push(rule);
568        self.set_rules_from(actor, operation, target, details, rules)
569            .await
570    }
571
572    pub async fn delete_rule_from(
573        &self,
574        actor: AuditActor,
575        operation: &str,
576        target: String,
577        details: serde_json::Value,
578        rule_id: &str,
579    ) -> Result<bool, String> {
580        let mut rules = self.get_rules().await;
581        let before = rules.len();
582        rules.retain(|rule| rule.id != rule_id);
583        if rules.len() == before {
584            return Ok(false);
585        }
586        self.set_rules_from(actor, operation, target, details, rules)
587            .await
588            .map(|_| true)
589    }
590
591    pub async fn create_mock_response_rule_from(
592        &self,
593        actor: AuditActor,
594        target: String,
595        details: serde_json::Value,
596        config: MockResponseRuleConfig,
597    ) -> Result<String, String> {
598        let rule_id = config.rule_id.clone();
599        let rule = build_mock_response_rule(config);
600        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
601            .await
602            .map(|_| rule_id)
603    }
604
605    pub async fn create_intercept_rule_from(
606        &self,
607        actor: AuditActor,
608        target: String,
609        details: serde_json::Value,
610        config: InterceptRuleConfig,
611    ) -> Result<String, String> {
612        let rule_id = config.rule_id.clone();
613        let mut rules = self.get_rules().await;
614        rules.extend(build_intercept_rules(config));
615        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
616            .await
617            .map(|_| rule_id)
618    }
619
620    pub async fn upsert_legacy_intercept_rule_from(
621        &self,
622        actor: AuditActor,
623        target: String,
624        details: serde_json::Value,
625        rule: InterceptRule,
626    ) -> Result<(), String> {
627        let rule_id = rule.id.clone();
628        let mut rules = self.get_rules().await;
629        rules.retain(|existing| {
630            existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id))
631        });
632        rules.extend(rule.to_rules());
633        self.set_rules_from(
634            actor,
635            "rule.intercept_legacy_upsert",
636            target,
637            details,
638            rules,
639        )
640        .await
641    }
642
643    pub async fn set_rules_from(
644        &self,
645        actor: AuditActor,
646        operation: &str,
647        target: String,
648        details: serde_json::Value,
649        rules: Vec<Rule>,
650    ) -> Result<(), String> {
651        let rule_count = rules.len();
652        if let Err(e) = self
653            .rule_store
654            .send(RuleStoreMessage::SetRules(rules))
655            .await
656        {
657            error!("Failed to send SetRules request: {}", e);
658            self.record_audit_event(AuditEvent::new(
659                actor,
660                AuditEventKind::RuleChanged,
661                target,
662                AuditOutcome::Failed,
663                json!({
664                    "operation": operation,
665                    "rule_count": rule_count,
666                    "details": details,
667                    "error": e.to_string()
668                }),
669            ));
670            return Err(e.to_string());
671        }
672
673        self.record_audit_event(AuditEvent::new(
674            actor,
675            AuditEventKind::RuleChanged,
676            target,
677            AuditOutcome::Success,
678            json!({
679                "operation": operation,
680                "rule_count": rule_count,
681                "details": details
682            }),
683        ));
684        Ok(())
685    }
686
687    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
688        let mut new_rules = Vec::new();
689        for rule in rules {
690            new_rules.extend(rule.to_rules());
691        }
692        self.set_rules(new_rules).await;
693    }
694
695    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
696        let (tx, rx) = oneshot::channel();
697        if let Err(e) = self
698            .rule_store
699            .send(RuleStoreMessage::GetRuleEngine(tx))
700            .await
701        {
702            error!("Failed to send GetRuleEngine request: {}", e);
703            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
704        }
705        rx.await
706            .map_err(|e| {
707                error!("Failed to receive RuleEngine response: {}", e);
708                e
709            })
710            .unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
711    }
712
713    pub fn update_policy(&self, policy: ProxyPolicy) {
714        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
715    }
716
717    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
718        let mut policy = self.policy_snapshot();
719        policy.apply_patch(patch);
720        self.update_policy_from(actor, target, policy);
721    }
722
723    pub fn policy_snapshot(&self) -> ProxyPolicy {
724        self.policy_tx.borrow().clone()
725    }
726
727    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
728        let details = json!({
729            "strict_http_semantics": policy.strict_http_semantics,
730            "request_timeout_ms": policy.request_timeout_ms,
731            "max_body_size": policy.max_body_size,
732            "transparent_enabled": policy.transparent_enabled,
733            "redaction_enabled": policy.redaction.enabled,
734            "redact_bodies": policy.redaction.redact_bodies
735        });
736
737        self.policy_tx.send_replace(policy);
738        self.record_audit_event(AuditEvent::new(
739            actor,
740            AuditEventKind::PolicyUpdated,
741            target,
742            AuditOutcome::Success,
743            details,
744        ));
745    }
746
747    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
748        if let Err(e) = self
749            .intercept_broker
750            .send(InterceptBrokerMessage::RegisterIntercept { key, tx })
751            .await
752        {
753            error!("Failed to send RegisterIntercept request: {}", e);
754        }
755    }
756
757    pub async fn resolve_intercept(
758        &self,
759        key: String,
760        result: InterceptionResult,
761    ) -> Result<(), String> {
762        let (tx, rx) = oneshot::channel();
763        if let Err(e) = self
764            .intercept_broker
765            .send(InterceptBrokerMessage::ResolveIntercept {
766                key,
767                result,
768                respond_to: tx,
769            })
770            .await
771        {
772            error!("Failed to send ResolveIntercept request: {}", e);
773            return Err(e.to_string());
774        }
775        rx.await.map_err(|_| "Actor dropped".to_string())?
776    }
777
778    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
779        let (tx, rx) = oneshot::channel();
780        if let Err(e) = self
781            .intercept_broker
782            .send(InterceptBrokerMessage::GetPendingWebSocketMessage {
783                key,
784                respond_to: tx,
785            })
786            .await
787        {
788            error!("Failed to send GetPendingWebSocketMessage request: {}", e);
789            return None;
790        }
791        rx.await
792            .map_err(|e| {
793                error!("Failed to receive WebSocketMessage response: {}", e);
794                e
795            })
796            .unwrap_or(None)
797    }
798
799    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
800        if let Err(e) = self
801            .intercept_broker
802            .send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message })
803            .await
804        {
805            error!("Failed to send SetPendingWebSocketMessage request: {}", e);
806        }
807    }
808
809    pub async fn is_intercept_pending(&self, key: String) -> bool {
810        let (tx, rx) = oneshot::channel();
811        if let Err(e) = self
812            .intercept_broker
813            .send(InterceptBrokerMessage::GetPendingIntercept {
814                key,
815                respond_to: tx,
816            })
817            .await
818        {
819            error!("Failed to send GetPendingIntercept request: {}", e);
820            return false;
821        }
822        rx.await
823            .map_err(|e| {
824                error!("Failed to receive PendingIntercept response: {}", e);
825                e
826            })
827            .unwrap_or(false)
828    }
829
830    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
831        let (tx, rx) = oneshot::channel();
832        if let Err(e) = self
833            .intercept_broker
834            .send(InterceptBrokerMessage::GetPendingInterceptByFlowId {
835                flow_id,
836                respond_to: tx,
837            })
838            .await
839        {
840            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
841            return false;
842        }
843        rx.await
844            .map_err(|e| {
845                error!("Failed to receive PendingInterceptByFlowId response: {}", e);
846                e
847            })
848            .unwrap_or(false)
849    }
850
851    pub fn upsert_flow(&self, flow: Box<Flow>) {
852        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
853            // Drop flow if channel is full
854            error!("FlowStore dropped flow: {}", e);
855            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
856        }
857    }
858
859    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
860        if let Err(e) = self
861            .flow_store
862            .try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message })
863        {
864            error!("FlowStore dropped WS message: {}", e);
865            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
866        }
867    }
868
869    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
870        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody {
871            flow_id,
872            body,
873            direction,
874        }) {
875            error!("FlowStore dropped HTTP body: {}", e);
876            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
877        }
878    }
879
880    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
881    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
882        self.flow_broadcast_tx.subscribe()
883    }
884
885    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
886        self.audit_broadcast_tx.subscribe()
887    }
888
889    fn current_redaction_policy(&self) -> RedactionPolicy {
890        self.policy_tx.borrow().redaction.clone()
891    }
892
893    pub fn record_flow_events_lagged(&self, skipped: u64) {
894        self.flow_events_lagged_total
895            .fetch_add(skipped as usize, Ordering::Relaxed);
896    }
897
898    pub fn record_audit_events_lagged(&self, skipped: u64) {
899        self.audit_events_lagged_total
900            .fetch_add(skipped as usize, Ordering::Relaxed);
901    }
902
903    pub fn lifecycle(&self) -> RuntimeLifecycle {
904        self.lifecycle_tx.borrow().clone()
905    }
906
907    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
908        self.lifecycle_tx.subscribe()
909    }
910
911    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
912        let current = self.lifecycle();
913        if current.is_active() {
914            return Err(format!(
915                "Proxy is already {} on port {}",
916                current.phase.as_str(),
917                current.port.unwrap_or(port)
918            ));
919        }
920
921        let mut guard = self
922            .shutdown_tx
923            .lock()
924            .map_err(|_| "shutdown state poisoned".to_string())?;
925        *guard = Some(shutdown_tx);
926        drop(guard);
927
928        self.update_lifecycle(RuntimeLifecycle {
929            phase: RuntimeLifecyclePhase::Starting,
930            port: Some(port),
931            started_at_ms: None,
932            last_error: None,
933        });
934        Ok(())
935    }
936
937    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
938        let mut guard = self
939            .shutdown_tx
940            .lock()
941            .map_err(|_| "shutdown state poisoned".to_string())?;
942        let Some(tx) = guard.take() else {
943            return Ok(ProxyStopResult::NotRunning);
944        };
945        drop(guard);
946
947        let current = self.lifecycle();
948        self.update_lifecycle(RuntimeLifecycle {
949            phase: RuntimeLifecyclePhase::Stopping,
950            port: current.port,
951            started_at_ms: current.started_at_ms,
952            last_error: current.last_error,
953        });
954        let _ = tx.send(());
955        Ok(ProxyStopResult::Stopping)
956    }
957
958    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
959        self.audit_history
960            .lock()
961            .map(|events| events.iter().cloned().collect())
962            .unwrap_or_default()
963    }
964
965    /// 搜索内存中的 Flow 列表,返回轻量摘要。
966    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
967        let redaction = self.current_redaction_policy();
968        if let Some(store) = &self.store {
969            return store
970                .query_flow_summaries(&query)
971                .await
972                .unwrap_or_default()
973                .into_iter()
974                .map(|summary| redact_flow_summary(summary, &redaction))
975                .collect();
976        }
977        let (tx, rx) = oneshot::channel();
978        if let Err(e) = self
979            .flow_store
980            .send(FlowStoreMessage::SearchFlows {
981                query,
982                respond_to: tx,
983            })
984            .await
985        {
986            error!("Failed to send SearchFlows request: {}", e);
987            return Vec::new();
988        }
989        rx.await
990            .unwrap_or_default()
991            .into_iter()
992            .map(|summary| redact_flow_summary(summary, &redaction))
993            .collect()
994    }
995
996    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
997        let redaction = self.current_redaction_policy();
998        redact_flow_update(update, &redaction)
999    }
1000
1001    /// 解除截获并可选地应用修改。
1002    ///
1003    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
1004    /// - 若 `mods` 为 None,直接 Continue;
1005    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
1006    ///
1007    /// `key` 格式约定:
1008    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
1009    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
1010    pub async fn resolve_intercept_with_modifications(
1011        &self,
1012        key: String,
1013        action: &str,
1014        mods: Option<relay_core_api::modification::FlowModification>,
1015    ) -> Result<(), String> {
1016        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
1017            .await
1018    }
1019
1020    pub async fn resolve_intercept_with_modifications_from(
1021        &self,
1022        actor: AuditActor,
1023        key: String,
1024        action: &str,
1025        mods: Option<relay_core_api::modification::FlowModification>,
1026    ) -> Result<(), String> {
1027        let modified_fields = modification_field_names(mods.as_ref());
1028        let result = match action {
1029            "drop" => InterceptionResult::Drop,
1030            _ => match mods {
1031                None => InterceptionResult::Continue,
1032                Some(m) => {
1033                    let parts: Vec<&str> = key.splitn(4, ':').collect();
1034                    match parts.as_slice() {
1035                        [flow_id, phase] => {
1036                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
1037                                modification::apply_flow_modification(&flow, phase, m)
1038                            } else {
1039                                InterceptionResult::Continue
1040                            }
1041                        }
1042                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
1043                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
1044                        [_, "ws_msg", _] => {
1045                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
1046                                modification::apply_ws_modification(&msg, m)
1047                            } else {
1048                                InterceptionResult::Continue
1049                            }
1050                        }
1051                        _ => InterceptionResult::Continue,
1052                    }
1053                }
1054            },
1055        };
1056        let outcome = self.resolve_intercept(key.clone(), result).await;
1057        let audit_outcome = if outcome.is_ok() {
1058            AuditOutcome::Success
1059        } else {
1060            AuditOutcome::Failed
1061        };
1062        let error_message = outcome.as_ref().err().cloned();
1063
1064        self.record_audit_event(AuditEvent::new(
1065            actor,
1066            AuditEventKind::InterceptResolved,
1067            key,
1068            audit_outcome,
1069            json!({
1070                "action": action,
1071                "has_modifications": !modified_fields.is_empty(),
1072                "modified_fields": modified_fields,
1073                "error": error_message
1074            }),
1075        ));
1076        outcome
1077    }
1078
1079    #[cfg(feature = "script")]
1080    pub async fn load_script_from(
1081        &self,
1082        actor: AuditActor,
1083        target: String,
1084        script: &str,
1085    ) -> Result<(), String> {
1086        let result = self
1087            .script_interceptor
1088            .load_script(script)
1089            .await
1090            .map_err(|e| e.to_string());
1091
1092        let outcome = if result.is_ok() {
1093            AuditOutcome::Success
1094        } else {
1095            AuditOutcome::Failed
1096        };
1097        let error_message = result.as_ref().err().cloned();
1098
1099        self.record_audit_event(AuditEvent::new(
1100            actor,
1101            AuditEventKind::ScriptReloaded,
1102            target,
1103            outcome,
1104            json!({
1105                "script_bytes": script.len(),
1106                "error": error_message
1107            }),
1108        ));
1109
1110        result
1111    }
1112
1113    /// S5: Set the env var whitelist for relay.env() in user scripts.
1114    #[cfg(feature = "script")]
1115    pub async fn set_script_env_allow(&self, env_allow: std::collections::HashSet<String>) {
1116        self.script_interceptor.set_env_allow(env_allow).await;
1117    }
1118
1119    fn record_audit_event(&self, event: AuditEvent) {
1120        const AUDIT_HISTORY_LIMIT: usize = 200;
1121        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
1122        if event.outcome == AuditOutcome::Failed {
1123            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
1124        }
1125
1126        let details = event.details.to_string();
1127        tracing::info!(
1128            target: "relay_core_audit",
1129            event_id = %event.id,
1130            actor = %event.actor.as_str(),
1131            kind = %event.kind.as_str(),
1132            target = %event.target,
1133            outcome = %event.outcome.as_str(),
1134            details = %details
1135        );
1136
1137        if let Ok(mut history) = self.audit_history.lock() {
1138            if history.len() >= AUDIT_HISTORY_LIMIT {
1139                history.pop_front();
1140            }
1141            history.push_back(event.clone());
1142        }
1143
1144        if let Some(store) = self.store.clone() {
1145            let event_json = serde_json::to_value(&event).unwrap_or_default();
1146            let event_id = event.id.clone();
1147            let timestamp_ms = event.timestamp_ms;
1148            let actor = event.actor.as_str().to_string();
1149            let kind = event.kind.as_str().to_string();
1150            let target = event.target.clone();
1151            let outcome = event.outcome.as_str().to_string();
1152            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1153                handle.spawn(async move {
1154                    if let Err(e) = store
1155                        .save_audit_event(AuditEventRecord {
1156                            id: &event_id,
1157                            timestamp_ms,
1158                            actor: &actor,
1159                            kind: &kind,
1160                            target: &target,
1161                            outcome: &outcome,
1162                            content: &event_json,
1163                        })
1164                        .await
1165                    {
1166                        tracing::error!("Failed to persist audit event: {}", e);
1167                    }
1168                });
1169            }
1170        }
1171
1172        let _ = self.audit_broadcast_tx.send(event);
1173    }
1174
1175    fn transition_to_running(&self, port: u16) {
1176        self.update_lifecycle(RuntimeLifecycle {
1177            phase: RuntimeLifecyclePhase::Running,
1178            port: Some(port),
1179            started_at_ms: Some(now_unix_ms()),
1180            last_error: None,
1181        });
1182    }
1183
1184    fn transition_to_stopped(&self) {
1185        if let Ok(mut guard) = self.shutdown_tx.lock() {
1186            *guard = None;
1187        }
1188
1189        self.update_lifecycle(RuntimeLifecycle {
1190            phase: RuntimeLifecyclePhase::Stopped,
1191            port: None,
1192            started_at_ms: None,
1193            last_error: None,
1194        });
1195    }
1196
1197    fn transition_to_failed(&self, port: u16, error: String) {
1198        if let Ok(mut guard) = self.shutdown_tx.lock() {
1199            *guard = None;
1200        }
1201
1202        self.update_lifecycle(RuntimeLifecycle {
1203            phase: RuntimeLifecyclePhase::Failed,
1204            port: Some(port),
1205            started_at_ms: None,
1206            last_error: Some(error),
1207        });
1208    }
1209
1210    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1211        tracing::info!(
1212            target: "relay_core_lifecycle",
1213            phase = %lifecycle.phase.as_str(),
1214            port = ?lifecycle.port,
1215            started_at_ms = ?lifecycle.started_at_ms,
1216            last_error = ?lifecycle.last_error
1217        );
1218        self.lifecycle_tx.send_replace(lifecycle);
1219    }
1220
1221    pub fn spawn_proxy(
1222        self: &Arc<Self>,
1223        config: ProxyConfig,
1224        sink: mpsc::Sender<FlowUpdate>,
1225        extra_interceptor: Option<Arc<dyn Interceptor>>,
1226    ) -> Result<ProxySpawnResult, String> {
1227        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1228        match self.prepare_start(config.port, shutdown_tx) {
1229            Ok(()) => {}
1230            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1231            Err(error) => return Err(error),
1232        }
1233        let state = self.clone();
1234        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1235            if let Err(error) = state
1236                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1237                .await
1238            {
1239                error!("Proxy failed: {}", error);
1240            }
1241        })))
1242    }
1243
1244    pub async fn start_proxy(
1245        self: &Arc<Self>,
1246        config: ProxyConfig,
1247        sink: mpsc::Sender<FlowUpdate>,
1248        extra_interceptor: Option<Arc<dyn Interceptor>>,
1249    ) -> Result<(), String> {
1250        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1251        self.prepare_start(config.port, shutdown_tx)?;
1252        self.run_proxy(config, sink, extra_interceptor, shutdown_rx)
1253            .await
1254    }
1255
1256    async fn run_proxy(
1257        self: &Arc<Self>,
1258        config: ProxyConfig,
1259        sink: mpsc::Sender<FlowUpdate>,
1260        extra_interceptor: Option<Arc<dyn Interceptor>>,
1261        shutdown_rx: oneshot::Receiver<()>,
1262    ) -> Result<(), String> {
1263        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1264        let state = self.clone();
1265
1266        if let Some(parent) = config.ca_cert_path.parent()
1267            && !parent.exists()
1268        {
1269            std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1270        }
1271
1272        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1273            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1274        let ca = Arc::new(ca);
1275
1276        #[cfg(feature = "script")]
1277        let script_interceptor = self.script_interceptor.clone();
1278
1279        #[cfg(feature = "script")]
1280        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1281
1282        #[cfg(not(feature = "script"))]
1283        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1284
1285        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(
1286            self.clone(),
1287        )));
1288
1289        if let Some(interceptor) = extra_interceptor {
1290            interceptors.push(interceptor);
1291        }
1292
1293        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1294        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1295
1296        tokio::spawn(async move {
1297            while let Some(update) = proxy_rx.recv().await {
1298                match update.clone() {
1299                    FlowUpdate::Full(flow) => {
1300                        state.upsert_flow(flow);
1301                    }
1302                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1303                        state.append_ws_message(flow_id, message);
1304                    }
1305                    FlowUpdate::HttpBody {
1306                        flow_id,
1307                        direction,
1308                        body,
1309                    } => {
1310                        state.update_http_body(flow_id, body, direction);
1311                    }
1312                }
1313
1314                let _ = state.flow_broadcast_tx.send(update.clone());
1315
1316                if sink.try_send(update).is_err() {
1317                    relay_core_lib::metrics::inc_flows_dropped();
1318                }
1319            }
1320        });
1321
1322        if let Some(udp_port) = config.udp_tproxy_port {
1323            let udp_proxy_tx = proxy_tx.clone();
1324            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1325            tokio::spawn(async move {
1326                match tokio::net::UdpSocket::bind(udp_addr).await {
1327                    Ok(socket) => {
1328                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1329                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1330                            error!("UDP TPROXY failed: {}", e);
1331                        }
1332                    }
1333                    Err(e) => {
1334                        error!("Failed to bind UDP TPROXY socket: {}", e);
1335                    }
1336                }
1337            });
1338        }
1339
1340        let listener = match TcpListener::bind(addr).await {
1341            Ok(listener) => listener,
1342            Err(e) => {
1343                if let Ok(mut guard) = self.shutdown_tx.lock() {
1344                    *guard = None;
1345                }
1346                let message = format!("Failed to bind to address {}: {}", addr, e);
1347                self.transition_to_failed(config.port, message.clone());
1348                return Err(message);
1349            }
1350        };
1351
1352        self.transition_to_running(config.port);
1353        let shutdown_rx = Some(shutdown_rx);
1354
1355        if config.transparent {
1356            let policy = ProxyPolicy {
1357                transparent_enabled: true,
1358                ..Default::default()
1359            };
1360            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1361            let policy_rx = self.policy_tx.subscribe();
1362
1363            let provider: Arc<dyn OriginalDstProvider> = {
1364                let mut addrs = BTreeSet::new();
1365                if let Ok(local) = listener.local_addr() {
1366                    addrs.insert(local);
1367                }
1368
1369                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1370                {
1371                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1372                }
1373                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1374                {
1375                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1376                        Ok(provider) => Arc::new(provider),
1377                        Err(e) => {
1378                            error!("Failed to initialize macOS PF provider: {}", e);
1379                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1380                        }
1381                    }
1382                }
1383                #[cfg(target_os = "windows")]
1384                {
1385                    let filter =
1386                        "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)"
1387                            .to_string();
1388                    let port = config.port;
1389                    tokio::spawn(async move {
1390                        relay_core_lib::capture::windows::start_windivert_capture(filter, port)
1391                            .await;
1392                    });
1393
1394                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1395                }
1396                #[cfg(not(any(
1397                    all(target_os = "linux", feature = "transparent-linux"),
1398                    all(target_os = "macos", feature = "transparent-macos"),
1399                    target_os = "windows"
1400                )))]
1401                {
1402                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1403                }
1404            };
1405
1406            let source = TransparentTcpCaptureSource::new(listener, provider);
1407            let result = relay_core_lib::start_proxy(
1408                source,
1409                proxy_tx,
1410                interceptor,
1411                ca,
1412                policy_rx,
1413                None,
1414                shutdown_rx,
1415            )
1416            .await
1417            .map_err(|e| e.to_string());
1418            if let Err(error) = &result {
1419                self.transition_to_failed(config.port, error.clone());
1420            } else {
1421                self.transition_to_stopped();
1422            }
1423            result
1424        } else {
1425            let source = TcpCaptureSource::new(listener);
1426            let policy = ProxyPolicy::default();
1427            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1428            let policy_rx = self.policy_tx.subscribe();
1429
1430            let result = relay_core_lib::start_proxy(
1431                source,
1432                proxy_tx,
1433                interceptor,
1434                ca,
1435                policy_rx,
1436                None,
1437                shutdown_rx,
1438            )
1439            .await
1440            .map_err(|e| e.to_string());
1441            if let Err(error) = &result {
1442                self.transition_to_failed(config.port, error.clone());
1443            } else {
1444                self.transition_to_stopped();
1445            }
1446            result
1447        }
1448    }
1449}
1450
1451fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1452    match update {
1453        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1454        FlowUpdate::WebSocketMessage {
1455            flow_id,
1456            mut message,
1457        } => {
1458            message.content = redact_body(message.content, redaction);
1459            FlowUpdate::WebSocketMessage { flow_id, message }
1460        }
1461        FlowUpdate::HttpBody {
1462            flow_id,
1463            direction,
1464            body,
1465        } => FlowUpdate::HttpBody {
1466            flow_id,
1467            direction,
1468            body: redact_body(body, redaction),
1469        },
1470    }
1471}
1472
1473fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1474    if !redaction.enabled {
1475        return flow;
1476    }
1477    match &mut flow.layer {
1478        Layer::Http(http) => {
1479            redact_http_request(&mut http.request, redaction);
1480            if let Some(response) = &mut http.response {
1481                redact_headers(&mut response.headers, redaction);
1482                response.body = response
1483                    .body
1484                    .take()
1485                    .map(|body| redact_body(body, redaction));
1486            }
1487        }
1488        Layer::WebSocket(ws) => {
1489            redact_http_request(&mut ws.handshake_request, redaction);
1490            redact_headers(&mut ws.handshake_response.headers, redaction);
1491            ws.handshake_response.body = ws
1492                .handshake_response
1493                .body
1494                .take()
1495                .map(|body| redact_body(body, redaction));
1496            for message in &mut ws.messages {
1497                message.content = redact_body(message.content.clone(), redaction);
1498            }
1499        }
1500        _ => {}
1501    }
1502    flow
1503}
1504
1505fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1506    if !redaction.enabled {
1507        return summary;
1508    }
1509    summary.url = redact_url_string(&summary.url, redaction);
1510    summary
1511}
1512
1513fn redact_http_request(
1514    request: &mut relay_core_api::flow::HttpRequest,
1515    redaction: &RedactionPolicy,
1516) {
1517    redact_headers(&mut request.headers, redaction);
1518    redact_query_pairs(&mut request.query, redaction);
1519    request.url = redact_url(&request.url, redaction);
1520    request.body = request.body.take().map(|body| redact_body(body, redaction));
1521}
1522
1523fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1524    if !redaction.enabled {
1525        return;
1526    }
1527    let sensitive = redaction_set(&redaction.sensitive_header_names);
1528    for (name, value) in headers.iter_mut() {
1529        if sensitive.contains(&name.to_ascii_lowercase()) {
1530            *value = "[REDACTED]".to_string();
1531        }
1532    }
1533}
1534
1535fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1536    if !redaction.enabled {
1537        return;
1538    }
1539    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1540    for (name, value) in query.iter_mut() {
1541        if sensitive.contains(&name.to_ascii_lowercase()) {
1542            *value = "[REDACTED]".to_string();
1543        }
1544    }
1545}
1546
1547fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1548    if !redaction.enabled {
1549        return url.clone();
1550    }
1551    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1552    let pairs: Vec<(String, String)> = url
1553        .query_pairs()
1554        .map(|(k, v)| {
1555            let key = k.to_string();
1556            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1557                "[REDACTED]".to_string()
1558            } else {
1559                v.to_string()
1560            };
1561            (key, value)
1562        })
1563        .collect();
1564    let mut next = url.clone();
1565    if pairs.is_empty() {
1566        return next;
1567    }
1568    next.query_pairs_mut().clear();
1569    for (k, v) in pairs {
1570        next.query_pairs_mut().append_pair(&k, &v);
1571    }
1572    next
1573}
1574
1575fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1576    match url::Url::parse(input) {
1577        Ok(url) => redact_url(&url, redaction).to_string(),
1578        Err(_) => input.to_string(),
1579    }
1580}
1581
1582fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1583    if redaction.enabled && redaction.redact_bodies {
1584        body.content = "[REDACTED]".to_string();
1585    }
1586    body
1587}
1588
/// Builds a lookup set containing the ASCII-lowercased form of every entry in `values`.
fn redaction_set(values: &[String]) -> HashSet<String> {
    let mut normalized = HashSet::with_capacity(values.len());
    normalized.extend(values.iter().map(|entry| entry.to_ascii_lowercase()));
    normalized
}
1595
/// Settings used to start the proxy.
#[derive(Clone, Debug)]
pub struct ProxyConfig {
    /// TCP port the proxy listens on.
    pub port: u16,
    /// Path of the CA certificate file.
    pub ca_cert_path: std::path::PathBuf,
    /// Path of the CA private key file.
    pub ca_key_path: std::path::PathBuf,
    /// Whether the proxy runs in transparent mode.
    pub transparent: bool,
    /// Port for the UDP TPROXY listener; `None` disables it.
    pub udp_tproxy_port: Option<u16>,
}
1604
1605impl ProxyConfig {
1606    pub fn new(
1607        port: u16,
1608        ca_cert_path: std::path::PathBuf,
1609        ca_key_path: std::path::PathBuf,
1610    ) -> Self {
1611        Self {
1612            port,
1613            ca_cert_path,
1614            ca_key_path,
1615            transparent: false,
1616            udp_tproxy_port: None,
1617        }
1618    }
1619
1620    pub fn from_app_data_dir(
1621        app_data_dir: impl Into<std::path::PathBuf>,
1622        port: u16,
1623    ) -> Result<Self, String> {
1624        let app_data_dir = app_data_dir.into();
1625        if !app_data_dir.exists() {
1626            std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1627        }
1628
1629        Ok(Self::new(
1630            port,
1631            app_data_dir.join("ca_cert.pem"),
1632            app_data_dir.join("ca_key.pem"),
1633        ))
1634    }
1635
1636    pub fn with_transparent(mut self, transparent: bool) -> Self {
1637        self.transparent = transparent;
1638        self
1639    }
1640
1641    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1642        self.udp_tproxy_port = udp_tproxy_port;
1643        self
1644    }
1645}
1646
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
1653
1654fn modification_field_names(
1655    mods: Option<&relay_core_api::modification::FlowModification>,
1656) -> Vec<&'static str> {
1657    let Some(mods) = mods else {
1658        return Vec::new();
1659    };
1660
1661    let mut fields = Vec::new();
1662    if mods.method.is_some() {
1663        fields.push("method");
1664    }
1665    if mods.url.is_some() {
1666        fields.push("url");
1667    }
1668    if mods.request_headers.is_some() {
1669        fields.push("request_headers");
1670    }
1671    if mods.request_body.is_some() {
1672        fields.push("request_body");
1673    }
1674    if mods.status_code.is_some() {
1675        fields.push("status_code");
1676    }
1677    if mods.response_headers.is_some() {
1678        fields.push("response_headers");
1679    }
1680    if mods.response_body.is_some() {
1681        fields.push("response_body");
1682    }
1683    if mods.message_content.is_some() {
1684        fields.push("message_content");
1685    }
1686    fields
1687}
1688
1689#[cfg(test)]
1690mod tests {
1691    use super::*;
1692    use chrono::Utc;
1693    use relay_core_api::flow::{
1694        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1695        ResponseTiming, TransportProtocol,
1696    };
1697    use std::sync::atomic::{AtomicU64, Ordering};
1698    use tokio::time::{Duration, sleep};
1699    use url::Url;
1700    use uuid::Uuid;
1701
1702    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1703
1704    fn sqlite_url() -> String {
1705        let nanos = SystemTime::now()
1706            .duration_since(UNIX_EPOCH)
1707            .expect("clock drift")
1708            .as_nanos();
1709        let pid = std::process::id();
1710        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1711        let db_dir = std::env::current_dir()
1712            .expect("cwd")
1713            .join("target")
1714            .join("test-dbs");
1715        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1716        let db_path = db_dir.join(format!(
1717            "relay-core-runtime-test-{}-{}-{}.db",
1718            pid, nanos, seq
1719        ));
1720        format!("sqlite://{}?mode=rwc", db_path.display())
1721    }
1722
1723    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1724        let start_time =
1725            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1726        let request_url =
1727            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1728        Flow {
1729            id: Uuid::new_v4(),
1730            start_time,
1731            end_time: Some(start_time),
1732            network: NetworkInfo {
1733                client_ip: "127.0.0.1".to_string(),
1734                client_port: 12000,
1735                server_ip: "127.0.0.1".to_string(),
1736                server_port: 8080,
1737                protocol: TransportProtocol::TCP,
1738                tls: false,
1739                tls_version: None,
1740                sni: None,
1741            },
1742            layer: Layer::Http(HttpLayer {
1743                request: HttpRequest {
1744                    method: method.to_string(),
1745                    url: request_url,
1746                    version: "HTTP/1.1".to_string(),
1747                    headers: vec![],
1748                    cookies: vec![],
1749                    query: vec![],
1750                    body: None,
1751                },
1752                response: Some(HttpResponse {
1753                    status,
1754                    status_text: "OK".to_string(),
1755                    version: "HTTP/1.1".to_string(),
1756                    headers: vec![],
1757                    cookies: vec![],
1758                    body: None,
1759                    timing: ResponseTiming {
1760                        time_to_first_byte: None,
1761                        time_to_last_byte: None,
1762                        connect_time_ms: None,
1763                        ssl_time_ms: None,
1764                    },
1765                }),
1766                error: None,
1767            }),
1768            tags: vec![],
1769            meta: std::collections::HashMap::new(),
1770        }
1771    }
1772
1773    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1774        let start_time =
1775            chrono::DateTime::<Utc>::from_timestamp_millis(ts).expect("timestamp should be valid");
1776        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1777            .expect("url should parse");
1778        Flow {
1779            id: Uuid::new_v4(),
1780            start_time,
1781            end_time: Some(start_time),
1782            network: NetworkInfo {
1783                client_ip: "127.0.0.1".to_string(),
1784                client_port: 12000,
1785                server_ip: "127.0.0.1".to_string(),
1786                server_port: 8080,
1787                protocol: TransportProtocol::TCP,
1788                tls: false,
1789                tls_version: None,
1790                sni: None,
1791            },
1792            layer: Layer::Http(HttpLayer {
1793                request: HttpRequest {
1794                    method: "GET".to_string(),
1795                    url: request_url,
1796                    version: "HTTP/1.1".to_string(),
1797                    headers: vec![
1798                        (
1799                            "Authorization".to_string(),
1800                            "Bearer secret-token".to_string(),
1801                        ),
1802                        ("X-Normal".to_string(), "visible".to_string()),
1803                    ],
1804                    cookies: vec![],
1805                    query: vec![
1806                        ("token".to_string(), "abc123".to_string()),
1807                        ("ok".to_string(), "1".to_string()),
1808                    ],
1809                    body: Some(BodyData {
1810                        encoding: "utf-8".to_string(),
1811                        content: "secret request body".to_string(),
1812                        size: 19,
1813                    }),
1814                },
1815                response: Some(HttpResponse {
1816                    status: 200,
1817                    status_text: "OK".to_string(),
1818                    version: "HTTP/1.1".to_string(),
1819                    headers: vec![
1820                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1821                        ("X-Response".to_string(), "visible".to_string()),
1822                    ],
1823                    cookies: vec![],
1824                    body: Some(BodyData {
1825                        encoding: "utf-8".to_string(),
1826                        content: "secret response body".to_string(),
1827                        size: 20,
1828                    }),
1829                    timing: ResponseTiming {
1830                        time_to_first_byte: None,
1831                        time_to_last_byte: None,
1832                        connect_time_ms: None,
1833                        ssl_time_ms: None,
1834                    },
1835                }),
1836                error: None,
1837            }),
1838            tags: vec![],
1839            meta: std::collections::HashMap::new(),
1840        }
1841    }
1842
1843    #[tokio::test]
1844    async fn set_rules_from_records_audit_event() {
1845        let state = CoreState::new(None).await;
1846
1847        state
1848            .set_rules_from(
1849                AuditActor::Http,
1850                "rule.upsert",
1851                "rule-1".to_string(),
1852                json!({ "route": "/api/v1/rules" }),
1853                Vec::new(),
1854            )
1855            .await
1856            .expect("set rules should succeed");
1857
1858        let events = state.recent_audit_events();
1859        let event = events.last().expect("audit event should exist");
1860        assert_eq!(event.actor, AuditActor::Http);
1861        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1862        assert_eq!(event.outcome, AuditOutcome::Success);
1863        assert_eq!(event.target, "rule-1");
1864        assert_eq!(event.details["operation"], "rule.upsert");
1865        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1866    }
1867
1868    #[tokio::test]
1869    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1870        let state = CoreState::new(None).await;
1871
1872        state
1873            .upsert_rule_from(
1874                AuditActor::Probe,
1875                "rule.upsert",
1876                "rule-1".to_string(),
1877                json!({ "tool": "set_rule" }),
1878                Rule {
1879                    id: "rule-1".to_string(),
1880                    name: "first".to_string(),
1881                    active: true,
1882                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1883                    priority: 1,
1884                    termination: relay_core_lib::rule::RuleTermination::Continue,
1885                    filter: relay_core_lib::rule::Filter::Url(
1886                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1887                    ),
1888                    actions: vec![],
1889                    constraints: None,
1890                },
1891            )
1892            .await
1893            .expect("initial upsert should succeed");
1894
1895        state
1896            .upsert_rule_from(
1897                AuditActor::Probe,
1898                "rule.upsert",
1899                "rule-1".to_string(),
1900                json!({ "tool": "set_rule" }),
1901                Rule {
1902                    id: "rule-1".to_string(),
1903                    name: "second".to_string(),
1904                    active: true,
1905                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1906                    priority: 2,
1907                    termination: relay_core_lib::rule::RuleTermination::Continue,
1908                    filter: relay_core_lib::rule::Filter::Url(
1909                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1910                    ),
1911                    actions: vec![],
1912                    constraints: None,
1913                },
1914            )
1915            .await
1916            .expect("replacement upsert should succeed");
1917
1918        let rules = state.get_rules().await;
1919        assert_eq!(rules.len(), 1);
1920        assert_eq!(rules[0].name, "second");
1921        let event = state
1922            .recent_audit_events()
1923            .last()
1924            .cloned()
1925            .expect("audit event");
1926        assert_eq!(event.details["operation"], "rule.upsert");
1927        assert_eq!(event.details["details"]["tool"], "set_rule");
1928    }
1929
1930    #[tokio::test]
1931    async fn delete_rule_from_returns_false_when_rule_missing() {
1932        let state = CoreState::new(None).await;
1933
1934        let deleted = state
1935            .delete_rule_from(
1936                AuditActor::Http,
1937                "rule.delete",
1938                "missing".to_string(),
1939                json!({ "route": "/api/v1/rules/{id}" }),
1940                "missing",
1941            )
1942            .await
1943            .expect("delete should not fail");
1944
1945        assert!(!deleted);
1946        assert!(state.recent_audit_events().is_empty());
1947    }
1948
1949    #[tokio::test]
1950    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
1951        let state = CoreState::new(None).await;
1952
1953        let rule_id = state
1954            .create_mock_response_rule_from(
1955                AuditActor::Http,
1956                "api-mock-1".to_string(),
1957                json!({ "route": "/api/v1/mock", "status": 201 }),
1958                MockResponseRuleConfig {
1959                    rule_id: "api-mock-1".to_string(),
1960                    url_pattern: "example.com".to_string(),
1961                    name: "api-mock:example.com".to_string(),
1962                    status: 201,
1963                    content_type: "application/json".to_string(),
1964                    body: "{\"ok\":true}".to_string(),
1965                },
1966            )
1967            .await
1968            .expect("mock rule should be created");
1969
1970        assert_eq!(rule_id, "api-mock-1");
1971        let rules = state.get_rules().await;
1972        assert_eq!(rules.len(), 1);
1973        assert_eq!(rules[0].id, "api-mock-1");
1974        let event = state
1975            .recent_audit_events()
1976            .last()
1977            .cloned()
1978            .expect("audit event");
1979        assert_eq!(event.details["operation"], "rule.mock_create");
1980        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
1981    }
1982
1983    #[tokio::test]
1984    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
1985        let state = CoreState::new(None).await;
1986
1987        let rule_id = state
1988            .create_intercept_rule_from(
1989                AuditActor::Http,
1990                "intercept-1".to_string(),
1991                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
1992                InterceptRuleConfig {
1993                    rule_id: "intercept-1".to_string(),
1994                    active: true,
1995                    url_pattern: "example.com".to_string(),
1996                    method: None,
1997                    phase: "request".to_string(),
1998                    name: "api-intercept:example.com".to_string(),
1999                    priority: 100,
2000                    termination: relay_core_lib::rule::RuleTermination::Stop,
2001                },
2002            )
2003            .await
2004            .expect("intercept rule should be created");
2005
2006        assert_eq!(rule_id, "intercept-1");
2007        let rules = state.get_rules().await;
2008        assert_eq!(rules.len(), 1);
2009        assert_eq!(rules[0].id, "intercept-1");
2010        assert_eq!(rules[0].name, "api-intercept:example.com");
2011        let event = state
2012            .recent_audit_events()
2013            .last()
2014            .cloned()
2015            .expect("audit event");
2016        assert_eq!(event.details["operation"], "rule.intercept_create");
2017        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
2018    }
2019
2020    #[tokio::test]
2021    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
2022        let state = CoreState::new(None).await;
2023
2024        state
2025            .upsert_legacy_intercept_rule_from(
2026                AuditActor::Tauri,
2027                "legacy-1".to_string(),
2028                json!({ "command": "set_intercept_rule" }),
2029                InterceptRule {
2030                    id: "legacy-1".to_string(),
2031                    active: true,
2032                    url_pattern: "example.com".to_string(),
2033                    method: None,
2034                    phase: "both".to_string(),
2035                },
2036            )
2037            .await
2038            .expect("initial family upsert should succeed");
2039        assert_eq!(state.get_rules().await.len(), 2);
2040
2041        state
2042            .upsert_legacy_intercept_rule_from(
2043                AuditActor::Tauri,
2044                "legacy-1".to_string(),
2045                json!({ "command": "set_intercept_rule" }),
2046                InterceptRule {
2047                    id: "legacy-1".to_string(),
2048                    active: true,
2049                    url_pattern: "example.org".to_string(),
2050                    method: Some("POST".to_string()),
2051                    phase: "request".to_string(),
2052                },
2053            )
2054            .await
2055            .expect("replacement family upsert should succeed");
2056
2057        let rules = state.get_rules().await;
2058        assert_eq!(rules.len(), 1);
2059        assert_eq!(rules[0].id, "legacy-1");
2060        let event = state
2061            .recent_audit_events()
2062            .last()
2063            .cloned()
2064            .expect("audit event");
2065        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
2066        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
2067    }
2068
2069    #[tokio::test]
2070    async fn resolve_intercept_failure_records_failed_audit_event() {
2071        let state = CoreState::new(None).await;
2072
2073        let result = state
2074            .resolve_intercept_with_modifications_from(
2075                AuditActor::Probe,
2076                "missing-flow:request".to_string(),
2077                "drop",
2078                None,
2079            )
2080            .await;
2081
2082        assert!(result.is_err());
2083        let events = state.recent_audit_events();
2084        let event = events.last().expect("audit event should exist");
2085        assert_eq!(event.actor, AuditActor::Probe);
2086        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
2087        assert_eq!(event.outcome, AuditOutcome::Failed);
2088        assert_eq!(event.details["action"], "drop");
2089        assert!(
2090            event.details["error"]
2091                .as_str()
2092                .unwrap_or_default()
2093                .contains("Interception not found")
2094        );
2095    }
2096
2097    #[tokio::test]
2098    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
2099        let state = CoreState::new(None).await;
2100        let (shutdown_tx, shutdown_rx) = oneshot::channel();
2101
2102        state
2103            .prepare_start(8080, shutdown_tx)
2104            .expect("prepare start should succeed");
2105        let lifecycle = state.lifecycle();
2106        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
2107        assert_eq!(lifecycle.port, Some(8080));
2108        assert!(lifecycle.started_at_ms.is_none());
2109        assert!(lifecycle.last_error.is_none());
2110
2111        assert_eq!(
2112            state.stop_proxy().expect("stop should succeed"),
2113            ProxyStopResult::Stopping
2114        );
2115        let lifecycle = state.lifecycle();
2116        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
2117        assert_eq!(lifecycle.port, Some(8080));
2118        assert!(shutdown_rx.await.is_ok());
2119    }
2120
2121    #[tokio::test]
2122    async fn status_snapshot_derives_runtime_facing_fields() {
2123        let state = CoreState::new(None).await;
2124        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2125        state
2126            .prepare_start(8080, shutdown_tx)
2127            .expect("prepare start should succeed");
2128
2129        let status = state.status_snapshot();
2130        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
2131        assert!(status.running);
2132        assert_eq!(status.port, Some(8080));
2133        assert!(status.uptime.is_none());
2134        assert!(status.last_error.is_none());
2135    }
2136
2137    #[tokio::test]
2138    async fn status_report_combines_status_and_metrics() {
2139        let state = CoreState::new(None).await;
2140        let report = state.status_report().await;
2141
2142        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
2143        assert!(!report.status.running);
2144        assert_eq!(report.metrics.intercepts_pending, 0);
2145        assert_eq!(report.metrics.ws_pending_messages, 0);
2146        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
2147        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
2148        assert_eq!(report.metrics.audit_events_total, 0);
2149        assert_eq!(report.metrics.audit_events_failed, 0);
2150        assert_eq!(report.metrics.flow_events_lagged_total, 0);
2151        assert_eq!(report.metrics.audit_events_lagged_total, 0);
2152    }
2153
/// `ProxyConfig::new` followed by the `with_transparent` and
/// `with_udp_tproxy_port` setters must keep every supplied value.
#[test]
fn proxy_config_new_and_transport_setters_preserve_values() {
    let cert_path = std::path::PathBuf::from("/tmp/ca_cert.pem");
    let key_path = std::path::PathBuf::from("/tmp/ca_key.pem");

    let base = ProxyConfig::new(8080, cert_path.clone(), key_path.clone());
    let config = base.with_transparent(true).with_udp_tproxy_port(Some(15000));

    assert_eq!(config.port, 8080);
    assert_eq!(config.ca_cert_path, cert_path);
    assert_eq!(config.ca_key_path, key_path);
    assert!(config.transparent);
    assert_eq!(config.udp_tproxy_port, Some(15000));
}
2176
/// `ProxyConfig::from_app_data_dir` must create the data directory and derive
/// the default CA certificate/key paths inside it, with transparent mode off
/// and no UDP TPROXY port.
///
/// The test works in a uniquely named directory under the system temp dir and
/// removes it again, so repeated runs do not accumulate directories.
#[test]
fn proxy_config_from_app_data_dir_creates_default_paths() {
    let unique = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock drift")
        .as_nanos();
    let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));

    let built = ProxyConfig::from_app_data_dir(dir.clone(), 8899);

    // Capture the on-disk observation, then clean up *before* any assertion or
    // `expect` can panic, so a failing run does not leak the directory either.
    // Cleanup is best-effort: a removal error must not mask the real result.
    let dir_was_created = dir.exists();
    let _ = std::fs::remove_dir_all(&dir);

    let config = built.expect("config should build");

    assert!(dir_was_created);
    assert_eq!(config.port, 8899);
    // The remaining checks compare path values only, so they stay valid after removal.
    assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
    assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
    assert!(!config.transparent);
    assert!(config.udp_tproxy_port.is_none());
}
2195
2196    #[tokio::test]
2197    async fn intercept_snapshot_maps_pending_counts() {
2198        let state = CoreState::new(None).await;
2199        let snapshot = state.intercept_snapshot().await;
2200
2201        assert_eq!(snapshot.pending_count, 0);
2202        assert_eq!(snapshot.ws_pending_count, 0);
2203    }
2204
2205    #[tokio::test]
2206    async fn audit_snapshot_returns_latest_events_in_order() {
2207        let state = CoreState::new(None).await;
2208        state.record_audit_event(AuditEvent::new(
2209            AuditActor::Runtime,
2210            AuditEventKind::RuleChanged,
2211            "first",
2212            AuditOutcome::Success,
2213            json!({ "index": 1 }),
2214        ));
2215        state.record_audit_event(AuditEvent::new(
2216            AuditActor::Http,
2217            AuditEventKind::PolicyUpdated,
2218            "second",
2219            AuditOutcome::Success,
2220            json!({ "index": 2 }),
2221        ));
2222
2223        let snapshot = state.audit_snapshot(1);
2224
2225        assert_eq!(snapshot.events.len(), 1);
2226        assert_eq!(snapshot.events[0].target, "second");
2227        assert_eq!(snapshot.events[0].details["index"], 2);
2228    }
2229
2230    #[tokio::test]
2231    async fn query_audit_snapshot_filters_in_memory_events() {
2232        let state = CoreState::new(None).await;
2233        state.record_audit_event(AuditEvent::new(
2234            AuditActor::Http,
2235            AuditEventKind::RuleChanged,
2236            "rule-1",
2237            AuditOutcome::Success,
2238            json!({ "idx": 1 }),
2239        ));
2240        state.record_audit_event(AuditEvent::new(
2241            AuditActor::Probe,
2242            AuditEventKind::PolicyUpdated,
2243            "policy",
2244            AuditOutcome::Failed,
2245            json!({ "idx": 2 }),
2246        ));
2247
2248        let snapshot = state
2249            .query_audit_snapshot(CoreAuditQuery {
2250                actor: Some(AuditActor::Probe),
2251                kind: Some(AuditEventKind::PolicyUpdated),
2252                outcome: Some(AuditOutcome::Failed),
2253                limit: 10,
2254                ..Default::default()
2255            })
2256            .await;
2257
2258        assert_eq!(snapshot.events.len(), 1);
2259        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2260        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2261        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2262    }
2263
2264    #[tokio::test]
2265    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2266        let state = CoreState::new(Some(sqlite_url())).await;
2267        state.update_policy_from(
2268            AuditActor::Http,
2269            "policy".to_string(),
2270            ProxyPolicy {
2271                transparent_enabled: true,
2272                ..Default::default()
2273            },
2274        );
2275
2276        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2277        for _ in 0..10 {
2278            snapshot = state
2279                .query_audit_snapshot(CoreAuditQuery {
2280                    actor: Some(AuditActor::Http),
2281                    kind: Some(AuditEventKind::PolicyUpdated),
2282                    limit: 10,
2283                    ..Default::default()
2284                })
2285                .await;
2286            if !snapshot.events.is_empty() {
2287                break;
2288            }
2289            sleep(Duration::from_millis(20)).await;
2290        }
2291
2292        assert!(!snapshot.events.is_empty());
2293        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2294        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2295    }
2296
2297    #[tokio::test]
2298    async fn prepare_start_rejects_second_active_start() {
2299        let state = CoreState::new(None).await;
2300        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2301        state
2302            .prepare_start(8080, shutdown_tx)
2303            .expect("first start should succeed");
2304
2305        let (second_tx, _second_rx) = oneshot::channel();
2306        let error = state
2307            .prepare_start(8081, second_tx)
2308            .expect_err("second active start should be rejected");
2309        assert!(error.contains("already"));
2310    }
2311
/// Updating the policy must record a successful `PolicyUpdated` audit event
/// whose details reflect the new `transparent_enabled` value.
///
/// Runs as a plain `#[test]`: a runtime is built by hand only to construct the
/// state, and the policy update itself is called outside `block_on`.
#[test]
fn update_policy_records_audit_event() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = rt.block_on(CoreState::new(None));

    let transparent_policy = ProxyPolicy {
        transparent_enabled: true,
        ..Default::default()
    };
    state.update_policy_from(AuditActor::Runtime, "policy".to_string(), transparent_policy);

    let recorded = state.recent_audit_events();
    let latest = recorded.last().expect("audit event should exist");
    assert_eq!(latest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(latest.outcome, AuditOutcome::Success);
    assert_eq!(latest.details["transparent_enabled"], true);
}
2332
/// Patching only the redaction section must switch redaction on while leaving
/// `request_timeout_ms` at its previous value, and must record a
/// `PolicyUpdated` audit event.
#[test]
fn patch_policy_updates_redaction_without_replacing_other_fields() {
    let rt = tokio::runtime::Runtime::new().expect("runtime should build");
    let state = rt.block_on(CoreState::new(None));
    let timeout_before = state.policy_snapshot().request_timeout_ms;

    let redaction_patch = relay_core_api::policy::RedactionPolicyPatch {
        enabled: Some(true),
        redact_bodies: Some(true),
        ..Default::default()
    };
    let patch = relay_core_api::policy::ProxyPolicyPatch {
        redaction: Some(redaction_patch),
    };
    state.patch_policy_from(AuditActor::Runtime, "policy.patch".to_string(), patch);

    // The untouched field survives; the patched fields are applied.
    let patched = state.policy_snapshot();
    assert_eq!(patched.request_timeout_ms, timeout_before);
    assert!(patched.redaction.enabled);
    assert!(patched.redaction.redact_bodies);

    let recorded = state.recent_audit_events();
    let latest = recorded.last().expect("audit event should exist");
    assert_eq!(latest.kind, AuditEventKind::PolicyUpdated);
    assert_eq!(latest.details["redaction_enabled"], true);
}
2361
2362    #[tokio::test]
2363    async fn metrics_include_audit_and_lagged_event_counters() {
2364        let state = CoreState::new(None).await;
2365
2366        state.update_policy_from(
2367            AuditActor::Runtime,
2368            "policy".to_string(),
2369            ProxyPolicy::default(),
2370        );
2371        let _ = state
2372            .resolve_intercept_with_modifications_from(
2373                AuditActor::Probe,
2374                "missing-flow:request".to_string(),
2375                "drop",
2376                None,
2377            )
2378            .await;
2379
2380        state.record_flow_events_lagged(3);
2381        state.record_audit_events_lagged(5);
2382
2383        let metrics = state.get_metrics().await;
2384        assert_eq!(metrics.audit_events_total, 2);
2385        assert_eq!(metrics.audit_events_failed, 1);
2386        assert_eq!(metrics.flow_events_lagged_total, 3);
2387        assert_eq!(metrics.audit_events_lagged_total, 5);
2388    }
2389
2390    #[tokio::test]
2391    async fn prometheus_metrics_text_contains_observability_fields() {
2392        let state = CoreState::new(None).await;
2393        state.record_flow_events_lagged(2);
2394        state.record_audit_events_lagged(4);
2395
2396        let text = state.get_metrics_prometheus_text().await;
2397        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2398        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2399        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2400        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2401    }
2402
2403    #[tokio::test]
2404    async fn search_flows_uses_store_with_offset_pagination() {
2405        let state = CoreState::new(Some(sqlite_url())).await;
2406        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2407        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2408        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2409
2410        state.upsert_flow(Box::new(flow_a));
2411        state.upsert_flow(Box::new(flow_b));
2412        state.upsert_flow(Box::new(flow_c));
2413
2414        let mut baseline = Vec::new();
2415        for _ in 0..20 {
2416            baseline = state
2417                .search_flows(FlowQuery {
2418                    host: Some("api.example.com".to_string()),
2419                    path_contains: None,
2420                    method: None,
2421                    status_min: None,
2422                    status_max: None,
2423                    has_error: None,
2424                    is_websocket: None,
2425                    limit: Some(3),
2426                    offset: Some(0),
2427                })
2428                .await;
2429            if baseline.len() == 3 {
2430                break;
2431            }
2432            sleep(Duration::from_millis(20)).await;
2433        }
2434
2435        assert_eq!(baseline.len(), 3);
2436        let page = state
2437            .search_flows(FlowQuery {
2438                host: Some("api.example.com".to_string()),
2439                path_contains: None,
2440                method: None,
2441                status_min: None,
2442                status_max: None,
2443                has_error: None,
2444                is_websocket: None,
2445                limit: Some(1),
2446                offset: Some(1),
2447            })
2448            .await;
2449        assert_eq!(page.len(), 1);
2450        assert_eq!(page[0].id, baseline[1].id);
2451    }
2452
2453    #[tokio::test]
2454    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2455        let state = CoreState::new(Some(sqlite_url())).await;
2456        let first_flow = sample_http_flow(
2457            "persist.example.com",
2458            "/first",
2459            "GET",
2460            200,
2461            1_700_000_010_000,
2462        );
2463        let first_id = first_flow.id.to_string();
2464        state.upsert_flow(Box::new(first_flow));
2465        for i in 0..240 {
2466            state.upsert_flow(Box::new(sample_http_flow(
2467                "persist.example.com",
2468                &format!("/{}", i),
2469                "GET",
2470                200,
2471                1_700_000_020_000 + i,
2472            )));
2473        }
2474
2475        sleep(Duration::from_millis(200)).await;
2476
2477        let loaded = state.get_flow(first_id).await;
2478        assert!(loaded.is_some());
2479    }
2480
2481    #[tokio::test]
2482    async fn search_flows_redacts_summary_url_when_enabled() {
2483        let state = CoreState::new(None).await;
2484        state.update_policy_from(
2485            AuditActor::Runtime,
2486            "policy.redaction".to_string(),
2487            ProxyPolicy {
2488                redaction: RedactionPolicy {
2489                    enabled: true,
2490                    sensitive_query_keys: vec!["token".to_string()],
2491                    redact_bodies: false,
2492                    ..Default::default()
2493                },
2494                ..Default::default()
2495            },
2496        );
2497        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2498
2499        let mut items = Vec::new();
2500        for _ in 0..20 {
2501            items = state
2502                .search_flows(FlowQuery {
2503                    host: Some("api.example.com".to_string()),
2504                    path_contains: Some("/private".to_string()),
2505                    ..Default::default()
2506                })
2507                .await;
2508            if !items.is_empty() {
2509                break;
2510            }
2511            sleep(Duration::from_millis(20)).await;
2512        }
2513
2514        assert!(!items.is_empty());
2515        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2516        let token = redacted
2517            .query_pairs()
2518            .find(|(k, _)| k == "token")
2519            .map(|(_, v)| v.to_string());
2520        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2521    }
2522
2523    #[tokio::test]
2524    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2525        let state = CoreState::new(Some(sqlite_url())).await;
2526        state.update_policy_from(
2527            AuditActor::Runtime,
2528            "policy.redaction".to_string(),
2529            ProxyPolicy {
2530                redaction: RedactionPolicy {
2531                    enabled: true,
2532                    sensitive_header_names: vec![
2533                        "authorization".to_string(),
2534                        "set-cookie".to_string(),
2535                    ],
2536                    sensitive_query_keys: vec!["token".to_string()],
2537                    redact_bodies: true,
2538                },
2539                ..Default::default()
2540            },
2541        );
2542
2543        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2544        let flow_id = flow.id.to_string();
2545        state.upsert_flow(Box::new(flow));
2546        sleep(Duration::from_millis(80)).await;
2547
2548        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2549        let Layer::Http(http) = loaded.layer else {
2550            panic!("expected http layer");
2551        };
2552
2553        let auth = http
2554            .request
2555            .headers
2556            .iter()
2557            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2558            .map(|(_, v)| v.as_str());
2559        assert_eq!(auth, Some("[REDACTED]"));
2560
2561        let req_query_token = http
2562            .request
2563            .query
2564            .iter()
2565            .find(|(k, _)| k == "token")
2566            .map(|(_, v)| v.as_str());
2567        assert_eq!(req_query_token, Some("[REDACTED]"));
2568
2569        let req_body = http.request.body.as_ref().map(|b| b.content.as_str());
2570        assert_eq!(req_body, Some("[REDACTED]"));
2571
2572        let response = http.response.expect("response should exist");
2573        let set_cookie = response
2574            .headers
2575            .iter()
2576            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2577            .map(|(_, v)| v.as_str());
2578        assert_eq!(set_cookie, Some("[REDACTED]"));
2579        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2580        assert_eq!(res_body, Some("[REDACTED]"));
2581    }
2582
2583    #[test]
2584    fn redact_flow_update_masks_http_body_when_enabled() {
2585        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2586        let state = runtime.block_on(CoreState::new(None));
2587        state.update_policy_from(
2588            AuditActor::Runtime,
2589            "policy.redaction".to_string(),
2590            ProxyPolicy {
2591                redaction: RedactionPolicy {
2592                    enabled: true,
2593                    redact_bodies: true,
2594                    ..Default::default()
2595                },
2596                ..Default::default()
2597            },
2598        );
2599
2600        let update = FlowUpdate::HttpBody {
2601            flow_id: "f-1".to_string(),
2602            direction: relay_core_api::flow::Direction::ClientToServer,
2603            body: BodyData {
2604                encoding: "utf-8".to_string(),
2605                content: "super-secret".to_string(),
2606                size: 12,
2607            },
2608        };
2609        let redacted = state.redact_flow_update_for_output(update);
2610        match redacted {
2611            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2612            _ => panic!("expected http body update"),
2613        }
2614    }
2615
2616    #[cfg(feature = "script")]
2617    #[tokio::test]
2618    async fn load_script_from_records_audit_event() {
2619        let state = CoreState::new(None).await;
2620
2621        state
2622            .load_script_from(
2623                AuditActor::Tauri,
2624                "tauri.load_script".to_string(),
2625                "globalThis.onRequestHeaders = (_flow) => {};",
2626            )
2627            .await
2628            .expect("script should load");
2629
2630        let events = state.recent_audit_events();
2631        let event = events.last().expect("audit event should exist");
2632        assert_eq!(event.actor, AuditActor::Tauri);
2633        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2634        assert_eq!(event.outcome, AuditOutcome::Success);
2635        assert_eq!(event.target, "tauri.load_script");
2636    }
2637}