Skip to main content

relay_core_runtime/
lib.rs

1//! The main Rust API for embedding RelayCore.
2//!
3//! Most users should start here. This crate owns the runtime state, proxy lifecycle,
4//! intercept rules, policy management, and event streams.
5//!
6//! > **Note:** The `relay-core` crate name was unavailable on crates.io.
7//! > `relay-core-runtime` is the official main package for RelayCore.
8//!
9//! ```toml
10//! [dependencies]
11//! relay-core-runtime = "0.1"
12//! relay-core-http = "0.1"   # optional REST/SSE adapter
13//! ```
14//!
15//! Common types are re-exported for convenience:
16//! ```rust
17//! use relay_core_runtime::CoreState;
18//! use relay_core_runtime::flow::Flow;
19//! use relay_core_runtime::policy::ProxyPolicy;
20//! use relay_core_runtime::audit::AuditActor;
21//! ```
22
23use std::sync::Arc;
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::collections::{BTreeSet, HashSet, VecDeque};
26use std::sync::Mutex;
27use std::time::{SystemTime, UNIX_EPOCH};
28use tokio::sync::{broadcast, mpsc, oneshot, watch};
29use relay_core_api::flow::{Flow, WebSocketMessage, FlowUpdate, BodyData, Direction, Layer};
30#[cfg(feature = "script")]
31use relay_core_script::ScriptInterceptor;
32use relay_core_lib::interceptor::{Interceptor, CompositeInterceptor};
33use relay_core_lib::tls::CertificateAuthority;
34use relay_core_lib::capture::{TcpCaptureSource, TransparentTcpCaptureSource, OriginalDstProvider};
35use relay_core_lib::capture::udp::UdpProxy;
36#[cfg(all(target_os = "linux", feature = "transparent-linux"))]
37use relay_core_lib::capture::LinuxOriginalDstProvider;
38#[cfg(all(target_os = "macos", feature = "transparent-macos"))]
39use relay_core_lib::capture::MacOsOriginalDstProvider;
40#[cfg(target_os = "windows")]
41use relay_core_lib::capture::WindowsOriginalDstProvider;
42use tokio::net::TcpListener;
43use std::net::SocketAddr;
44use relay_core_api::policy::{ProxyPolicy, ProxyPolicyPatch, RedactionPolicy};
45
46use tracing::error;
47
48use relay_core_lib::rule::Rule;
49use relay_core_lib::rule::engine::RuleEngine;
50use crate::rule::{
51    InterceptRule, InterceptRuleConfig, MockResponseRuleConfig, build_intercept_rules,
52    build_mock_response_rule,
53};
54use relay_core_storage::store::{AuditEventRecord, Store};
55use relay_core_api::modification::{FlowQuery, FlowSummary};
56use serde_json::json;
57use crate::audit::{AuditActor, AuditEvent, AuditEventKind, AuditOutcome};
58
59pub mod audit;
60pub mod rule;
61pub mod actors;
62pub mod interceptors;
63pub mod modification;
64pub mod services;
65
66// ── Re-exports for user convenience ──
67// Users can do `use relay_core_runtime::flow::Flow;` without knowing relay_core_api exists.
68pub use relay_core_api::{flow, policy};
69pub use relay_core_lib::InterceptionResult;
70pub use relay_core_lib::rule as lib_rule;
71
72use actors::flow_store::{FlowStoreActor, FlowStoreMessage};
73use actors::intercept_broker::{InterceptBrokerActor, InterceptBrokerMessage};
74use actors::rule_store::{RuleStoreActor, RuleStoreMessage};
75
76pub mod rule_engine {
77    pub use relay_core_lib::rule_engine::*;
78}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
81pub struct CoreMetrics {
82    pub flows_total: usize,
83    pub flows_in_memory: usize,
84    pub flows_dropped: usize,
85    pub intercepts_pending: usize,
86    pub ws_pending_messages: usize,
87    pub oldest_intercept_age_ms: Option<u64>,
88    pub oldest_ws_message_age_ms: Option<u64>,
89    pub rule_exec_errors: usize,
90    pub audit_events_total: usize,
91    pub audit_events_failed: usize,
92    pub flow_events_lagged_total: usize,
93    pub audit_events_lagged_total: usize,
94}
95
96impl CoreMetrics {
97    pub fn to_prometheus_text(&self) -> String {
98        let oldest_intercept_age_ms = self.oldest_intercept_age_ms.unwrap_or(0);
99        let oldest_ws_message_age_ms = self.oldest_ws_message_age_ms.unwrap_or(0);
100        format!(
101            "relay_core_flows_total {}\n\
102relay_core_flows_in_memory {}\n\
103relay_core_flows_dropped_total {}\n\
104relay_core_intercepts_pending {}\n\
105relay_core_ws_pending_messages {}\n\
106relay_core_oldest_intercept_age_ms {}\n\
107relay_core_oldest_ws_message_age_ms {}\n\
108relay_core_rule_exec_errors_total {}\n\
109relay_core_audit_events_total {}\n\
110relay_core_audit_events_failed_total {}\n\
111relay_core_flow_events_lagged_total {}\n\
112relay_core_audit_events_lagged_total {}\n",
113            self.flows_total,
114            self.flows_in_memory,
115            self.flows_dropped,
116            self.intercepts_pending,
117            self.ws_pending_messages,
118            oldest_intercept_age_ms,
119            oldest_ws_message_age_ms,
120            self.rule_exec_errors,
121            self.audit_events_total,
122            self.audit_events_failed,
123            self.flow_events_lagged_total,
124            self.audit_events_lagged_total,
125        )
126    }
127}
128
129#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
130pub struct CoreStatusSnapshot {
131    pub phase: RuntimeLifecyclePhase,
132    pub running: bool,
133    pub port: Option<u16>,
134    pub uptime: Option<u64>,
135    pub last_error: Option<String>,
136}
137
138#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
139pub struct CoreStatusReport {
140    pub status: CoreStatusSnapshot,
141    pub metrics: CoreMetrics,
142}
143
144#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
145pub struct CoreInterceptSnapshot {
146    pub pending_count: usize,
147    pub ws_pending_count: usize,
148}
149
150#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
151pub struct CoreAuditSnapshot {
152    pub events: Vec<AuditEvent>,
153}
154
155#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
156pub struct CoreAuditQuery {
157    pub since_ms: Option<u64>,
158    pub until_ms: Option<u64>,
159    pub actor: Option<AuditActor>,
160    pub kind: Option<AuditEventKind>,
161    pub outcome: Option<AuditOutcome>,
162    pub limit: usize,
163}
164
165impl Default for CoreAuditQuery {
166    fn default() -> Self {
167        Self {
168            since_ms: None,
169            until_ms: None,
170            actor: None,
171            kind: None,
172            outcome: None,
173            limit: 50,
174        }
175    }
176}
177
178#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
179#[serde(rename_all = "snake_case")]
180pub enum RuntimeLifecyclePhase {
181    Created,
182    Starting,
183    Running,
184    Stopping,
185    Stopped,
186    Failed,
187}
188
189impl RuntimeLifecyclePhase {
190    pub fn as_str(&self) -> &'static str {
191        match self {
192            Self::Created => "created",
193            Self::Starting => "starting",
194            Self::Running => "running",
195            Self::Stopping => "stopping",
196            Self::Stopped => "stopped",
197            Self::Failed => "failed",
198        }
199    }
200
201    pub fn is_active(&self) -> bool {
202        matches!(self, Self::Starting | Self::Running | Self::Stopping)
203    }
204}
205
206#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
207pub struct RuntimeLifecycle {
208    pub phase: RuntimeLifecyclePhase,
209    pub port: Option<u16>,
210    pub started_at_ms: Option<u64>,
211    pub last_error: Option<String>,
212}
213
214impl RuntimeLifecycle {
215    pub fn created() -> Self {
216        Self {
217            phase: RuntimeLifecyclePhase::Created,
218            port: None,
219            started_at_ms: None,
220            last_error: None,
221        }
222    }
223
224    pub fn is_active(&self) -> bool {
225        self.phase.is_active()
226    }
227
228    pub fn uptime_seconds(&self) -> Option<u64> {
229        let started_at_ms = self.started_at_ms?;
230        let now_ms = now_unix_ms();
231        Some(now_ms.saturating_sub(started_at_ms) / 1_000)
232    }
233}
234
235pub enum ProxySpawnResult {
236    Started(tokio::task::JoinHandle<()>),
237    AlreadyRunning,
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq)]
241pub enum ProxyStopResult {
242    Stopping,
243    NotRunning,
244}
245
246impl From<RuntimeLifecycle> for CoreStatusSnapshot {
247    fn from(lifecycle: RuntimeLifecycle) -> Self {
248        Self {
249            running: lifecycle.is_active(),
250            uptime: lifecycle.uptime_seconds(),
251            port: lifecycle.port,
252            last_error: lifecycle.last_error,
253            phase: lifecycle.phase,
254        }
255    }
256}
257
258pub struct CoreState {
259    flow_store: mpsc::Sender<FlowStoreMessage>,
260    intercept_broker: mpsc::Sender<InterceptBrokerMessage>,
261    rule_store: mpsc::Sender<RuleStoreMessage>,
262    store: Option<Store>,
263    #[cfg(feature = "script")]
264    pub script_interceptor: Arc<ScriptInterceptor>,
265    pub policy_tx: watch::Sender<ProxyPolicy>,
266    pub flows_dropped: Arc<AtomicUsize>,
267    audit_events_total: Arc<AtomicUsize>,
268    audit_events_failed: Arc<AtomicUsize>,
269    flow_events_lagged_total: Arc<AtomicUsize>,
270    audit_events_lagged_total: Arc<AtomicUsize>,
271    /// 内部广播 channel:Tauri、Probe 等多个消费者均可订阅
272    flow_broadcast_tx: broadcast::Sender<FlowUpdate>,
273    audit_broadcast_tx: broadcast::Sender<AuditEvent>,
274    audit_history: Arc<Mutex<VecDeque<AuditEvent>>>,
275    lifecycle_tx: watch::Sender<RuntimeLifecycle>,
276    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
277}
278
279impl CoreState {
280    pub async fn new(db_url: Option<String>) -> Self {
281        const AUDIT_HISTORY_LIMIT: usize = 200;
282
283        let store = if let Some(url) = db_url {
284            match Store::connect(&url).await {
285                Ok(s) => {
286                    if let Err(e) = s.init().await {
287                        tracing::error!("Failed to init store: {}", e);
288                    }
289                    Some(s)
290                },
291                Err(e) => {
292                    tracing::error!("Failed to connect to store: {}", e);
293                    None
294                }
295            }
296        } else {
297            None
298        };
299
300        let (flow_tx, flow_rx) = mpsc::channel(10000);
301        let flow_actor = FlowStoreActor::new(flow_rx, store.clone());
302        tokio::spawn(flow_actor.run());
303
304        let (intercept_tx, intercept_rx) = mpsc::channel(1000);
305        let intercept_actor = InterceptBrokerActor::new(intercept_rx);
306        tokio::spawn(intercept_actor.run());
307
308        let (rule_tx, rule_rx) = mpsc::channel(100);
309        let rule_actor = RuleStoreActor::new(rule_rx, store.clone());
310        tokio::spawn(rule_actor.run());
311        
312        let (policy_tx, _) = watch::channel(ProxyPolicy::default());
313        let (flow_broadcast_tx, _) = broadcast::channel(1000);
314        let (audit_broadcast_tx, _) = broadcast::channel(256);
315        let (lifecycle_tx, _) = watch::channel(RuntimeLifecycle::created());
316
317        #[cfg(feature = "script")]
318        let script_interceptor = ScriptInterceptor::new().await.expect("Failed to initialize ScriptInterceptor");
319        Self {
320            flow_store: flow_tx,
321            intercept_broker: intercept_tx,
322            rule_store: rule_tx,
323            store,
324            #[cfg(feature = "script")]
325            script_interceptor: Arc::new(script_interceptor),
326            policy_tx,
327            flows_dropped: Arc::new(AtomicUsize::new(0)),
328            audit_events_total: Arc::new(AtomicUsize::new(0)),
329            audit_events_failed: Arc::new(AtomicUsize::new(0)),
330            flow_events_lagged_total: Arc::new(AtomicUsize::new(0)),
331            audit_events_lagged_total: Arc::new(AtomicUsize::new(0)),
332            flow_broadcast_tx,
333            audit_broadcast_tx,
334            audit_history: Arc::new(Mutex::new(VecDeque::with_capacity(AUDIT_HISTORY_LIMIT))),
335            lifecycle_tx,
336            shutdown_tx: Mutex::new(None),
337        }
338    }
339    
340    pub async fn get_metrics(&self) -> CoreMetrics {
341        let (flow_tx, flow_rx) = oneshot::channel();
342        let _ = self.flow_store.send(FlowStoreMessage::GetMetrics(flow_tx)).await;
343        let (flows_total, flows_in_memory) = flow_rx.await.unwrap_or((0, 0));
344
345        let (int_tx, int_rx) = oneshot::channel();
346        let _ = self.intercept_broker.send(InterceptBrokerMessage::GetMetrics { respond_to: int_tx }).await;
347        let (intercepts_pending, ws_pending_messages, oldest_intercept_age_ms, oldest_ws_message_age_ms) =
348            int_rx.await.unwrap_or((0, 0, None, None));
349
350        let (rule_tx, rule_rx) = oneshot::channel();
351        let _ = self.rule_store.send(RuleStoreMessage::GetMetrics(rule_tx)).await;
352        let rule_exec_errors = rule_rx.await.unwrap_or(0);
353
354        CoreMetrics {
355            flows_total,
356            flows_in_memory,
357            flows_dropped: self.flows_dropped.load(Ordering::Relaxed),
358            intercepts_pending,
359            ws_pending_messages,
360            oldest_intercept_age_ms,
361            oldest_ws_message_age_ms,
362            rule_exec_errors,
363            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
364            audit_events_failed: self.audit_events_failed.load(Ordering::Relaxed),
365            flow_events_lagged_total: self.flow_events_lagged_total.load(Ordering::Relaxed),
366            audit_events_lagged_total: self.audit_events_lagged_total.load(Ordering::Relaxed),
367        }
368    }
369
370    pub async fn get_metrics_prometheus_text(&self) -> String {
371        self.get_metrics().await.to_prometheus_text()
372    }
373
374    pub fn status_snapshot(&self) -> CoreStatusSnapshot {
375        self.lifecycle().into()
376    }
377
378    pub async fn status_report(&self) -> CoreStatusReport {
379        CoreStatusReport {
380            status: self.status_snapshot(),
381            metrics: self.get_metrics().await,
382        }
383    }
384
385    pub async fn intercept_snapshot(&self) -> CoreInterceptSnapshot {
386        let metrics = self.get_metrics().await;
387        CoreInterceptSnapshot {
388            pending_count: metrics.intercepts_pending,
389            ws_pending_count: metrics.ws_pending_messages,
390        }
391    }
392
393    pub fn audit_snapshot(&self, limit: usize) -> CoreAuditSnapshot {
394        let events = self.recent_audit_events();
395        let start = events.len().saturating_sub(limit);
396        CoreAuditSnapshot {
397            events: events.into_iter().skip(start).collect(),
398        }
399    }
400
401    pub async fn query_audit_snapshot(&self, query: CoreAuditQuery) -> CoreAuditSnapshot {
402        let limit = query.limit.clamp(1, 500);
403        if let Some(store) = &self.store {
404            let rows = store
405                .query_audit_events(
406                    query.since_ms,
407                    query.until_ms,
408                    query.actor.as_ref().map(AuditActor::as_str),
409                    query.kind.as_ref().map(AuditEventKind::as_str),
410                    query.outcome.as_ref().map(AuditOutcome::as_str),
411                    limit,
412                )
413                .await
414                .unwrap_or_default();
415
416            let mut events = Vec::with_capacity(rows.len());
417            for row in rows {
418                if let Ok(event) = serde_json::from_value::<AuditEvent>(row) {
419                    events.push(event);
420                }
421            }
422            return CoreAuditSnapshot { events };
423        }
424
425        let mut events = self.recent_audit_events();
426        events.reverse();
427        let filtered = events
428            .into_iter()
429            .filter(|event| {
430                query
431                    .since_ms
432                    .map(|v| event.timestamp_ms >= v)
433                    .unwrap_or(true)
434            })
435            .filter(|event| {
436                query
437                    .until_ms
438                    .map(|v| event.timestamp_ms <= v)
439                    .unwrap_or(true)
440            })
441            .filter(|event| {
442                query
443                    .actor
444                    .as_ref()
445                    .map(|v| &event.actor == v)
446                    .unwrap_or(true)
447            })
448            .filter(|event| query.kind.as_ref().map(|v| &event.kind == v).unwrap_or(true))
449            .filter(|event| {
450                query
451                    .outcome
452                    .as_ref()
453                    .map(|v| &event.outcome == v)
454                    .unwrap_or(true)
455            })
456            .take(limit)
457            .collect();
458        CoreAuditSnapshot { events: filtered }
459    }
460
461    pub async fn get_flow(&self, id: String) -> Option<Flow> {
462        let (tx, rx) = oneshot::channel();
463        if let Err(e) = self.flow_store.send(FlowStoreMessage::GetFlow { id: id.clone(), respond_to: tx }).await {
464            error!("Failed to send GetFlow request: {}", e);
465            if let Some(store) = &self.store {
466                return store
467                    .load_flow(&id)
468                    .await
469                    .ok()
470                    .flatten()
471                    .and_then(|value| serde_json::from_value::<Flow>(value).ok());
472            }
473            return None;
474        }
475        let flow = rx.await.map_err(|e| {
476            error!("Failed to receive Flow response: {}", e);
477            e
478        }).unwrap_or(None);
479        if let Some(flow) = flow {
480            return Some(redact_flow(flow, &self.current_redaction_policy()));
481        }
482        if let Some(store) = &self.store {
483            return store
484                .load_flow(&id)
485                .await
486                .ok()
487                .flatten()
488                .and_then(|value| serde_json::from_value::<Flow>(value).ok())
489                .map(|flow| redact_flow(flow, &self.current_redaction_policy()));
490        }
491        None
492    }
493
494    pub async fn get_rules(&self) -> Vec<Rule> {
495        let (tx, rx) = oneshot::channel();
496        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRules(tx)).await {
497            error!("Failed to send GetRules request: {}", e);
498            return Vec::new();
499        }
500        rx.await.map_err(|e| {
501            error!("Failed to receive Rules response: {}", e);
502            e
503        }).unwrap_or_default()
504    }
505
506    pub async fn set_rules(&self, rules: Vec<Rule>) {
507        let _ = self
508            .set_rules_from(
509                AuditActor::Runtime,
510                "rules.replace",
511                "rule_set".to_string(),
512                json!({}),
513                rules,
514            )
515            .await;
516    }
517
518    pub async fn upsert_rule_from(
519        &self,
520        actor: AuditActor,
521        operation: &str,
522        target: String,
523        details: serde_json::Value,
524        rule: Rule,
525    ) -> Result<(), String> {
526        let rule_id = rule.id.clone();
527        let mut rules = self.get_rules().await;
528        rules.retain(|existing| existing.id != rule_id);
529        rules.push(rule);
530        self.set_rules_from(actor, operation, target, details, rules).await
531    }
532
533    pub async fn delete_rule_from(
534        &self,
535        actor: AuditActor,
536        operation: &str,
537        target: String,
538        details: serde_json::Value,
539        rule_id: &str,
540    ) -> Result<bool, String> {
541        let mut rules = self.get_rules().await;
542        let before = rules.len();
543        rules.retain(|rule| rule.id != rule_id);
544        if rules.len() == before {
545            return Ok(false);
546        }
547        self.set_rules_from(actor, operation, target, details, rules)
548            .await
549            .map(|_| true)
550    }
551
552    pub async fn create_mock_response_rule_from(
553        &self,
554        actor: AuditActor,
555        target: String,
556        details: serde_json::Value,
557        config: MockResponseRuleConfig,
558    ) -> Result<String, String> {
559        let rule_id = config.rule_id.clone();
560        let rule = build_mock_response_rule(config);
561        self.upsert_rule_from(actor, "rule.mock_create", target, details, rule)
562            .await
563            .map(|_| rule_id)
564    }
565
566    pub async fn create_intercept_rule_from(
567        &self,
568        actor: AuditActor,
569        target: String,
570        details: serde_json::Value,
571        config: InterceptRuleConfig,
572    ) -> Result<String, String> {
573        let rule_id = config.rule_id.clone();
574        let mut rules = self.get_rules().await;
575        rules.extend(build_intercept_rules(config));
576        self.set_rules_from(actor, "rule.intercept_create", target, details, rules)
577            .await
578            .map(|_| rule_id)
579    }
580
581    pub async fn upsert_legacy_intercept_rule_from(
582        &self,
583        actor: AuditActor,
584        target: String,
585        details: serde_json::Value,
586        rule: InterceptRule,
587    ) -> Result<(), String> {
588        let rule_id = rule.id.clone();
589        let mut rules = self.get_rules().await;
590        rules.retain(|existing| existing.id != rule_id && !existing.id.starts_with(&format!("{}-", rule_id)));
591        rules.extend(rule.to_rules());
592        self.set_rules_from(actor, "rule.intercept_legacy_upsert", target, details, rules)
593            .await
594    }
595
596    pub async fn set_rules_from(
597        &self,
598        actor: AuditActor,
599        operation: &str,
600        target: String,
601        details: serde_json::Value,
602        rules: Vec<Rule>,
603    ) -> Result<(), String> {
604        let rule_count = rules.len();
605        if let Err(e) = self.rule_store.send(RuleStoreMessage::SetRules(rules)).await {
606            error!("Failed to send SetRules request: {}", e);
607            self.record_audit_event(AuditEvent::new(
608                actor,
609                AuditEventKind::RuleChanged,
610                target,
611                AuditOutcome::Failed,
612                json!({
613                    "operation": operation,
614                    "rule_count": rule_count,
615                    "details": details,
616                    "error": e.to_string()
617                }),
618            ));
619            return Err(e.to_string());
620        }
621
622        self.record_audit_event(AuditEvent::new(
623            actor,
624            AuditEventKind::RuleChanged,
625            target,
626            AuditOutcome::Success,
627            json!({
628                "operation": operation,
629                "rule_count": rule_count,
630                "details": details
631            }),
632        ));
633        Ok(())
634    }
635
636    pub async fn set_legacy_rules(&self, rules: Vec<InterceptRule>) {
637        let mut new_rules = Vec::new();
638        for rule in rules {
639            new_rules.extend(rule.to_rules());
640        }
641        self.set_rules(new_rules).await;
642    }
643
644    pub async fn get_rule_engine(&self) -> Arc<RuleEngine> {
645        let (tx, rx) = oneshot::channel();
646        if let Err(e) = self.rule_store.send(RuleStoreMessage::GetRuleEngine(tx)).await {
647            error!("Failed to send GetRuleEngine request: {}", e);
648            return Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None));
649        }
650        rx.await.map_err(|e| {
651            error!("Failed to receive RuleEngine response: {}", e);
652            e
653        }).unwrap_or_else(|_| Arc::new(RuleEngine::new(Vec::new(), Vec::new(), None, None)))
654    }
655
656    pub fn update_policy(&self, policy: ProxyPolicy) {
657        self.update_policy_from(AuditActor::Runtime, "policy".to_string(), policy);
658    }
659
660    pub fn patch_policy_from(&self, actor: AuditActor, target: String, patch: ProxyPolicyPatch) {
661        let mut policy = self.policy_snapshot();
662        policy.apply_patch(patch);
663        self.update_policy_from(actor, target, policy);
664    }
665
666    pub fn policy_snapshot(&self) -> ProxyPolicy {
667        self.policy_tx.borrow().clone()
668    }
669
670    pub fn update_policy_from(&self, actor: AuditActor, target: String, policy: ProxyPolicy) {
671        let details = json!({
672            "strict_http_semantics": policy.strict_http_semantics,
673            "request_timeout_ms": policy.request_timeout_ms,
674            "max_body_size": policy.max_body_size,
675            "transparent_enabled": policy.transparent_enabled,
676            "redaction_enabled": policy.redaction.enabled,
677            "redact_bodies": policy.redaction.redact_bodies
678        });
679
680        self.policy_tx.send_replace(policy);
681        self.record_audit_event(AuditEvent::new(
682            actor,
683            AuditEventKind::PolicyUpdated,
684            target,
685            AuditOutcome::Success,
686            details,
687        ));
688    }
689
690    pub async fn register_intercept(&self, key: String, tx: oneshot::Sender<InterceptionResult>) {
691        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::RegisterIntercept { key, tx }).await {
692             error!("Failed to send RegisterIntercept request: {}", e);
693        }
694    }
695
696    pub async fn resolve_intercept(&self, key: String, result: InterceptionResult) -> Result<(), String> {
697        let (tx, rx) = oneshot::channel();
698        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::ResolveIntercept { key, result, respond_to: tx }).await {
699            error!("Failed to send ResolveIntercept request: {}", e);
700            return Err(e.to_string());
701        }
702        rx.await.map_err(|_| "Actor dropped".to_string())?
703    }
704
705    pub async fn get_pending_ws_message(&self, key: String) -> Option<WebSocketMessage> {
706        let (tx, rx) = oneshot::channel();
707        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingWebSocketMessage { key, respond_to: tx }).await {
708             error!("Failed to send GetPendingWebSocketMessage request: {}", e);
709             return None;
710        }
711        rx.await.map_err(|e| {
712             error!("Failed to receive WebSocketMessage response: {}", e);
713             e
714        }).unwrap_or(None)
715    }
716
717    pub async fn set_pending_ws_message(&self, key: String, message: WebSocketMessage) {
718        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::SetPendingWebSocketMessage { key, message }).await {
719             error!("Failed to send SetPendingWebSocketMessage request: {}", e);
720        }
721    }
722
723    pub async fn is_intercept_pending(&self, key: String) -> bool {
724        let (tx, rx) = oneshot::channel();
725        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingIntercept { key, respond_to: tx }).await {
726             error!("Failed to send GetPendingIntercept request: {}", e);
727             return false;
728        }
729        rx.await.map_err(|e| {
730             error!("Failed to receive PendingIntercept response: {}", e);
731             e
732        }).unwrap_or(false)
733    }
734
735    pub async fn is_flow_intercepted(&self, flow_id: String) -> bool {
736        let (tx, rx) = oneshot::channel();
737        if let Err(e) = self.intercept_broker.send(InterceptBrokerMessage::GetPendingInterceptByFlowId { flow_id, respond_to: tx }).await {
738            error!("Failed to send GetPendingInterceptByFlowId request: {}", e);
739            return false;
740        }
741        rx.await.map_err(|e| {
742            error!("Failed to receive PendingInterceptByFlowId response: {}", e);
743            e
744        }).unwrap_or(false)
745    }
746
747    pub fn upsert_flow(&self, flow: Box<Flow>) {
748        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpsertFlow(flow)) {
749            // Drop flow if channel is full
750            error!("FlowStore dropped flow: {}", e);
751            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
752        }
753    }
754
755    pub fn append_ws_message(&self, flow_id: String, message: WebSocketMessage) {
756        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::AppendWebSocketMessage { flow_id, message }) {
757            error!("FlowStore dropped WS message: {}", e);
758            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
759        }
760    }
761
762    pub fn update_http_body(&self, flow_id: String, body: BodyData, direction: Direction) {
763        if let Err(e) = self.flow_store.try_send(FlowStoreMessage::UpdateHttpBody { flow_id, body, direction }) {
764            error!("FlowStore dropped HTTP body: {}", e);
765            self.flows_dropped.fetch_add(1, Ordering::Relaxed);
766        }
767    }
768
769    /// 订阅 FlowUpdate 广播。调用者获得独立 Receiver,lag 时自动跳过过期消息。
770    pub fn subscribe_flow_updates(&self) -> broadcast::Receiver<FlowUpdate> {
771        self.flow_broadcast_tx.subscribe()
772    }
773
774    pub fn subscribe_audit_events(&self) -> broadcast::Receiver<AuditEvent> {
775        self.audit_broadcast_tx.subscribe()
776    }
777
778    fn current_redaction_policy(&self) -> RedactionPolicy {
779        self.policy_tx.borrow().redaction.clone()
780    }
781
782    pub fn record_flow_events_lagged(&self, skipped: u64) {
783        self.flow_events_lagged_total
784            .fetch_add(skipped as usize, Ordering::Relaxed);
785    }
786
787    pub fn record_audit_events_lagged(&self, skipped: u64) {
788        self.audit_events_lagged_total
789            .fetch_add(skipped as usize, Ordering::Relaxed);
790    }
791
792    pub fn lifecycle(&self) -> RuntimeLifecycle {
793        self.lifecycle_tx.borrow().clone()
794    }
795
796    pub fn subscribe_lifecycle(&self) -> watch::Receiver<RuntimeLifecycle> {
797        self.lifecycle_tx.subscribe()
798    }
799
800    fn prepare_start(&self, port: u16, shutdown_tx: oneshot::Sender<()>) -> Result<(), String> {
801        let current = self.lifecycle();
802        if current.is_active() {
803            return Err(format!(
804                "Proxy is already {} on port {}",
805                current.phase.as_str(),
806                current.port.unwrap_or(port)
807            ));
808        }
809
810        let mut guard = self.shutdown_tx.lock().map_err(|_| "shutdown state poisoned".to_string())?;
811        *guard = Some(shutdown_tx);
812        drop(guard);
813
814        self.update_lifecycle(RuntimeLifecycle {
815            phase: RuntimeLifecyclePhase::Starting,
816            port: Some(port),
817            started_at_ms: None,
818            last_error: None,
819        });
820        Ok(())
821    }
822
823    pub fn stop_proxy(&self) -> Result<ProxyStopResult, String> {
824        let mut guard = self.shutdown_tx.lock().map_err(|_| "shutdown state poisoned".to_string())?;
825        let Some(tx) = guard.take() else {
826            return Ok(ProxyStopResult::NotRunning);
827        };
828        drop(guard);
829
830        let current = self.lifecycle();
831        self.update_lifecycle(RuntimeLifecycle {
832            phase: RuntimeLifecyclePhase::Stopping,
833            port: current.port,
834            started_at_ms: current.started_at_ms,
835            last_error: current.last_error,
836        });
837        let _ = tx.send(());
838        Ok(ProxyStopResult::Stopping)
839    }
840
841    pub fn recent_audit_events(&self) -> Vec<AuditEvent> {
842        self.audit_history
843            .lock()
844            .map(|events| events.iter().cloned().collect())
845            .unwrap_or_default()
846    }
847
848    /// 搜索内存中的 Flow 列表,返回轻量摘要。
849    pub async fn search_flows(&self, query: FlowQuery) -> Vec<FlowSummary> {
850        let redaction = self.current_redaction_policy();
851        if let Some(store) = &self.store {
852            return store
853                .query_flow_summaries(&query)
854                .await
855                .unwrap_or_default()
856                .into_iter()
857                .map(|summary| redact_flow_summary(summary, &redaction))
858                .collect();
859        }
860        let (tx, rx) = oneshot::channel();
861        if let Err(e) = self.flow_store.send(FlowStoreMessage::SearchFlows { query, respond_to: tx }).await {
862            error!("Failed to send SearchFlows request: {}", e);
863            return Vec::new();
864        }
865        rx.await
866            .unwrap_or_default()
867            .into_iter()
868            .map(|summary| redact_flow_summary(summary, &redaction))
869            .collect()
870    }
871
872    pub fn redact_flow_update_for_output(&self, update: FlowUpdate) -> FlowUpdate {
873        let redaction = self.current_redaction_policy();
874        redact_flow_update(update, &redaction)
875    }
876
877    /// 解除截获并可选地应用修改。
878    ///
879    /// `action` 为 `"drop"` 时直接丢弃;其他值(含 `"continue"`)则:
880    /// - 若 `mods` 为 None,直接 Continue;
881    /// - 若 `mods` 存在,根据 `key` 格式决定修改请求/响应还是 WebSocket 消息。
882    ///
883    /// `key` 格式约定:
884    /// - `"<flow_id>:<phase>"` — 修改请求或响应(phase 以 "request"/"response" 开头)
885    /// - `"<flow_id>:ws_msg:<msg_id>"` — 修改 WebSocket 消息
886    pub async fn resolve_intercept_with_modifications(
887        &self,
888        key: String,
889        action: &str,
890        mods: Option<relay_core_api::modification::FlowModification>,
891    ) -> Result<(), String> {
892        self.resolve_intercept_with_modifications_from(AuditActor::Runtime, key, action, mods)
893            .await
894    }
895
896    pub async fn resolve_intercept_with_modifications_from(
897        &self,
898        actor: AuditActor,
899        key: String,
900        action: &str,
901        mods: Option<relay_core_api::modification::FlowModification>,
902    ) -> Result<(), String> {
903        let modified_fields = modification_field_names(mods.as_ref());
904        let result = match action {
905            "drop" => InterceptionResult::Drop,
906            _ => match mods {
907                None => InterceptionResult::Continue,
908                Some(m) => {
909                    let parts: Vec<&str> = key.splitn(4, ':').collect();
910                    match parts.as_slice() {
911                        [flow_id, phase] => {
912                            if let Some(flow) = self.get_flow(flow_id.to_string()).await {
913                                modification::apply_flow_modification(&flow, phase, m)
914                            } else {
915                                InterceptionResult::Continue
916                            }
917                        }
918                        // ws_msg 格式:<flow_id>:ws_msg:<msg_id>
919                        // msg_id 本身是 UUID(含 -),不含 :,所以 splitn(4) 给出恰好 3 段
920                        [_, "ws_msg", _] => {
921                            if let Some(msg) = self.get_pending_ws_message(key.clone()).await {
922                                modification::apply_ws_modification(&msg, m)
923                            } else {
924                                InterceptionResult::Continue
925                            }
926                        }
927                        _ => InterceptionResult::Continue,
928                    }
929                }
930            },
931        };
932        let outcome = self.resolve_intercept(key.clone(), result).await;
933        let audit_outcome = if outcome.is_ok() {
934            AuditOutcome::Success
935        } else {
936            AuditOutcome::Failed
937        };
938        let error_message = outcome.as_ref().err().cloned();
939
940        self.record_audit_event(AuditEvent::new(
941            actor,
942            AuditEventKind::InterceptResolved,
943            key,
944            audit_outcome,
945            json!({
946                "action": action,
947                "has_modifications": !modified_fields.is_empty(),
948                "modified_fields": modified_fields,
949                "error": error_message
950            }),
951        ));
952        outcome
953    }
954
955    #[cfg(feature = "script")]
956    pub async fn load_script_from(
957        &self,
958        actor: AuditActor,
959        target: String,
960        script: &str,
961    ) -> Result<(), String> {
962        let result = self
963            .script_interceptor
964            .load_script(script)
965            .await
966            .map_err(|e| e.to_string());
967
968        let outcome = if result.is_ok() {
969            AuditOutcome::Success
970        } else {
971            AuditOutcome::Failed
972        };
973        let error_message = result.as_ref().err().cloned();
974
975        self.record_audit_event(AuditEvent::new(
976            actor,
977            AuditEventKind::ScriptReloaded,
978            target,
979            outcome,
980            json!({
981                "script_bytes": script.len(),
982                "error": error_message
983            }),
984        ));
985
986        result
987    }
988
989    fn record_audit_event(&self, event: AuditEvent) {
990        const AUDIT_HISTORY_LIMIT: usize = 200;
991        self.audit_events_total.fetch_add(1, Ordering::Relaxed);
992        if event.outcome == AuditOutcome::Failed {
993            self.audit_events_failed.fetch_add(1, Ordering::Relaxed);
994        }
995
996        let details = event.details.to_string();
997        tracing::info!(
998            target: "relay_core_audit",
999            event_id = %event.id,
1000            actor = %event.actor.as_str(),
1001            kind = %event.kind.as_str(),
1002            target = %event.target,
1003            outcome = %event.outcome.as_str(),
1004            details = %details
1005        );
1006
1007        if let Ok(mut history) = self.audit_history.lock() {
1008            if history.len() >= AUDIT_HISTORY_LIMIT {
1009                history.pop_front();
1010            }
1011            history.push_back(event.clone());
1012        }
1013
1014        if let Some(store) = self.store.clone() {
1015            let event_json = serde_json::to_value(&event).unwrap_or_default();
1016            let event_id = event.id.clone();
1017            let timestamp_ms = event.timestamp_ms;
1018            let actor = event.actor.as_str().to_string();
1019            let kind = event.kind.as_str().to_string();
1020            let target = event.target.clone();
1021            let outcome = event.outcome.as_str().to_string();
1022            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1023                handle.spawn(async move {
1024                    if let Err(e) = store
1025                        .save_audit_event(AuditEventRecord {
1026                            id: &event_id,
1027                            timestamp_ms,
1028                            actor: &actor,
1029                            kind: &kind,
1030                            target: &target,
1031                            outcome: &outcome,
1032                            content: &event_json,
1033                        })
1034                        .await
1035                    {
1036                        tracing::error!("Failed to persist audit event: {}", e);
1037                    }
1038                });
1039            }
1040        }
1041
1042        let _ = self.audit_broadcast_tx.send(event);
1043    }
1044
1045    fn transition_to_running(&self, port: u16) {
1046        self.update_lifecycle(RuntimeLifecycle {
1047            phase: RuntimeLifecyclePhase::Running,
1048            port: Some(port),
1049            started_at_ms: Some(now_unix_ms()),
1050            last_error: None,
1051        });
1052    }
1053
1054    fn transition_to_stopped(&self) {
1055        if let Ok(mut guard) = self.shutdown_tx.lock() {
1056            *guard = None;
1057        }
1058
1059        self.update_lifecycle(RuntimeLifecycle {
1060            phase: RuntimeLifecyclePhase::Stopped,
1061            port: None,
1062            started_at_ms: None,
1063            last_error: None,
1064        });
1065    }
1066
1067    fn transition_to_failed(&self, port: u16, error: String) {
1068        if let Ok(mut guard) = self.shutdown_tx.lock() {
1069            *guard = None;
1070        }
1071
1072        self.update_lifecycle(RuntimeLifecycle {
1073            phase: RuntimeLifecyclePhase::Failed,
1074            port: Some(port),
1075            started_at_ms: None,
1076            last_error: Some(error),
1077        });
1078    }
1079
1080    fn update_lifecycle(&self, lifecycle: RuntimeLifecycle) {
1081        tracing::info!(
1082            target: "relay_core_lifecycle",
1083            phase = %lifecycle.phase.as_str(),
1084            port = ?lifecycle.port,
1085            started_at_ms = ?lifecycle.started_at_ms,
1086            last_error = ?lifecycle.last_error
1087        );
1088        self.lifecycle_tx.send_replace(lifecycle);
1089    }
1090
1091    pub fn spawn_proxy(
1092        self: &Arc<Self>,
1093        config: ProxyConfig,
1094        sink: mpsc::Sender<FlowUpdate>,
1095        extra_interceptor: Option<Arc<dyn Interceptor>>,
1096    ) -> Result<ProxySpawnResult, String> {
1097        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1098        match self.prepare_start(config.port, shutdown_tx) {
1099            Ok(()) => {}
1100            Err(error) if error.contains("already") => return Ok(ProxySpawnResult::AlreadyRunning),
1101            Err(error) => return Err(error),
1102        }
1103        let state = self.clone();
1104        Ok(ProxySpawnResult::Started(tokio::spawn(async move {
1105            if let Err(error) = state
1106                .run_proxy(config, sink, extra_interceptor, shutdown_rx)
1107                .await
1108            {
1109                error!("Proxy failed: {}", error);
1110            }
1111        })))
1112    }
1113
1114    pub async fn start_proxy(
1115        self: &Arc<Self>,
1116        config: ProxyConfig,
1117        sink: mpsc::Sender<FlowUpdate>,
1118        extra_interceptor: Option<Arc<dyn Interceptor>>,
1119    ) -> Result<(), String> {
1120        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1121        self.prepare_start(config.port, shutdown_tx)?;
1122        self.run_proxy(config, sink, extra_interceptor, shutdown_rx).await
1123    }
1124
1125    async fn run_proxy(
1126        self: &Arc<Self>,
1127        config: ProxyConfig,
1128        sink: mpsc::Sender<FlowUpdate>,
1129        extra_interceptor: Option<Arc<dyn Interceptor>>,
1130        shutdown_rx: oneshot::Receiver<()>,
1131    ) -> Result<(), String> {
1132        let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
1133        let state = self.clone();
1134
1135        if let Some(parent) = config.ca_cert_path.parent()
1136            && !parent.exists() {
1137                std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1138            }
1139
1140        let ca = CertificateAuthority::load_or_create(&config.ca_cert_path, &config.ca_key_path)
1141            .map_err(|e| format!("Failed to load/create CA: {}", e))?;
1142        let ca = Arc::new(ca);
1143
1144        #[cfg(feature = "script")]
1145        let script_interceptor = self.script_interceptor.clone();
1146
1147        #[cfg(feature = "script")]
1148        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![script_interceptor];
1149
1150        #[cfg(not(feature = "script"))]
1151        let mut interceptors: Vec<Arc<dyn Interceptor>> = vec![];
1152
1153        interceptors.push(Arc::new(interceptors::metrics::MetricsInterceptor::new(self.clone())));
1154
1155        if let Some(interceptor) = extra_interceptor {
1156            interceptors.push(interceptor);
1157        }
1158
1159        let interceptor = Arc::new(CompositeInterceptor::new(interceptors));
1160        let (proxy_tx, mut proxy_rx) = mpsc::channel::<FlowUpdate>(1000);
1161
1162        tokio::spawn(async move {
1163            while let Some(update) = proxy_rx.recv().await {
1164                match update.clone() {
1165                    FlowUpdate::Full(flow) => {
1166                        state.upsert_flow(flow);
1167                    }
1168                    FlowUpdate::WebSocketMessage { flow_id, message } => {
1169                        state.append_ws_message(flow_id, message);
1170                    }
1171                    FlowUpdate::HttpBody {
1172                        flow_id,
1173                        direction,
1174                        body,
1175                    } => {
1176                        state.update_http_body(flow_id, body, direction);
1177                    }
1178                }
1179
1180                let _ = state.flow_broadcast_tx.send(update.clone());
1181
1182                if sink.try_send(update).is_err() {
1183                    relay_core_lib::metrics::inc_flows_dropped();
1184                }
1185            }
1186        });
1187
1188        if let Some(udp_port) = config.udp_tproxy_port {
1189            let udp_proxy_tx = proxy_tx.clone();
1190            let udp_addr = SocketAddr::from(([0, 0, 0, 0], udp_port));
1191            tokio::spawn(async move {
1192                match tokio::net::UdpSocket::bind(udp_addr).await {
1193                    Ok(socket) => {
1194                        let proxy = UdpProxy::new(socket, std::time::Duration::from_secs(60));
1195                        if let Err(e) = proxy.run(udp_proxy_tx).await {
1196                            error!("UDP TPROXY failed: {}", e);
1197                        }
1198                    }
1199                    Err(e) => {
1200                        error!("Failed to bind UDP TPROXY socket: {}", e);
1201                    }
1202                }
1203            });
1204        }
1205
1206        let listener = match TcpListener::bind(addr).await {
1207            Ok(listener) => listener,
1208            Err(e) => {
1209                if let Ok(mut guard) = self.shutdown_tx.lock() {
1210                    *guard = None;
1211                }
1212                let message = format!("Failed to bind to address {}: {}", addr, e);
1213                self.transition_to_failed(config.port, message.clone());
1214                return Err(message);
1215            }
1216        };
1217
1218        self.transition_to_running(config.port);
1219        let shutdown_rx = Some(shutdown_rx);
1220
1221        if config.transparent {
1222            let policy = ProxyPolicy {
1223                transparent_enabled: true,
1224                ..Default::default()
1225            };
1226            self.update_policy_from(AuditActor::Runtime, "proxy.transparent".to_string(), policy);
1227            let policy_rx = self.policy_tx.subscribe();
1228
1229            let provider: Arc<dyn OriginalDstProvider> = {
1230                let mut addrs = BTreeSet::new();
1231                if let Ok(local) = listener.local_addr() {
1232                    addrs.insert(local);
1233                }
1234
1235                #[cfg(all(target_os = "linux", feature = "transparent-linux"))]
1236                {
1237                    Arc::new(LinuxOriginalDstProvider::new(addrs))
1238                }
1239                #[cfg(all(target_os = "macos", feature = "transparent-macos"))]
1240                {
1241                    match MacOsOriginalDstProvider::new(addrs.clone()) {
1242                        Ok(provider) => Arc::new(provider),
1243                        Err(e) => {
1244                            error!("Failed to initialize macOS PF provider: {}", e);
1245                            Arc::new(relay_core_lib::capture::NoOpOriginalDstProvider::new(addrs))
1246                        }
1247                    }
1248                }
1249                #[cfg(target_os = "windows")]
1250                {
1251                    let filter = "outbound and !loopback and (tcp.DstPort == 80 or tcp.DstPort == 443)".to_string();
1252                    let port = config.port;
1253                    tokio::spawn(async move {
1254                        relay_core_lib::capture::windows::start_windivert_capture(filter, port).await;
1255                    });
1256
1257                    Arc::new(WindowsOriginalDstProvider::new(addrs))
1258                }
1259                #[cfg(not(any(
1260                    all(target_os = "linux", feature = "transparent-linux"),
1261                    all(target_os = "macos", feature = "transparent-macos"),
1262                    target_os = "windows"
1263                )))]
1264                {
1265                    Arc::new(NoOpOriginalDstProvider::new(addrs))
1266                }
1267            };
1268
1269            let source = TransparentTcpCaptureSource::new(listener, provider);
1270            let result = relay_core_lib::start_proxy(
1271                source,
1272                proxy_tx,
1273                interceptor,
1274                ca,
1275                policy_rx,
1276                None,
1277                shutdown_rx,
1278            )
1279            .await
1280            .map_err(|e| e.to_string());
1281            if let Err(error) = &result {
1282                self.transition_to_failed(config.port, error.clone());
1283            } else {
1284                self.transition_to_stopped();
1285            }
1286            result
1287        } else {
1288            let source = TcpCaptureSource::new(listener);
1289            let policy = ProxyPolicy::default();
1290            self.update_policy_from(AuditActor::Runtime, "proxy.standard".to_string(), policy);
1291            let policy_rx = self.policy_tx.subscribe();
1292
1293            let result = relay_core_lib::start_proxy(
1294                source,
1295                proxy_tx,
1296                interceptor,
1297                ca,
1298                policy_rx,
1299                None,
1300                shutdown_rx,
1301            )
1302            .await
1303            .map_err(|e| e.to_string());
1304            if let Err(error) = &result {
1305                self.transition_to_failed(config.port, error.clone());
1306            } else {
1307                self.transition_to_stopped();
1308            }
1309            result
1310        }
1311    }
1312}
1313
1314fn redact_flow_update(update: FlowUpdate, redaction: &RedactionPolicy) -> FlowUpdate {
1315    match update {
1316        FlowUpdate::Full(flow) => FlowUpdate::Full(Box::new(redact_flow(*flow, redaction))),
1317        FlowUpdate::WebSocketMessage { flow_id, mut message } => {
1318            message.content = redact_body(message.content, redaction);
1319            FlowUpdate::WebSocketMessage { flow_id, message }
1320        }
1321        FlowUpdate::HttpBody {
1322            flow_id,
1323            direction,
1324            body,
1325        } => FlowUpdate::HttpBody {
1326            flow_id,
1327            direction,
1328            body: redact_body(body, redaction),
1329        },
1330    }
1331}
1332
1333fn redact_flow(mut flow: Flow, redaction: &RedactionPolicy) -> Flow {
1334    if !redaction.enabled {
1335        return flow;
1336    }
1337    match &mut flow.layer {
1338        Layer::Http(http) => {
1339            redact_http_request(&mut http.request, redaction);
1340            if let Some(response) = &mut http.response {
1341                redact_headers(&mut response.headers, redaction);
1342                response.body = response.body.take().map(|body| redact_body(body, redaction));
1343            }
1344        }
1345        Layer::WebSocket(ws) => {
1346            redact_http_request(&mut ws.handshake_request, redaction);
1347            redact_headers(&mut ws.handshake_response.headers, redaction);
1348            ws.handshake_response.body = ws
1349                .handshake_response
1350                .body
1351                .take()
1352                .map(|body| redact_body(body, redaction));
1353            for message in &mut ws.messages {
1354                message.content = redact_body(message.content.clone(), redaction);
1355            }
1356        }
1357        _ => {}
1358    }
1359    flow
1360}
1361
1362fn redact_flow_summary(mut summary: FlowSummary, redaction: &RedactionPolicy) -> FlowSummary {
1363    if !redaction.enabled {
1364        return summary;
1365    }
1366    summary.url = redact_url_string(&summary.url, redaction);
1367    summary
1368}
1369
1370fn redact_http_request(request: &mut relay_core_api::flow::HttpRequest, redaction: &RedactionPolicy) {
1371    redact_headers(&mut request.headers, redaction);
1372    redact_query_pairs(&mut request.query, redaction);
1373    request.url = redact_url(&request.url, redaction);
1374    request.body = request.body.take().map(|body| redact_body(body, redaction));
1375}
1376
1377fn redact_headers(headers: &mut [(String, String)], redaction: &RedactionPolicy) {
1378    if !redaction.enabled {
1379        return;
1380    }
1381    let sensitive = redaction_set(&redaction.sensitive_header_names);
1382    for (name, value) in headers.iter_mut() {
1383        if sensitive.contains(&name.to_ascii_lowercase()) {
1384            *value = "[REDACTED]".to_string();
1385        }
1386    }
1387}
1388
1389fn redact_query_pairs(query: &mut [(String, String)], redaction: &RedactionPolicy) {
1390    if !redaction.enabled {
1391        return;
1392    }
1393    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1394    for (name, value) in query.iter_mut() {
1395        if sensitive.contains(&name.to_ascii_lowercase()) {
1396            *value = "[REDACTED]".to_string();
1397        }
1398    }
1399}
1400
1401fn redact_url(url: &url::Url, redaction: &RedactionPolicy) -> url::Url {
1402    if !redaction.enabled {
1403        return url.clone();
1404    }
1405    let sensitive = redaction_set(&redaction.sensitive_query_keys);
1406    let pairs: Vec<(String, String)> = url
1407        .query_pairs()
1408        .map(|(k, v)| {
1409            let key = k.to_string();
1410            let value = if sensitive.contains(&key.to_ascii_lowercase()) {
1411                "[REDACTED]".to_string()
1412            } else {
1413                v.to_string()
1414            };
1415            (key, value)
1416        })
1417        .collect();
1418    let mut next = url.clone();
1419    if pairs.is_empty() {
1420        return next;
1421    }
1422    next.query_pairs_mut().clear();
1423    for (k, v) in pairs {
1424        next.query_pairs_mut().append_pair(&k, &v);
1425    }
1426    next
1427}
1428
1429fn redact_url_string(input: &str, redaction: &RedactionPolicy) -> String {
1430    match url::Url::parse(input) {
1431        Ok(url) => redact_url(&url, redaction).to_string(),
1432        Err(_) => input.to_string(),
1433    }
1434}
1435
1436fn redact_body(mut body: BodyData, redaction: &RedactionPolicy) -> BodyData {
1437    if redaction.enabled && redaction.redact_bodies {
1438        body.content = "[REDACTED]".to_string();
1439    }
1440    body
1441}
1442
1443fn redaction_set(values: &[String]) -> HashSet<String> {
1444    values
1445        .iter()
1446        .map(|value| value.to_ascii_lowercase())
1447        .collect()
1448}
1449
1450#[derive(Clone)]
1451pub struct ProxyConfig {
1452    pub port: u16,
1453    pub ca_cert_path: std::path::PathBuf,
1454    pub ca_key_path: std::path::PathBuf,
1455    pub transparent: bool,
1456    pub udp_tproxy_port: Option<u16>,
1457}
1458
1459impl ProxyConfig {
1460    pub fn new(
1461        port: u16,
1462        ca_cert_path: std::path::PathBuf,
1463        ca_key_path: std::path::PathBuf,
1464    ) -> Self {
1465        Self {
1466            port,
1467            ca_cert_path,
1468            ca_key_path,
1469            transparent: false,
1470            udp_tproxy_port: None,
1471        }
1472    }
1473
1474    pub fn from_app_data_dir(
1475        app_data_dir: impl Into<std::path::PathBuf>,
1476        port: u16,
1477    ) -> Result<Self, String> {
1478        let app_data_dir = app_data_dir.into();
1479        if !app_data_dir.exists() {
1480            std::fs::create_dir_all(&app_data_dir).map_err(|e| e.to_string())?;
1481        }
1482
1483        Ok(Self::new(
1484            port,
1485            app_data_dir.join("ca_cert.pem"),
1486            app_data_dir.join("ca_key.pem"),
1487        ))
1488    }
1489
1490    pub fn with_transparent(mut self, transparent: bool) -> Self {
1491        self.transparent = transparent;
1492        self
1493    }
1494
1495    pub fn with_udp_tproxy_port(mut self, udp_tproxy_port: Option<u16>) -> Self {
1496        self.udp_tproxy_port = udp_tproxy_port;
1497        self
1498    }
1499}
1500
1501fn now_unix_ms() -> u64 {
1502    SystemTime::now()
1503        .duration_since(UNIX_EPOCH)
1504        .unwrap_or_default()
1505        .as_millis() as u64
1506}
1507
1508fn modification_field_names(
1509    mods: Option<&relay_core_api::modification::FlowModification>,
1510) -> Vec<&'static str> {
1511    let Some(mods) = mods else {
1512        return Vec::new();
1513    };
1514
1515    let mut fields = Vec::new();
1516    if mods.method.is_some() {
1517        fields.push("method");
1518    }
1519    if mods.url.is_some() {
1520        fields.push("url");
1521    }
1522    if mods.request_headers.is_some() {
1523        fields.push("request_headers");
1524    }
1525    if mods.request_body.is_some() {
1526        fields.push("request_body");
1527    }
1528    if mods.status_code.is_some() {
1529        fields.push("status_code");
1530    }
1531    if mods.response_headers.is_some() {
1532        fields.push("response_headers");
1533    }
1534    if mods.response_body.is_some() {
1535        fields.push("response_body");
1536    }
1537    if mods.message_content.is_some() {
1538        fields.push("message_content");
1539    }
1540    fields
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545    use super::*;
1546    use chrono::Utc;
1547    use relay_core_api::flow::{
1548        BodyData, Flow, FlowUpdate, HttpLayer, HttpRequest, HttpResponse, Layer, NetworkInfo,
1549        ResponseTiming, TransportProtocol,
1550    };
1551    use std::sync::atomic::{AtomicU64, Ordering};
1552    use tokio::time::{Duration, sleep};
1553    use url::Url;
1554    use uuid::Uuid;
1555
1556    static TEST_DB_COUNTER: AtomicU64 = AtomicU64::new(0);
1557
1558    fn sqlite_url() -> String {
1559        let nanos = SystemTime::now()
1560            .duration_since(UNIX_EPOCH)
1561            .expect("clock drift")
1562            .as_nanos();
1563        let pid = std::process::id();
1564        let seq = TEST_DB_COUNTER.fetch_add(1, Ordering::Relaxed);
1565        let db_dir = std::env::current_dir()
1566            .expect("cwd")
1567            .join("target")
1568            .join("test-dbs");
1569        std::fs::create_dir_all(&db_dir).expect("create test db dir");
1570        let db_path = db_dir.join(format!("relay-core-runtime-test-{}-{}-{}.db", pid, nanos, seq));
1571        format!("sqlite://{}?mode=rwc", db_path.display())
1572    }
1573
1574    fn sample_http_flow(host: &str, path: &str, method: &str, status: u16, ts: i64) -> Flow {
1575        let start_time = chrono::DateTime::<Utc>::from_timestamp_millis(ts)
1576            .expect("timestamp should be valid");
1577        let request_url =
1578            Url::parse(&format!("http://{}{}", host, path)).expect("url should parse");
1579        Flow {
1580            id: Uuid::new_v4(),
1581            start_time,
1582            end_time: Some(start_time),
1583            network: NetworkInfo {
1584                client_ip: "127.0.0.1".to_string(),
1585                client_port: 12000,
1586                server_ip: "127.0.0.1".to_string(),
1587                server_port: 8080,
1588                protocol: TransportProtocol::TCP,
1589                tls: false,
1590                tls_version: None,
1591                sni: None,
1592            },
1593            layer: Layer::Http(HttpLayer {
1594                request: HttpRequest {
1595                    method: method.to_string(),
1596                    url: request_url,
1597                    version: "HTTP/1.1".to_string(),
1598                    headers: vec![],
1599                    cookies: vec![],
1600                    query: vec![],
1601                    body: None,
1602                },
1603                response: Some(HttpResponse {
1604                    status,
1605                    status_text: "OK".to_string(),
1606                    version: "HTTP/1.1".to_string(),
1607                    headers: vec![],
1608                    cookies: vec![],
1609                    body: None,
1610                    timing: ResponseTiming {
1611                        time_to_first_byte: None,
1612                        time_to_last_byte: None,
1613                        connect_time_ms: None,
1614                        ssl_time_ms: None,
1615                    },
1616                }),
1617                error: None,
1618            }),
1619            tags: vec![],
1620            meta: std::collections::HashMap::new(),
1621        }
1622    }
1623
1624    fn sample_sensitive_http_flow(ts: i64) -> Flow {
1625        let start_time = chrono::DateTime::<Utc>::from_timestamp_millis(ts)
1626            .expect("timestamp should be valid");
1627        let request_url = Url::parse("http://api.example.com/private?token=abc123&ok=1")
1628            .expect("url should parse");
1629        Flow {
1630            id: Uuid::new_v4(),
1631            start_time,
1632            end_time: Some(start_time),
1633            network: NetworkInfo {
1634                client_ip: "127.0.0.1".to_string(),
1635                client_port: 12000,
1636                server_ip: "127.0.0.1".to_string(),
1637                server_port: 8080,
1638                protocol: TransportProtocol::TCP,
1639                tls: false,
1640                tls_version: None,
1641                sni: None,
1642            },
1643            layer: Layer::Http(HttpLayer {
1644                request: HttpRequest {
1645                    method: "GET".to_string(),
1646                    url: request_url,
1647                    version: "HTTP/1.1".to_string(),
1648                    headers: vec![
1649                        ("Authorization".to_string(), "Bearer secret-token".to_string()),
1650                        ("X-Normal".to_string(), "visible".to_string()),
1651                    ],
1652                    cookies: vec![],
1653                    query: vec![
1654                        ("token".to_string(), "abc123".to_string()),
1655                        ("ok".to_string(), "1".to_string()),
1656                    ],
1657                    body: Some(BodyData {
1658                        encoding: "utf-8".to_string(),
1659                        content: "secret request body".to_string(),
1660                        size: 19,
1661                    }),
1662                },
1663                response: Some(HttpResponse {
1664                    status: 200,
1665                    status_text: "OK".to_string(),
1666                    version: "HTTP/1.1".to_string(),
1667                    headers: vec![
1668                        ("Set-Cookie".to_string(), "session=abcd".to_string()),
1669                        ("X-Response".to_string(), "visible".to_string()),
1670                    ],
1671                    cookies: vec![],
1672                    body: Some(BodyData {
1673                        encoding: "utf-8".to_string(),
1674                        content: "secret response body".to_string(),
1675                        size: 20,
1676                    }),
1677                    timing: ResponseTiming {
1678                        time_to_first_byte: None,
1679                        time_to_last_byte: None,
1680                        connect_time_ms: None,
1681                        ssl_time_ms: None,
1682                    },
1683                }),
1684                error: None,
1685            }),
1686            tags: vec![],
1687            meta: std::collections::HashMap::new(),
1688        }
1689    }
1690
1691    #[tokio::test]
1692    async fn set_rules_from_records_audit_event() {
1693        let state = CoreState::new(None).await;
1694
1695        state
1696            .set_rules_from(
1697                AuditActor::Http,
1698                "rule.upsert",
1699                "rule-1".to_string(),
1700                json!({ "route": "/api/v1/rules" }),
1701                Vec::new(),
1702            )
1703            .await
1704            .expect("set rules should succeed");
1705
1706        let events = state.recent_audit_events();
1707        let event = events.last().expect("audit event should exist");
1708        assert_eq!(event.actor, AuditActor::Http);
1709        assert_eq!(event.kind, AuditEventKind::RuleChanged);
1710        assert_eq!(event.outcome, AuditOutcome::Success);
1711        assert_eq!(event.target, "rule-1");
1712        assert_eq!(event.details["operation"], "rule.upsert");
1713        assert_eq!(event.details["details"]["route"], "/api/v1/rules");
1714    }
1715
1716    #[tokio::test]
1717    async fn upsert_rule_from_replaces_existing_rule_and_records_audit_event() {
1718        let state = CoreState::new(None).await;
1719
1720        state
1721            .upsert_rule_from(
1722                AuditActor::Probe,
1723                "rule.upsert",
1724                "rule-1".to_string(),
1725                json!({ "tool": "set_rule" }),
1726                Rule {
1727                    id: "rule-1".to_string(),
1728                    name: "first".to_string(),
1729                    active: true,
1730                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1731                    priority: 1,
1732                    termination: relay_core_lib::rule::RuleTermination::Continue,
1733                    filter: relay_core_lib::rule::Filter::Url(
1734                        relay_core_lib::rule::StringMatcher::Contains("a".to_string()),
1735                    ),
1736                    actions: vec![],
1737                    constraints: None,
1738                },
1739            )
1740            .await
1741            .expect("initial upsert should succeed");
1742
1743        state
1744            .upsert_rule_from(
1745                AuditActor::Probe,
1746                "rule.upsert",
1747                "rule-1".to_string(),
1748                json!({ "tool": "set_rule" }),
1749                Rule {
1750                    id: "rule-1".to_string(),
1751                    name: "second".to_string(),
1752                    active: true,
1753                    stage: relay_core_lib::rule::RuleStage::RequestHeaders,
1754                    priority: 2,
1755                    termination: relay_core_lib::rule::RuleTermination::Continue,
1756                    filter: relay_core_lib::rule::Filter::Url(
1757                        relay_core_lib::rule::StringMatcher::Contains("b".to_string()),
1758                    ),
1759                    actions: vec![],
1760                    constraints: None,
1761                },
1762            )
1763            .await
1764            .expect("replacement upsert should succeed");
1765
1766        let rules = state.get_rules().await;
1767        assert_eq!(rules.len(), 1);
1768        assert_eq!(rules[0].name, "second");
1769        let event = state.recent_audit_events().last().cloned().expect("audit event");
1770        assert_eq!(event.details["operation"], "rule.upsert");
1771        assert_eq!(event.details["details"]["tool"], "set_rule");
1772    }
1773
1774    #[tokio::test]
1775    async fn delete_rule_from_returns_false_when_rule_missing() {
1776        let state = CoreState::new(None).await;
1777
1778        let deleted = state
1779            .delete_rule_from(
1780                AuditActor::Http,
1781                "rule.delete",
1782                "missing".to_string(),
1783                json!({ "route": "/api/v1/rules/{id}" }),
1784                "missing",
1785            )
1786            .await
1787            .expect("delete should not fail");
1788
1789        assert!(!deleted);
1790        assert!(state.recent_audit_events().is_empty());
1791    }
1792
1793    #[tokio::test]
1794    async fn create_mock_response_rule_from_adds_rule_and_records_audit_event() {
1795        let state = CoreState::new(None).await;
1796
1797        let rule_id = state
1798            .create_mock_response_rule_from(
1799                AuditActor::Http,
1800                "api-mock-1".to_string(),
1801                json!({ "route": "/api/v1/mock", "status": 201 }),
1802                MockResponseRuleConfig {
1803                    rule_id: "api-mock-1".to_string(),
1804                    url_pattern: "example.com".to_string(),
1805                    name: "api-mock:example.com".to_string(),
1806                    status: 201,
1807                    content_type: "application/json".to_string(),
1808                    body: "{\"ok\":true}".to_string(),
1809                },
1810            )
1811            .await
1812            .expect("mock rule should be created");
1813
1814        assert_eq!(rule_id, "api-mock-1");
1815        let rules = state.get_rules().await;
1816        assert_eq!(rules.len(), 1);
1817        assert_eq!(rules[0].id, "api-mock-1");
1818        let event = state.recent_audit_events().last().cloned().expect("audit event");
1819        assert_eq!(event.details["operation"], "rule.mock_create");
1820        assert_eq!(event.details["details"]["route"], "/api/v1/mock");
1821    }
1822
1823    #[tokio::test]
1824    async fn create_intercept_rule_from_adds_stop_rule_and_records_audit_event() {
1825        let state = CoreState::new(None).await;
1826
1827        let rule_id = state
1828            .create_intercept_rule_from(
1829                AuditActor::Http,
1830                "intercept-1".to_string(),
1831                json!({ "route": "/api/v1/intercepts", "phase": "request" }),
1832                InterceptRuleConfig {
1833                    rule_id: "intercept-1".to_string(),
1834                    active: true,
1835                    url_pattern: "example.com".to_string(),
1836                    method: None,
1837                    phase: "request".to_string(),
1838                    name: "api-intercept:example.com".to_string(),
1839                    priority: 100,
1840                    termination: relay_core_lib::rule::RuleTermination::Stop,
1841                },
1842            )
1843            .await
1844            .expect("intercept rule should be created");
1845
1846        assert_eq!(rule_id, "intercept-1");
1847        let rules = state.get_rules().await;
1848        assert_eq!(rules.len(), 1);
1849        assert_eq!(rules[0].id, "intercept-1");
1850        assert_eq!(rules[0].name, "api-intercept:example.com");
1851        let event = state.recent_audit_events().last().cloned().expect("audit event");
1852        assert_eq!(event.details["operation"], "rule.intercept_create");
1853        assert_eq!(event.details["details"]["route"], "/api/v1/intercepts");
1854    }
1855
1856    #[tokio::test]
1857    async fn upsert_legacy_intercept_rule_from_replaces_existing_family() {
1858        let state = CoreState::new(None).await;
1859
1860        state
1861            .upsert_legacy_intercept_rule_from(
1862                AuditActor::Tauri,
1863                "legacy-1".to_string(),
1864                json!({ "command": "set_intercept_rule" }),
1865                InterceptRule {
1866                    id: "legacy-1".to_string(),
1867                    active: true,
1868                    url_pattern: "example.com".to_string(),
1869                    method: None,
1870                    phase: "both".to_string(),
1871                },
1872            )
1873            .await
1874            .expect("initial family upsert should succeed");
1875        assert_eq!(state.get_rules().await.len(), 2);
1876
1877        state
1878            .upsert_legacy_intercept_rule_from(
1879                AuditActor::Tauri,
1880                "legacy-1".to_string(),
1881                json!({ "command": "set_intercept_rule" }),
1882                InterceptRule {
1883                    id: "legacy-1".to_string(),
1884                    active: true,
1885                    url_pattern: "example.org".to_string(),
1886                    method: Some("POST".to_string()),
1887                    phase: "request".to_string(),
1888                },
1889            )
1890            .await
1891            .expect("replacement family upsert should succeed");
1892
1893        let rules = state.get_rules().await;
1894        assert_eq!(rules.len(), 1);
1895        assert_eq!(rules[0].id, "legacy-1");
1896        let event = state.recent_audit_events().last().cloned().expect("audit event");
1897        assert_eq!(event.details["operation"], "rule.intercept_legacy_upsert");
1898        assert_eq!(event.details["details"]["command"], "set_intercept_rule");
1899    }
1900
1901    #[tokio::test]
1902    async fn resolve_intercept_failure_records_failed_audit_event() {
1903        let state = CoreState::new(None).await;
1904
1905        let result = state
1906            .resolve_intercept_with_modifications_from(
1907                AuditActor::Probe,
1908                "missing-flow:request".to_string(),
1909                "drop",
1910                None,
1911            )
1912            .await;
1913
1914        assert!(result.is_err());
1915        let events = state.recent_audit_events();
1916        let event = events.last().expect("audit event should exist");
1917        assert_eq!(event.actor, AuditActor::Probe);
1918        assert_eq!(event.kind, AuditEventKind::InterceptResolved);
1919        assert_eq!(event.outcome, AuditOutcome::Failed);
1920        assert_eq!(event.details["action"], "drop");
1921        assert!(event.details["error"].as_str().unwrap_or_default().contains("Interception not found"));
1922    }
1923
1924    #[tokio::test]
1925    async fn lifecycle_prepare_start_and_stop_updates_snapshot() {
1926        let state = CoreState::new(None).await;
1927        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1928
1929        state
1930            .prepare_start(8080, shutdown_tx)
1931            .expect("prepare start should succeed");
1932        let lifecycle = state.lifecycle();
1933        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Starting);
1934        assert_eq!(lifecycle.port, Some(8080));
1935        assert!(lifecycle.started_at_ms.is_none());
1936        assert!(lifecycle.last_error.is_none());
1937
1938        assert_eq!(
1939            state.stop_proxy().expect("stop should succeed"),
1940            ProxyStopResult::Stopping
1941        );
1942        let lifecycle = state.lifecycle();
1943        assert_eq!(lifecycle.phase, RuntimeLifecyclePhase::Stopping);
1944        assert_eq!(lifecycle.port, Some(8080));
1945        assert!(shutdown_rx.await.is_ok());
1946    }
1947
1948    #[tokio::test]
1949    async fn status_snapshot_derives_runtime_facing_fields() {
1950        let state = CoreState::new(None).await;
1951        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
1952        state
1953            .prepare_start(8080, shutdown_tx)
1954            .expect("prepare start should succeed");
1955
1956        let status = state.status_snapshot();
1957        assert_eq!(status.phase, RuntimeLifecyclePhase::Starting);
1958        assert!(status.running);
1959        assert_eq!(status.port, Some(8080));
1960        assert!(status.uptime.is_none());
1961        assert!(status.last_error.is_none());
1962    }
1963
1964    #[tokio::test]
1965    async fn status_report_combines_status_and_metrics() {
1966        let state = CoreState::new(None).await;
1967        let report = state.status_report().await;
1968
1969        assert_eq!(report.status.phase, RuntimeLifecyclePhase::Created);
1970        assert!(!report.status.running);
1971        assert_eq!(report.metrics.intercepts_pending, 0);
1972        assert_eq!(report.metrics.ws_pending_messages, 0);
1973        assert_eq!(report.metrics.oldest_intercept_age_ms, None);
1974        assert_eq!(report.metrics.oldest_ws_message_age_ms, None);
1975        assert_eq!(report.metrics.audit_events_total, 0);
1976        assert_eq!(report.metrics.audit_events_failed, 0);
1977        assert_eq!(report.metrics.flow_events_lagged_total, 0);
1978        assert_eq!(report.metrics.audit_events_lagged_total, 0);
1979    }
1980
1981    #[test]
1982    fn proxy_config_new_and_transport_setters_preserve_values() {
1983        let config = ProxyConfig::new(
1984            8080,
1985            std::path::PathBuf::from("/tmp/ca_cert.pem"),
1986            std::path::PathBuf::from("/tmp/ca_key.pem"),
1987        )
1988        .with_transparent(true)
1989        .with_udp_tproxy_port(Some(15000));
1990
1991        assert_eq!(config.port, 8080);
1992        assert_eq!(config.ca_cert_path, std::path::PathBuf::from("/tmp/ca_cert.pem"));
1993        assert_eq!(config.ca_key_path, std::path::PathBuf::from("/tmp/ca_key.pem"));
1994        assert!(config.transparent);
1995        assert_eq!(config.udp_tproxy_port, Some(15000));
1996    }
1997
1998    #[test]
1999    fn proxy_config_from_app_data_dir_creates_default_paths() {
2000        let unique = SystemTime::now()
2001            .duration_since(UNIX_EPOCH)
2002            .expect("clock drift")
2003            .as_nanos();
2004        let dir = std::env::temp_dir().join(format!("relaycraft-runtime-config-{}", unique));
2005
2006        let config = ProxyConfig::from_app_data_dir(dir.clone(), 8899).expect("config should build");
2007
2008        assert!(dir.exists());
2009        assert_eq!(config.port, 8899);
2010        assert_eq!(config.ca_cert_path, dir.join("ca_cert.pem"));
2011        assert_eq!(config.ca_key_path, dir.join("ca_key.pem"));
2012        assert!(!config.transparent);
2013        assert!(config.udp_tproxy_port.is_none());
2014    }
2015
2016    #[tokio::test]
2017    async fn intercept_snapshot_maps_pending_counts() {
2018        let state = CoreState::new(None).await;
2019        let snapshot = state.intercept_snapshot().await;
2020
2021        assert_eq!(snapshot.pending_count, 0);
2022        assert_eq!(snapshot.ws_pending_count, 0);
2023    }
2024
2025    #[tokio::test]
2026    async fn audit_snapshot_returns_latest_events_in_order() {
2027        let state = CoreState::new(None).await;
2028        state.record_audit_event(AuditEvent::new(
2029            AuditActor::Runtime,
2030            AuditEventKind::RuleChanged,
2031            "first",
2032            AuditOutcome::Success,
2033            json!({ "index": 1 }),
2034        ));
2035        state.record_audit_event(AuditEvent::new(
2036            AuditActor::Http,
2037            AuditEventKind::PolicyUpdated,
2038            "second",
2039            AuditOutcome::Success,
2040            json!({ "index": 2 }),
2041        ));
2042
2043        let snapshot = state.audit_snapshot(1);
2044
2045        assert_eq!(snapshot.events.len(), 1);
2046        assert_eq!(snapshot.events[0].target, "second");
2047        assert_eq!(snapshot.events[0].details["index"], 2);
2048    }
2049
2050    #[tokio::test]
2051    async fn query_audit_snapshot_filters_in_memory_events() {
2052        let state = CoreState::new(None).await;
2053        state.record_audit_event(AuditEvent::new(
2054            AuditActor::Http,
2055            AuditEventKind::RuleChanged,
2056            "rule-1",
2057            AuditOutcome::Success,
2058            json!({ "idx": 1 }),
2059        ));
2060        state.record_audit_event(AuditEvent::new(
2061            AuditActor::Probe,
2062            AuditEventKind::PolicyUpdated,
2063            "policy",
2064            AuditOutcome::Failed,
2065            json!({ "idx": 2 }),
2066        ));
2067
2068        let snapshot = state
2069            .query_audit_snapshot(CoreAuditQuery {
2070                actor: Some(AuditActor::Probe),
2071                kind: Some(AuditEventKind::PolicyUpdated),
2072                outcome: Some(AuditOutcome::Failed),
2073                limit: 10,
2074                ..Default::default()
2075            })
2076            .await;
2077
2078        assert_eq!(snapshot.events.len(), 1);
2079        assert_eq!(snapshot.events[0].actor, AuditActor::Probe);
2080        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2081        assert_eq!(snapshot.events[0].outcome, AuditOutcome::Failed);
2082    }
2083
2084    #[tokio::test]
2085    async fn query_audit_snapshot_reads_persisted_events_when_storage_enabled() {
2086        let state = CoreState::new(Some(sqlite_url())).await;
2087        state.update_policy_from(
2088            AuditActor::Http,
2089            "policy".to_string(),
2090            ProxyPolicy {
2091                transparent_enabled: true,
2092                ..Default::default()
2093            },
2094        );
2095
2096        let mut snapshot = CoreAuditSnapshot { events: Vec::new() };
2097        for _ in 0..10 {
2098            snapshot = state
2099                .query_audit_snapshot(CoreAuditQuery {
2100                    actor: Some(AuditActor::Http),
2101                    kind: Some(AuditEventKind::PolicyUpdated),
2102                    limit: 10,
2103                    ..Default::default()
2104                })
2105                .await;
2106            if !snapshot.events.is_empty() {
2107                break;
2108            }
2109            sleep(Duration::from_millis(20)).await;
2110        }
2111
2112        assert!(!snapshot.events.is_empty());
2113        assert_eq!(snapshot.events[0].actor, AuditActor::Http);
2114        assert_eq!(snapshot.events[0].kind, AuditEventKind::PolicyUpdated);
2115    }
2116
2117    #[tokio::test]
2118    async fn prepare_start_rejects_second_active_start() {
2119        let state = CoreState::new(None).await;
2120        let (shutdown_tx, _shutdown_rx) = oneshot::channel();
2121        state
2122            .prepare_start(8080, shutdown_tx)
2123            .expect("first start should succeed");
2124
2125        let (second_tx, _second_rx) = oneshot::channel();
2126        let error = state
2127            .prepare_start(8081, second_tx)
2128            .expect_err("second active start should be rejected");
2129        assert!(error.contains("already"));
2130    }
2131
2132    #[test]
2133    fn update_policy_records_audit_event() {
2134        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2135        let state = runtime.block_on(CoreState::new(None));
2136
2137        state.update_policy_from(
2138            AuditActor::Runtime,
2139            "policy".to_string(),
2140            ProxyPolicy {
2141                transparent_enabled: true,
2142                ..Default::default()
2143            },
2144        );
2145
2146        let events = state.recent_audit_events();
2147        let event = events.last().expect("audit event should exist");
2148        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2149        assert_eq!(event.outcome, AuditOutcome::Success);
2150        assert_eq!(event.details["transparent_enabled"], true);
2151    }
2152
2153    #[test]
2154    fn patch_policy_updates_redaction_without_replacing_other_fields() {
2155        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2156        let state = runtime.block_on(CoreState::new(None));
2157        let original_timeout = state.policy_snapshot().request_timeout_ms;
2158
2159        state.patch_policy_from(
2160            AuditActor::Runtime,
2161            "policy.patch".to_string(),
2162            relay_core_api::policy::ProxyPolicyPatch {
2163                redaction: Some(relay_core_api::policy::RedactionPolicyPatch {
2164                    enabled: Some(true),
2165                    redact_bodies: Some(true),
2166                    ..Default::default()
2167                }),
2168            },
2169        );
2170
2171        let policy = state.policy_snapshot();
2172        assert_eq!(policy.request_timeout_ms, original_timeout);
2173        assert!(policy.redaction.enabled);
2174        assert!(policy.redaction.redact_bodies);
2175
2176        let events = state.recent_audit_events();
2177        let event = events.last().expect("audit event should exist");
2178        assert_eq!(event.kind, AuditEventKind::PolicyUpdated);
2179        assert_eq!(event.details["redaction_enabled"], true);
2180    }
2181
2182    #[tokio::test]
2183    async fn metrics_include_audit_and_lagged_event_counters() {
2184        let state = CoreState::new(None).await;
2185
2186        state.update_policy_from(
2187            AuditActor::Runtime,
2188            "policy".to_string(),
2189            ProxyPolicy::default(),
2190        );
2191        let _ = state
2192            .resolve_intercept_with_modifications_from(
2193                AuditActor::Probe,
2194                "missing-flow:request".to_string(),
2195                "drop",
2196                None,
2197            )
2198            .await;
2199
2200        state.record_flow_events_lagged(3);
2201        state.record_audit_events_lagged(5);
2202
2203        let metrics = state.get_metrics().await;
2204        assert_eq!(metrics.audit_events_total, 2);
2205        assert_eq!(metrics.audit_events_failed, 1);
2206        assert_eq!(metrics.flow_events_lagged_total, 3);
2207        assert_eq!(metrics.audit_events_lagged_total, 5);
2208    }
2209
2210    #[tokio::test]
2211    async fn prometheus_metrics_text_contains_observability_fields() {
2212        let state = CoreState::new(None).await;
2213        state.record_flow_events_lagged(2);
2214        state.record_audit_events_lagged(4);
2215
2216        let text = state.get_metrics_prometheus_text().await;
2217        assert!(text.contains("relay_core_flow_events_lagged_total 2"));
2218        assert!(text.contains("relay_core_audit_events_lagged_total 4"));
2219        assert!(text.contains("relay_core_oldest_intercept_age_ms 0"));
2220        assert!(text.contains("relay_core_oldest_ws_message_age_ms 0"));
2221    }
2222
2223    #[tokio::test]
2224    async fn search_flows_uses_store_with_offset_pagination() {
2225        let state = CoreState::new(Some(sqlite_url())).await;
2226        let flow_a = sample_http_flow("api.example.com", "/a", "GET", 200, 1_700_000_001_000);
2227        let flow_b = sample_http_flow("api.example.com", "/b", "POST", 500, 1_700_000_002_000);
2228        let flow_c = sample_http_flow("api.example.com", "/c", "GET", 201, 1_700_000_003_000);
2229
2230        state.upsert_flow(Box::new(flow_a));
2231        state.upsert_flow(Box::new(flow_b));
2232        state.upsert_flow(Box::new(flow_c));
2233
2234        let mut baseline = Vec::new();
2235        for _ in 0..20 {
2236            baseline = state
2237                .search_flows(FlowQuery {
2238                    host: Some("api.example.com".to_string()),
2239                    path_contains: None,
2240                    method: None,
2241                    status_min: None,
2242                    status_max: None,
2243                    has_error: None,
2244                    is_websocket: None,
2245                    limit: Some(3),
2246                    offset: Some(0),
2247                })
2248                .await;
2249            if baseline.len() == 3 {
2250                break;
2251            }
2252            sleep(Duration::from_millis(20)).await;
2253        }
2254
2255        assert_eq!(baseline.len(), 3);
2256        let page = state
2257            .search_flows(FlowQuery {
2258                host: Some("api.example.com".to_string()),
2259                path_contains: None,
2260                method: None,
2261                status_min: None,
2262                status_max: None,
2263                has_error: None,
2264                is_websocket: None,
2265                limit: Some(1),
2266                offset: Some(1),
2267            })
2268            .await;
2269        assert_eq!(page.len(), 1);
2270        assert_eq!(page[0].id, baseline[1].id);
2271    }
2272
2273    #[tokio::test]
2274    async fn get_flow_falls_back_to_store_after_lru_eviction() {
2275        let state = CoreState::new(Some(sqlite_url())).await;
2276        let first_flow = sample_http_flow("persist.example.com", "/first", "GET", 200, 1_700_000_010_000);
2277        let first_id = first_flow.id.to_string();
2278        state.upsert_flow(Box::new(first_flow));
2279        for i in 0..240 {
2280            state.upsert_flow(Box::new(sample_http_flow(
2281                "persist.example.com",
2282                &format!("/{}", i),
2283                "GET",
2284                200,
2285                1_700_000_020_000 + i,
2286            )));
2287        }
2288
2289        sleep(Duration::from_millis(200)).await;
2290
2291        let loaded = state.get_flow(first_id).await;
2292        assert!(loaded.is_some());
2293    }
2294
2295    #[tokio::test]
2296    async fn search_flows_redacts_summary_url_when_enabled() {
2297        let state = CoreState::new(None).await;
2298        state.update_policy_from(
2299            AuditActor::Runtime,
2300            "policy.redaction".to_string(),
2301            ProxyPolicy {
2302                redaction: RedactionPolicy {
2303                    enabled: true,
2304                    sensitive_query_keys: vec!["token".to_string()],
2305                    redact_bodies: false,
2306                    ..Default::default()
2307                },
2308                ..Default::default()
2309            },
2310        );
2311        state.upsert_flow(Box::new(sample_sensitive_http_flow(1_700_000_100_000)));
2312
2313        let mut items = Vec::new();
2314        for _ in 0..20 {
2315            items = state
2316                .search_flows(FlowQuery {
2317                    host: Some("api.example.com".to_string()),
2318                    path_contains: Some("/private".to_string()),
2319                    ..Default::default()
2320                })
2321                .await;
2322            if !items.is_empty() {
2323                break;
2324            }
2325            sleep(Duration::from_millis(20)).await;
2326        }
2327
2328        assert!(!items.is_empty());
2329        let redacted = Url::parse(&items[0].url).expect("summary url should parse");
2330        let token = redacted
2331            .query_pairs()
2332            .find(|(k, _)| k == "token")
2333            .map(|(_, v)| v.to_string());
2334        assert_eq!(token.as_deref(), Some("[REDACTED]"));
2335    }
2336
2337    #[tokio::test]
2338    async fn get_flow_applies_header_query_and_body_redaction_when_enabled() {
2339        let state = CoreState::new(Some(sqlite_url())).await;
2340        state.update_policy_from(
2341            AuditActor::Runtime,
2342            "policy.redaction".to_string(),
2343            ProxyPolicy {
2344                redaction: RedactionPolicy {
2345                    enabled: true,
2346                    sensitive_header_names: vec![
2347                        "authorization".to_string(),
2348                        "set-cookie".to_string(),
2349                    ],
2350                    sensitive_query_keys: vec!["token".to_string()],
2351                    redact_bodies: true,
2352                },
2353                ..Default::default()
2354            },
2355        );
2356
2357        let flow = sample_sensitive_http_flow(1_700_000_200_000);
2358        let flow_id = flow.id.to_string();
2359        state.upsert_flow(Box::new(flow));
2360        sleep(Duration::from_millis(80)).await;
2361
2362        let loaded = state.get_flow(flow_id).await.expect("flow should exist");
2363        let Layer::Http(http) = loaded.layer else {
2364            panic!("expected http layer");
2365        };
2366
2367        let auth = http
2368            .request
2369            .headers
2370            .iter()
2371            .find(|(k, _)| k.eq_ignore_ascii_case("authorization"))
2372            .map(|(_, v)| v.as_str());
2373        assert_eq!(auth, Some("[REDACTED]"));
2374
2375        let req_query_token = http
2376            .request
2377            .query
2378            .iter()
2379            .find(|(k, _)| k == "token")
2380            .map(|(_, v)| v.as_str());
2381        assert_eq!(req_query_token, Some("[REDACTED]"));
2382
2383        let req_body = http
2384            .request
2385            .body
2386            .as_ref()
2387            .map(|b| b.content.as_str());
2388        assert_eq!(req_body, Some("[REDACTED]"));
2389
2390        let response = http.response.expect("response should exist");
2391        let set_cookie = response
2392            .headers
2393            .iter()
2394            .find(|(k, _)| k.eq_ignore_ascii_case("set-cookie"))
2395            .map(|(_, v)| v.as_str());
2396        assert_eq!(set_cookie, Some("[REDACTED]"));
2397        let res_body = response.body.as_ref().map(|b| b.content.as_str());
2398        assert_eq!(res_body, Some("[REDACTED]"));
2399    }
2400
2401    #[test]
2402    fn redact_flow_update_masks_http_body_when_enabled() {
2403        let runtime = tokio::runtime::Runtime::new().expect("runtime should build");
2404        let state = runtime.block_on(CoreState::new(None));
2405        state.update_policy_from(
2406            AuditActor::Runtime,
2407            "policy.redaction".to_string(),
2408            ProxyPolicy {
2409                redaction: RedactionPolicy {
2410                    enabled: true,
2411                    redact_bodies: true,
2412                    ..Default::default()
2413                },
2414                ..Default::default()
2415            },
2416        );
2417
2418        let update = FlowUpdate::HttpBody {
2419            flow_id: "f-1".to_string(),
2420            direction: relay_core_api::flow::Direction::ClientToServer,
2421            body: BodyData {
2422                encoding: "utf-8".to_string(),
2423                content: "super-secret".to_string(),
2424                size: 12,
2425            },
2426        };
2427        let redacted = state.redact_flow_update_for_output(update);
2428        match redacted {
2429            FlowUpdate::HttpBody { body, .. } => assert_eq!(body.content, "[REDACTED]"),
2430            _ => panic!("expected http body update"),
2431        }
2432    }
2433
2434    #[cfg(feature = "script")]
2435    #[tokio::test]
2436    async fn load_script_from_records_audit_event() {
2437        let state = CoreState::new(None).await;
2438
2439        state
2440            .load_script_from(
2441                AuditActor::Tauri,
2442                "tauri.load_script".to_string(),
2443                "globalThis.onRequestHeaders = (_flow) => {};",
2444            )
2445            .await
2446            .expect("script should load");
2447
2448        let events = state.recent_audit_events();
2449        let event = events.last().expect("audit event should exist");
2450        assert_eq!(event.actor, AuditActor::Tauri);
2451        assert_eq!(event.kind, AuditEventKind::ScriptReloaded);
2452        assert_eq!(event.outcome, AuditOutcome::Success);
2453        assert_eq!(event.target, "tauri.load_script");
2454    }
2455}