Skip to main content

ranvier_inspector/
lib.rs

1pub mod alert;
2pub mod auth;
3pub mod breakpoint;
4pub mod metrics;
5pub mod payload;
6pub mod prometheus;
7pub mod relay;
8pub mod routes;
9pub mod schema;
10pub mod stall;
11pub mod trace_store;
12
13use async_trait::async_trait;
14use axum::{
15    Json, Router,
16    extract::{
17        Path as AxPath, State,
18        ws::{Message, WebSocket, WebSocketUpgrade},
19    },
20    http::{HeaderMap, StatusCode, header},
21    response::IntoResponse,
22    routing::get,
23};
24use ranvier_core::event::DlqReader;
25use ranvier_core::prelude::DebugControl;
26use ranvier_core::schematic::{NodeKind, Schematic};
27use serde::Deserialize;
28use serde_json::Value;
29use std::collections::{HashMap, HashSet};
30use std::fs;
31use std::net::SocketAddr;
32use std::sync::{Arc, Mutex, OnceLock};
33use std::time::{Instant, SystemTime, UNIX_EPOCH};
34
35#[async_trait]
36pub trait StateInspector: Send + Sync {
37    async fn get_state(&self, trace_id: &str) -> Option<Value>;
38    async fn force_resume(
39        &self,
40        trace_id: &str,
41        target_node: &str,
42        payload_override: Option<Value>,
43    ) -> Result<(), String>;
44}
45use tokio::sync::broadcast;
46use tower_http::cors::CorsLayer;
47use tracing::{Event, Id, Subscriber};
48use tracing_subscriber::{Layer, layer::Context, registry::LookupSpan};
49
50static EVENT_CHANNEL: OnceLock<broadcast::Sender<String>> = OnceLock::new();
51static TRACE_REGISTRY: OnceLock<Arc<Mutex<ActiveTraceRegistry>>> = OnceLock::new();
52static PAYLOAD_POLICY: OnceLock<payload::PayloadCapturePolicy> = OnceLock::new();
53const QUICK_VIEW_HTML: &str = include_str!("quick_view/index.html");
54const QUICK_VIEW_JS: &str = include_str!("quick_view/app.js");
55const QUICK_VIEW_CSS: &str = include_str!("quick_view/styles.css");
56const INSPECTOR_API_VERSION: &str = "1.0";
57
58#[derive(Clone, Debug, serde::Serialize)]
59pub struct TraceRecord {
60    pub trace_id: String,
61    pub circuit: String,
62    pub status: TraceStatus,
63    pub started_at: u64,
64    pub finished_at: Option<u64>,
65    pub duration_ms: Option<u64>,
66    pub outcome_type: Option<String>,
67}
68
69#[derive(Clone, Debug, serde::Serialize, Eq, PartialEq)]
70#[serde(rename_all = "lowercase")]
71pub enum TraceStatus {
72    Active,
73    Completed,
74    Faulted,
75}
76
77pub struct ActiveTraceRegistry {
78    active: HashMap<String, TraceRecord>,
79    recent: Vec<TraceRecord>,
80    max_recent: usize,
81}
82
83impl ActiveTraceRegistry {
84    fn new() -> Self {
85        Self {
86            active: HashMap::new(),
87            recent: Vec::new(),
88            max_recent: 100,
89        }
90    }
91
92    fn register(&mut self, circuit: String) {
93        let trace_id = format!(
94            "{}-{}",
95            circuit.replace(' ', "_").to_lowercase(),
96            epoch_ms()
97        );
98        self.active.insert(
99            trace_id.clone(),
100            TraceRecord {
101                trace_id,
102                circuit,
103                status: TraceStatus::Active,
104                started_at: epoch_ms(),
105                finished_at: None,
106                duration_ms: None,
107                outcome_type: None,
108            },
109        );
110    }
111
112    fn complete(
113        &mut self,
114        circuit: &str,
115        outcome_type: Option<String>,
116        duration_ms: Option<u64>,
117    ) {
118        // Find the active trace for this circuit (most recent)
119        let key = self
120            .active
121            .iter()
122            .filter(|(_, r)| r.circuit == circuit)
123            .max_by_key(|(_, r)| r.started_at)
124            .map(|(k, _)| k.clone());
125
126        if let Some(key) = key {
127            if let Some(mut record) = self.active.remove(&key) {
128                record.finished_at = Some(epoch_ms());
129                record.duration_ms = duration_ms;
130                record.outcome_type = outcome_type.clone();
131                record.status = if outcome_type.as_deref() == Some("Fault") {
132                    TraceStatus::Faulted
133                } else {
134                    TraceStatus::Completed
135                };
136                self.recent.push(record);
137                if self.recent.len() > self.max_recent {
138                    self.recent.remove(0);
139                }
140            }
141        }
142    }
143
144    fn list_all(&self) -> Vec<TraceRecord> {
145        let mut result: Vec<TraceRecord> = self.active.values().cloned().collect();
146        result.extend(self.recent.iter().cloned());
147        result.sort_by(|a, b| b.started_at.cmp(&a.started_at));
148        result
149    }
150
151    /// Number of currently active (in-flight) traces.
152    pub fn active_count(&self) -> usize {
153        self.active.len()
154    }
155}
156
157pub fn get_trace_registry() -> Arc<Mutex<ActiveTraceRegistry>> {
158    TRACE_REGISTRY
159        .get_or_init(|| Arc::new(Mutex::new(ActiveTraceRegistry::new())))
160        .clone()
161}
162
/// Which inspector route surface to mount (see `SurfacePolicy::for_mode`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum InspectorMode {
    /// Development: internal, event-stream and quick-view routes are mounted.
    Dev,
    /// Production: internal, event-stream and quick-view routes are hidden.
    Prod,
}

impl InspectorMode {
    /// Read the mode from `RANVIER_MODE`; an unset or non-UTF-8 value selects `Dev`.
    fn from_env() -> Self {
        // Delegate to `from_str` so env and explicit configuration parse identically.
        std::env::var("RANVIER_MODE")
            .map(|raw| Self::from_str(&raw))
            .unwrap_or(Self::Dev)
    }

    /// Parse a mode name case-insensitively, ignoring surrounding whitespace.
    ///
    /// `prod` / `production` select `Prod`; every other value selects `Dev`.
    fn from_str(mode: &str) -> Self {
        match mode.trim().to_ascii_lowercase().as_str() {
            "prod" | "production" => Self::Prod,
            _ => Self::Dev,
        }
    }
}
188
189#[derive(Clone, Copy, Debug)]
190struct SurfacePolicy {
191    expose_internal: bool,
192    expose_events: bool,
193    expose_quick_view: bool,
194}
195
196impl SurfacePolicy {
197    fn for_mode(mode: InspectorMode) -> Self {
198        match mode {
199            InspectorMode::Dev => Self {
200                expose_internal: true,
201                expose_events: true,
202                expose_quick_view: true,
203            },
204            InspectorMode::Prod => Self {
205                expose_internal: false,
206                expose_events: false,
207                expose_quick_view: false,
208            },
209        }
210    }
211}
212
/// Access level a caller may claim for inspector endpoints.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum AccessRole {
    /// Lowest level.
    Viewer,
    /// Intermediate level.
    Operator,
    /// Highest level.
    Admin,
}
219
220#[derive(Clone, Copy, Debug)]
221struct AuthPolicy {
222    enforce_headers: bool,
223    require_tenant_for_internal: bool,
224}
225
226impl AuthPolicy {
227    fn default() -> Self {
228        Self {
229            enforce_headers: false,
230            require_tenant_for_internal: false,
231        }
232    }
233
234    fn from_env() -> Self {
235        Self {
236            enforce_headers: env_flag("RANVIER_AUTH_ENFORCE", false),
237            require_tenant_for_internal: env_flag("RANVIER_AUTH_REQUIRE_TENANT_INTERNAL", false),
238        }
239    }
240}
241
/// Audience of a projection artifact; selects the default redaction mode.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ProjectionSurface {
    /// Externally visible projection (`/trace/public`).
    Public,
    /// Operator-only projection (`/trace/internal`).
    Internal,
}
247
/// How aggressively projection JSON is redacted.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RedactionMode {
    /// No redaction.
    Off,
    /// Replace values of sensitive keys with `"[REDACTED]"`.
    Public,
    /// Like `Public`, and additionally drop non-allow-listed keys inside attribute bags.
    Strict,
}

impl RedactionMode {
    /// Parse `off` / `public` / `strict` (case-insensitive, surrounding whitespace ignored).
    fn parse(raw: &str) -> Option<Self> {
        let wanted = raw.trim().to_ascii_lowercase();
        [
            ("off", Self::Off),
            ("public", Self::Public),
            ("strict", Self::Strict),
        ]
        .into_iter()
        .find(|(name, _)| wanted.as_str() == *name)
        .map(|(_, mode)| mode)
    }
}
265
266#[derive(Clone, Debug)]
267struct TelemetryRedactionPolicy {
268    mode_override: Option<RedactionMode>,
269    sensitive_patterns: Vec<String>,
270    allow_keys: HashSet<String>,
271}
272
273impl Default for TelemetryRedactionPolicy {
274    fn default() -> Self {
275        Self {
276            mode_override: None,
277            sensitive_patterns: default_sensitive_patterns(),
278            allow_keys: HashSet::new(),
279        }
280    }
281}
282
283impl TelemetryRedactionPolicy {
284    fn from_env() -> Self {
285        let mut policy = Self::default();
286
287        if let Ok(raw_mode) = std::env::var("RANVIER_TELEMETRY_REDACT_MODE") {
288            match RedactionMode::parse(&raw_mode) {
289                Some(mode) => policy.mode_override = Some(mode),
290                None => tracing::warn!(
291                    "Invalid RANVIER_TELEMETRY_REDACT_MODE='{}' (expected: off|public|strict)",
292                    raw_mode
293                ),
294            }
295        }
296
297        if let Ok(extra) = std::env::var("RANVIER_TELEMETRY_REDACT_KEYS") {
298            for key in parse_csv_lower(&extra) {
299                if !policy.sensitive_patterns.contains(&key) {
300                    policy.sensitive_patterns.push(key);
301                }
302            }
303        }
304
305        if let Ok(allow) = std::env::var("RANVIER_TELEMETRY_ALLOW_KEYS") {
306            policy.allow_keys.extend(parse_csv_lower(&allow));
307        }
308
309        policy
310    }
311
312    fn mode_for_surface(&self, surface: ProjectionSurface) -> RedactionMode {
313        if let Some(mode) = self.mode_override {
314            return mode;
315        }
316
317        match surface {
318            ProjectionSurface::Public => RedactionMode::Public,
319            ProjectionSurface::Internal => RedactionMode::Off,
320        }
321    }
322
323    fn is_sensitive_key(&self, key: &str) -> bool {
324        let lowered = key.to_ascii_lowercase();
325        self.sensitive_patterns
326            .iter()
327            .any(|pattern| lowered.contains(pattern))
328    }
329}
330
331fn get_sender() -> &'static broadcast::Sender<String> {
332    EVENT_CHANNEL.get_or_init(|| {
333        let (tx, _rx) = broadcast::channel(100);
334        tx
335    })
336}
337
338/// Start the Inspector Server.
339pub struct Inspector {
340    port: u16,
341    schematic: Arc<Mutex<Schematic>>,
342    public_projection: Arc<Mutex<Option<Value>>>,
343    internal_projection: Arc<Mutex<Option<Value>>>,
344    public_projection_path: Option<String>,
345    internal_projection_path: Option<String>,
346    surface_policy: SurfacePolicy,
347    auth_policy: AuthPolicy,
348    redaction_policy: TelemetryRedactionPolicy,
349    state_inspector: Option<Arc<dyn StateInspector>>,
350    dlq_reader: Option<Arc<dyn DlqReader>>,
351    payload_policy: payload::PayloadCapturePolicy,
352    relay_state: Option<relay::RelayState>,
353    bearer_auth: auth::BearerAuth,
354    trace_store: Option<Arc<dyn trace_store::TraceStore>>,
355    alert_dispatcher: Option<Arc<alert::AlertDispatcher>>,
356}
357
358impl Inspector {
359    pub fn new(schematic: Schematic, port: u16) -> Self {
360        // Ensure channel exists
361        get_sender();
362        let public_projection = default_public_projection(&schematic);
363        let internal_projection = default_internal_projection(&schematic);
364
365        Self {
366            port,
367            schematic: Arc::new(Mutex::new(schematic)),
368            public_projection: Arc::new(Mutex::new(Some(public_projection))),
369            internal_projection: Arc::new(Mutex::new(Some(internal_projection))),
370            public_projection_path: None,
371            internal_projection_path: None,
372            surface_policy: SurfacePolicy::for_mode(InspectorMode::Dev),
373            auth_policy: AuthPolicy::default(),
374            redaction_policy: TelemetryRedactionPolicy::from_env(),
375            state_inspector: None,
376            dlq_reader: None,
377            payload_policy: payload::PayloadCapturePolicy::from_env(),
378            relay_state: None,
379            bearer_auth: auth::BearerAuth::default(),
380            trace_store: None,
381            alert_dispatcher: None,
382        }
383    }
384
385    /// Configure the relay target for proxying API requests into the running application.
386    ///
387    /// When set, `/api/v1/relay` will forward requests to the specified URL via `reqwest`.
388    ///
389    /// ```rust,ignore
390    /// let inspector = Inspector::new(schematic, 9090)
391    ///     .with_relay_target("http://127.0.0.1:3111");
392    /// ```
393    pub fn with_relay_target(mut self, target_url: impl Into<String>) -> Self {
394        let config = relay::RelayConfig::new(target_url);
395        self.relay_state = Some(relay::RelayState::new(config));
396        self
397    }
398
399    /// Register HTTP route descriptors for the `/api/v1/routes` endpoint.
400    ///
401    /// Call this with the route descriptors from your `HttpIngress`.
402    pub fn with_routes(self, route_infos: Vec<routes::RouteInfo>) -> Self {
403        routes::register_routes(route_infos);
404        self
405    }
406
407    pub fn with_dlq_reader(mut self, reader: Arc<dyn DlqReader>) -> Self {
408        self.dlq_reader = Some(reader);
409        self
410    }
411
412    pub fn with_payload_capture(mut self, policy: payload::PayloadCapturePolicy) -> Self {
413        self.payload_policy = policy;
414        self
415    }
416
417    pub fn with_state_inspector(mut self, inspector: Arc<dyn StateInspector>) -> Self {
418        self.state_inspector = Some(inspector);
419        self
420    }
421
422    /// Attach a read-only public projection artifact.
423    pub fn with_public_projection(self, projection: Value) -> Self {
424        if let Ok(mut slot) = self.public_projection.lock() {
425            *slot = Some(projection);
426        }
427        self
428    }
429
430    /// Attach a read-only internal projection artifact.
431    pub fn with_internal_projection(self, projection: Value) -> Self {
432        if let Ok(mut slot) = self.internal_projection.lock() {
433            *slot = Some(projection);
434        }
435        self
436    }
437
438    /// Load optional projection artifacts from environment variables:
439    /// - `RANVIER_TRACE_PUBLIC_PATH`
440    /// - `RANVIER_TRACE_INTERNAL_PATH`
441    ///
442    /// Invalid files are ignored with warning logs; bootstrap projections remain active.
443    pub fn with_projection_files_from_env(self) -> Self {
444        let mut inspector = self;
445
446        if let Ok(path) = std::env::var("RANVIER_TRACE_PUBLIC_PATH") {
447            inspector.public_projection_path = Some(path.clone());
448            match read_projection_file(&path) {
449                Ok(v) => inspector = inspector.with_public_projection(v),
450                Err(e) => tracing::warn!("Failed to load public projection from {}: {}", path, e),
451            }
452        }
453
454        if let Ok(path) = std::env::var("RANVIER_TRACE_INTERNAL_PATH") {
455            inspector.internal_projection_path = Some(path.clone());
456            match read_projection_file(&path) {
457                Ok(v) => inspector = inspector.with_internal_projection(v),
458                Err(e) => tracing::warn!("Failed to load internal projection from {}: {}", path, e),
459            }
460        }
461
462        inspector
463    }
464
465    /// Configure inspector route surface using `RANVIER_MODE=dev|prod`.
466    ///
467    /// - `dev` (default): expose `/trace/internal`, `/events`, `/quick-view`
468    /// - `prod`: hide internal/event/quick-view routes and keep public read-only endpoints
469    pub fn with_mode_from_env(mut self) -> Self {
470        let mode = InspectorMode::from_env();
471        self.surface_policy = SurfacePolicy::for_mode(mode);
472        self
473    }
474
475    /// Configure inspector route surface explicitly.
476    ///
477    /// Accepted values:
478    /// - `dev` (default)
479    /// - `prod` / `production`
480    pub fn with_mode(mut self, mode: &str) -> Self {
481        let parsed = InspectorMode::from_str(mode);
482        self.surface_policy = SurfacePolicy::for_mode(parsed);
483        self
484    }
485
486    /// Configure auth policy using environment variables.
487    ///
488    /// - `RANVIER_AUTH_ENFORCE=1`: require `X-Ranvier-Role` on inspector endpoints.
489    /// - `RANVIER_AUTH_REQUIRE_TENANT_INTERNAL=1`: require `X-Ranvier-Tenant` for internal endpoints.
490    pub fn with_auth_policy_from_env(mut self) -> Self {
491        self.auth_policy = AuthPolicy::from_env();
492        self
493    }
494
495    /// Toggle role-header enforcement.
496    pub fn with_auth_enforcement(mut self, enabled: bool) -> Self {
497        self.auth_policy.enforce_headers = enabled;
498        self
499    }
500
501    /// Toggle tenant-header requirement for internal endpoints.
502    pub fn with_require_tenant_for_internal(mut self, required: bool) -> Self {
503        self.auth_policy.require_tenant_for_internal = required;
504        self
505    }
506
507    /// Reload telemetry redaction policy from environment variables.
508    ///
509    /// Variables:
510    /// - `RANVIER_TELEMETRY_REDACT_MODE=off|public|strict`
511    /// - `RANVIER_TELEMETRY_REDACT_KEYS=comma,separated,patterns`
512    /// - `RANVIER_TELEMETRY_ALLOW_KEYS=comma,separated,keys`
513    pub fn with_redaction_policy_from_env(mut self) -> Self {
514        self.redaction_policy = TelemetryRedactionPolicy::from_env();
515        self
516    }
517
518    /// Configure Bearer token authentication for production deployments.
519    pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
520        self.bearer_auth = auth::BearerAuth { token: Some(token.into()) };
521        self
522    }
523
524    /// Load bearer token from `RANVIER_INSPECTOR_TOKEN` environment variable.
525    pub fn with_bearer_token_from_env(mut self) -> Self {
526        self.bearer_auth = auth::BearerAuth::from_env();
527        self
528    }
529
530    /// Configure a persistent trace store for trace history.
531    pub fn with_trace_store(mut self, store: Arc<dyn trace_store::TraceStore>) -> Self {
532        self.trace_store = Some(store);
533        self
534    }
535
536    /// Configure alert hooks for production monitoring.
537    pub fn with_alert_dispatcher(mut self, dispatcher: Arc<alert::AlertDispatcher>) -> Self {
538        self.alert_dispatcher = Some(dispatcher);
539        self
540    }
541
542    pub async fn serve(self) -> Result<(), std::io::Error> {
543        let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
544        let listener = tokio::net::TcpListener::bind(addr).await?;
545        self.serve_with_listener(listener).await
546    }
547
548    pub async fn serve_with_listener(
549        self,
550        listener: tokio::net::TcpListener,
551    ) -> Result<(), std::io::Error> {
552        let state = InspectorState {
553            schematic: self.schematic.clone(),
554            public_projection: self.public_projection.clone(),
555            internal_projection: self.internal_projection.clone(),
556            public_projection_path: self.public_projection_path.clone(),
557            internal_projection_path: self.internal_projection_path.clone(),
558            auth_policy: self.auth_policy,
559            redaction_policy: self.redaction_policy.clone(),
560            state_inspector: self.state_inspector,
561            dlq_reader: self.dlq_reader,
562            relay_state: self.relay_state,
563            bearer_auth: self.bearer_auth,
564            trace_store: self.trace_store,
565            alert_dispatcher: self.alert_dispatcher,
566        };
567
568        // Store payload capture policy in a global for the tracing Layer to access
569        PAYLOAD_POLICY.get_or_init(|| self.payload_policy);
570
571        let mut app = Router::new()
572            .route("/schematic", get(get_schematic))
573            .route("/trace/public", get(get_public_projection))
574            .route("/debug/resume/:trace_id", get(debug_resume))
575            .route("/debug/step/:trace_id", get(debug_step))
576            .route("/debug/pause/:trace_id", get(debug_pause))
577            .route("/api/v1/state/:trace_id", get(api_get_state))
578            .route(
579                "/api/v1/state/:trace_id/resume",
580                axum::routing::post(api_post_resume),
581            )
582            .route("/metrics", get(prometheus_metrics_handler))
583            .layer(CorsLayer::permissive());
584
585        if self.surface_policy.expose_internal {
586            app = app
587                .route("/trace/internal", get(get_internal_projection))
588                .route("/inspector/circuits", get(get_inspector_circuits))
589                .route(
590                    "/inspector/circuits/:name",
591                    get(get_inspector_circuit_by_name),
592                )
593                .route("/inspector/bus", get(get_inspector_bus))
594                .route(
595                    "/inspector/timeline/:request_id",
596                    get(get_inspector_timeline_by_request_id),
597                )
598                .route("/api/v1/traces", get(api_get_traces))
599                .route("/api/v1/metrics", get(api_get_metrics_all))
600                .route("/api/v1/metrics/:circuit", get(api_get_metrics))
601                .route("/api/v1/events", get(api_get_events))
602                .route("/api/v1/dlq", get(api_get_dlq))
603                .route(
604                    "/api/v1/breakpoints",
605                    get(api_get_breakpoints).post(api_post_breakpoint),
606                )
607                .route(
608                    "/api/v1/breakpoints/:bp_id",
609                    axum::routing::delete(api_delete_breakpoint)
610                        .patch(api_patch_breakpoint),
611                )
612                .route("/api/v1/stalls", get(api_get_stalls))
613                .route("/api/v1/routes", get(api_get_routes))
614                .route(
615                    "/api/v1/routes/schema",
616                    axum::routing::post(api_post_routes_schema),
617                )
618                .route(
619                    "/api/v1/routes/sample",
620                    axum::routing::post(api_post_routes_sample),
621                )
622                .route(
623                    "/api/v1/relay",
624                    axum::routing::post(api_post_relay),
625                )
626                .route("/api/v1/traces/stored", get(api_get_stored_traces));
627        }
628
629        if self.surface_policy.expose_events {
630            app = app.route("/events", get(ws_handler));
631        }
632
633        if self.surface_policy.expose_quick_view {
634            app = app
635                .route("/quick-view", get(get_quick_view_html))
636                .route("/quick-view/app.js", get(get_quick_view_js))
637                .route("/quick-view/styles.css", get(get_quick_view_css));
638        }
639
640        let app = app.with_state(state);
641        let addr = listener.local_addr()?;
642        tracing::info!("Ranvier Inspector listening on http://{}", addr);
643
644        // Spawn periodic metrics broadcast task
645        if self.surface_policy.expose_events {
646            let broadcast_interval = std::env::var("RANVIER_INSPECTOR_METRICS_INTERVAL_MS")
647                .ok()
648                .and_then(|v| v.parse::<u64>().ok())
649                .unwrap_or(2000);
650            tokio::spawn(async move {
651                let mut interval =
652                    tokio::time::interval(std::time::Duration::from_millis(broadcast_interval));
653                loop {
654                    interval.tick().await;
655                    let snapshots = metrics::snapshot_all();
656                    if !snapshots.is_empty() {
657                        let msg = serde_json::json!({
658                            "type": "metrics",
659                            "circuits": snapshots,
660                            "timestamp": epoch_ms()
661                        })
662                        .to_string();
663                        let _ = get_sender().send(msg);
664                    }
665
666                    // Stall detection piggybacks on the same timer
667                    let stalls = stall::detect_stalls();
668                    if !stalls.is_empty() {
669                        let msg = serde_json::json!({
670                            "type": "stall_detected",
671                            "stalls": stalls,
672                            "timestamp": epoch_ms()
673                        })
674                        .to_string();
675                        let _ = get_sender().send(msg);
676                    }
677                }
678            });
679        }
680
681        axum::serve(listener, app).await
682    }
683}
684
685fn default_public_projection(schematic: &Schematic) -> Value {
686    serde_json::json!({
687        "service_name": schematic.name,
688        "window_start": "1970-01-01T00:00:00Z",
689        "window_end": "1970-01-01T00:00:00Z",
690        "overall_status": "operational",
691        "circuits": [
692            {
693                "name": schematic.name,
694                "status": "operational",
695                "success_rate": 1.0,
696                "error_rate": 0.0,
697                "p95_latency_ms": 0.0
698            }
699        ]
700    })
701}
702
703fn default_internal_projection(schematic: &Schematic) -> Value {
704    let nodes = schematic
705        .nodes
706        .iter()
707        .map(|n| {
708            serde_json::json!({
709                "node_id": n.id,
710                "label": n.label,
711                "kind": node_kind_name(&n.kind),
712                "entered_at": "1970-01-01T00:00:00Z",
713                "exited_at": "1970-01-01T00:00:00Z",
714                "latency_ms": 0.0,
715                "outcome_type": "Next",
716                "branch_id": Value::Null,
717                "error_code": Value::Null,
718                "error_category": Value::Null
719            })
720        })
721        .collect::<Vec<_>>();
722
723    serde_json::json!({
724        "trace_id": "bootstrap",
725        "circuit_id": schematic.id,
726        "started_at": "1970-01-01T00:00:00Z",
727        "finished_at": "1970-01-01T00:00:00Z",
728        "nodes": nodes,
729        "summary": {
730            "node_count": schematic.nodes.len(),
731            "fault_count": 0,
732            "branch_count": 0
733        }
734    })
735}
736
737fn node_kind_name(kind: &NodeKind) -> &'static str {
738    match kind {
739        NodeKind::Ingress => "Ingress",
740        NodeKind::Atom => "Atom",
741        NodeKind::Synapse => "Synapse",
742        NodeKind::Egress => "Egress",
743        NodeKind::Subgraph(_) => "Subgraph",
744        NodeKind::FanOut => "FanOut",
745        NodeKind::FanIn => "FanIn",
746    }
747}
748
749fn read_projection_file(path: &str) -> Result<Value, String> {
750    let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
751    serde_json::from_str::<Value>(&content).map_err(|e| e.to_string())
752}
753
/// Built-in lowercase substrings that mark a JSON key as sensitive.
fn default_sensitive_patterns() -> Vec<String> {
    const BUILT_IN: [&str; 11] = [
        "password",
        "secret",
        "token",
        "authorization",
        "cookie",
        "session",
        "api_key",
        "credit_card",
        "ssn",
        "email",
        "phone",
    ];
    BUILT_IN.iter().map(|pattern| (*pattern).to_string()).collect()
}
769
/// Split a comma-separated list into trimmed, ASCII-lowercased, non-empty entries.
fn parse_csv_lower(raw: &str) -> Vec<String> {
    raw.split(',')
        .filter_map(|field| {
            let field = field.trim();
            (!field.is_empty()).then(|| field.to_ascii_lowercase())
        })
        .collect()
}
776
777fn apply_projection_redaction(
778    projection: Value,
779    surface: ProjectionSurface,
780    policy: &TelemetryRedactionPolicy,
781) -> Value {
782    let mode = policy.mode_for_surface(surface);
783    redact_json_value(projection, mode, policy, None)
784}
785
786fn redact_json_value(
787    value: Value,
788    mode: RedactionMode,
789    policy: &TelemetryRedactionPolicy,
790    parent_key: Option<&str>,
791) -> Value {
792    if mode == RedactionMode::Off {
793        return value;
794    }
795
796    match value {
797        Value::Object(map) => {
798            let mut out = serde_json::Map::with_capacity(map.len());
799            let strict_attribute_bag =
800                mode == RedactionMode::Strict && parent_key.is_some_and(is_attribute_bag_key);
801
802            for (key, child) in map {
803                let lowered = key.to_ascii_lowercase();
804
805                if strict_attribute_bag && !policy.allow_keys.contains(&lowered) {
806                    if policy.is_sensitive_key(&lowered) {
807                        out.insert(key, Value::String("[REDACTED]".to_string()));
808                    }
809                    continue;
810                }
811
812                if policy.is_sensitive_key(&lowered) {
813                    out.insert(key, Value::String("[REDACTED]".to_string()));
814                    continue;
815                }
816
817                out.insert(
818                    key.clone(),
819                    redact_json_value(child, mode, policy, Some(&key)),
820                );
821            }
822            Value::Object(out)
823        }
824        Value::Array(values) => Value::Array(
825            values
826                .into_iter()
827                .map(|v| redact_json_value(v, mode, policy, parent_key))
828                .collect(),
829        ),
830        other => other,
831    }
832}
833
/// True for `attributes` or any key ending in `_attributes` (ASCII case-insensitive).
fn is_attribute_bag_key(key: &str) -> bool {
    key.to_ascii_lowercase()
        .strip_suffix("attributes")
        .is_some_and(|prefix| prefix.is_empty() || prefix.ends_with('_'))
}
838
839async fn debug_resume(AxPath(trace_id): AxPath<String>) -> impl IntoResponse {
840    if let Some(debug) = get_debug_control_for_trace(&trace_id) {
841        debug.resume();
842        StatusCode::OK
843    } else {
844        StatusCode::NOT_FOUND
845    }
846}
847
848async fn debug_step(AxPath(trace_id): AxPath<String>) -> impl IntoResponse {
849    if let Some(debug) = get_debug_control_for_trace(&trace_id) {
850        debug.step();
851        StatusCode::OK
852    } else {
853        StatusCode::NOT_FOUND
854    }
855}
856
857async fn debug_pause(AxPath(trace_id): AxPath<String>) -> impl IntoResponse {
858    if let Some(debug) = get_debug_control_for_trace(&trace_id) {
859        debug.pause();
860        StatusCode::OK
861    } else {
862        StatusCode::NOT_FOUND
863    }
864}
865
866async fn api_get_state(
867    AxPath(trace_id): AxPath<String>,
868    State(state): State<InspectorState>,
869) -> impl IntoResponse {
870    if let Some(inspector) = state.state_inspector.as_ref()
871        && let Some(val) = inspector.get_state(&trace_id).await
872    {
873        return Json(val).into_response();
874    }
875    StatusCode::NOT_FOUND.into_response()
876}
877
878#[derive(Deserialize)]
879struct ResumePayload {
880    target_node: String,
881    #[allow(dead_code)]
882    force: bool,
883    payload_override: Option<Value>,
884}
885
886async fn api_post_resume(
887    AxPath(trace_id): AxPath<String>,
888    State(state): State<InspectorState>,
889    Json(payload): Json<ResumePayload>,
890) -> impl IntoResponse {
891    if let Some(inspector) = state.state_inspector.as_ref() {
892        match inspector
893            .force_resume(&trace_id, &payload.target_node, payload.payload_override)
894            .await
895        {
896            Ok(_) => StatusCode::OK.into_response(),
897            Err(e) => (StatusCode::BAD_REQUEST, e).into_response(),
898        }
899    } else {
900        (
901            StatusCode::SERVICE_UNAVAILABLE,
902            "State inspector not configured",
903        )
904            .into_response()
905    }
906}
907
908async fn api_get_traces(
909    headers: HeaderMap,
910    State(state): State<InspectorState>,
911) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
912    ensure_internal_access(&headers, &state.auth_policy)?;
913    let traces = get_trace_registry()
914        .lock()
915        .map(|r| r.list_all())
916        .unwrap_or_default();
917    Ok(inspector_envelope(
918        "inspector.traces.v1",
919        serde_json::json!({
920            "count": traces.len(),
921            "traces": traces
922        }),
923    ))
924}
925
926async fn api_get_metrics(
927    headers: HeaderMap,
928    AxPath(circuit): AxPath<String>,
929    State(state): State<InspectorState>,
930) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
931    ensure_internal_access(&headers, &state.auth_policy)?;
932    match metrics::snapshot_circuit(&circuit) {
933        Some(snap) => Ok(inspector_envelope(
934            "inspector.metrics.v1",
935            serde_json::json!(snap),
936        )),
937        None => Err((
938            StatusCode::NOT_FOUND,
939            Json(serde_json::json!({ "error": "No metrics for circuit", "circuit": circuit })),
940        )),
941    }
942}
943
944async fn api_get_metrics_all(
945    headers: HeaderMap,
946    State(state): State<InspectorState>,
947) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
948    ensure_internal_access(&headers, &state.auth_policy)?;
949    let snapshots = metrics::snapshot_all();
950    Ok(inspector_envelope(
951        "inspector.metrics.v1",
952        serde_json::json!({
953            "count": snapshots.len(),
954            "circuits": snapshots
955        }),
956    ))
957}
958
959/// Prometheus exposition format endpoint — `GET /metrics`.
960///
961/// Protected by BearerAuth when configured. Returns per-node execution
962/// metrics (invocations, errors, throughput, latency percentiles) and
963/// active trace count in Prometheus text format.
964async fn prometheus_metrics_handler(
965    headers: HeaderMap,
966    State(state): State<InspectorState>,
967) -> Result<impl IntoResponse, (StatusCode, Json<Value>)> {
968    state.bearer_auth.validate(&headers)?;
969    let body = prometheus::render();
970    Ok((
971        [(header::CONTENT_TYPE, prometheus::CONTENT_TYPE)],
972        body,
973    ))
974}
975
976async fn api_get_events(
977    headers: HeaderMap,
978    State(state): State<InspectorState>,
979) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
980    ensure_internal_access(&headers, &state.auth_policy)?;
981    let events = payload::list_events(200);
982    Ok(inspector_envelope(
983        "inspector.events.v1",
984        serde_json::json!({
985            "count": events.len(),
986            "events": events
987        }),
988    ))
989}
990
991async fn api_get_dlq(
992    headers: HeaderMap,
993    State(state): State<InspectorState>,
994) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
995    ensure_internal_access(&headers, &state.auth_policy)?;
996    match &state.dlq_reader {
997        Some(reader) => {
998            let letters = reader
999                .list_dead_letters(None, 100)
1000                .await
1001                .unwrap_or_default();
1002            let count = reader.count_dead_letters().await.unwrap_or(0);
1003            Ok(inspector_envelope(
1004                "inspector.dlq.v1",
1005                serde_json::json!({
1006                    "total": count,
1007                    "items": letters
1008                }),
1009            ))
1010        }
1011        None => Ok(inspector_envelope(
1012            "inspector.dlq.v1",
1013            serde_json::json!({
1014                "total": 0,
1015                "items": [],
1016                "note": "No DLQ reader configured"
1017            }),
1018        )),
1019    }
1020}
1021
1022async fn api_get_breakpoints(
1023    headers: HeaderMap,
1024    State(state): State<InspectorState>,
1025) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1026    ensure_internal_access(&headers, &state.auth_policy)?;
1027    let bps = breakpoint::list_breakpoints();
1028    Ok(inspector_envelope(
1029        "inspector.breakpoints.v1",
1030        serde_json::json!({
1031            "count": bps.len(),
1032            "breakpoints": bps
1033        }),
1034    ))
1035}
1036
1037#[derive(Deserialize)]
1038struct CreateBreakpointPayload {
1039    node_id: String,
1040    condition: Option<String>,
1041}
1042
1043async fn api_post_breakpoint(
1044    headers: HeaderMap,
1045    State(state): State<InspectorState>,
1046    Json(body): Json<CreateBreakpointPayload>,
1047) -> Result<(StatusCode, Json<Value>), (StatusCode, Json<Value>)> {
1048    ensure_internal_access(&headers, &state.auth_policy)?;
1049    let bp = breakpoint::add_breakpoint(body.node_id, body.condition);
1050    Ok((
1051        StatusCode::CREATED,
1052        inspector_envelope("inspector.breakpoint.v1", serde_json::json!(bp)),
1053    ))
1054}
1055
1056async fn api_delete_breakpoint(
1057    headers: HeaderMap,
1058    AxPath(bp_id): AxPath<String>,
1059    State(state): State<InspectorState>,
1060) -> Result<StatusCode, (StatusCode, Json<Value>)> {
1061    ensure_internal_access(&headers, &state.auth_policy)?;
1062    if breakpoint::remove_breakpoint(&bp_id) {
1063        Ok(StatusCode::NO_CONTENT)
1064    } else {
1065        Err(policy_error(StatusCode::NOT_FOUND, "breakpoint_not_found"))
1066    }
1067}
1068
1069#[derive(Deserialize)]
1070struct PatchBreakpointPayload {
1071    enabled: Option<bool>,
1072    condition: Option<Option<String>>,
1073}
1074
1075async fn api_patch_breakpoint(
1076    headers: HeaderMap,
1077    AxPath(bp_id): AxPath<String>,
1078    State(state): State<InspectorState>,
1079    Json(body): Json<PatchBreakpointPayload>,
1080) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1081    ensure_internal_access(&headers, &state.auth_policy)?;
1082    match breakpoint::update_breakpoint(&bp_id, body.enabled, body.condition) {
1083        Some(bp) => Ok(inspector_envelope(
1084            "inspector.breakpoint.v1",
1085            serde_json::json!(bp),
1086        )),
1087        None => Err(policy_error(StatusCode::NOT_FOUND, "breakpoint_not_found")),
1088    }
1089}
1090
1091async fn api_get_stalls(
1092    headers: HeaderMap,
1093    State(state): State<InspectorState>,
1094) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1095    ensure_internal_access(&headers, &state.auth_policy)?;
1096    let stalls = stall::detect_stalls();
1097    Ok(inspector_envelope(
1098        "inspector.stalls.v1",
1099        serde_json::json!({
1100            "count": stalls.len(),
1101            "stalls": stalls
1102        }),
1103    ))
1104}
1105
1106static DEBUG_REGISTRY: OnceLock<Arc<Mutex<HashMap<String, DebugControl>>>> = OnceLock::new();
1107
1108fn get_debug_registry() -> Arc<Mutex<HashMap<String, DebugControl>>> {
1109    DEBUG_REGISTRY
1110        .get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
1111        .clone()
1112}
1113
1114pub fn get_debug_control_for_trace(trace_id: &str) -> Option<DebugControl> {
1115    get_debug_registry().lock().unwrap().get(trace_id).cloned()
1116}
1117
1118pub fn register_debug_control(trace_id: String, control: DebugControl) {
1119    get_debug_registry()
1120        .lock()
1121        .unwrap()
1122        .insert(trace_id, control);
1123}
1124
1125pub fn unregister_debug_control(trace_id: &str) {
1126    get_debug_registry().lock().unwrap().remove(trace_id);
1127}
1128
1129#[derive(Clone)]
1130struct InspectorState {
1131    schematic: Arc<Mutex<Schematic>>,
1132    public_projection: Arc<Mutex<Option<Value>>>,
1133    internal_projection: Arc<Mutex<Option<Value>>>,
1134    public_projection_path: Option<String>,
1135    internal_projection_path: Option<String>,
1136    auth_policy: AuthPolicy,
1137    redaction_policy: TelemetryRedactionPolicy,
1138    state_inspector: Option<Arc<dyn StateInspector>>,
1139    dlq_reader: Option<Arc<dyn DlqReader>>,
1140    relay_state: Option<relay::RelayState>,
1141    bearer_auth: auth::BearerAuth,
1142    trace_store: Option<Arc<dyn trace_store::TraceStore>>,
1143    #[allow(dead_code)]
1144    alert_dispatcher: Option<Arc<alert::AlertDispatcher>>,
1145}
1146
1147pub fn layer() -> InspectorLayer {
1148    InspectorLayer
1149}
1150
/// Per-span data that `InspectorLayer` stores in the extensions of `Node`, `Circuit`
/// and `NodeRetry` spans.
struct SpanData {
    node_id: Option<String>,
    resource_type: Option<String>,
    circuit: Option<String>,
    outcome_kind: Option<String>,
    outcome_target: Option<String>,
    /// Timestamp set by the layer's `on_enter` hook.
    entered_at: Option<Instant>,
    /// Milliseconds elapsed since `entered_at`, computed by the layer's `on_exit` hook.
    duration_ms: Option<u64>,
}

impl SpanData {
    /// Builds span data from the attributes seen at span creation; timing is unset.
    fn from_visitor(v: SpanFieldExtractor) -> Self {
        let SpanFieldExtractor {
            node_id,
            resource_type,
            circuit,
            outcome_kind,
            outcome_target,
        } = v;
        Self {
            node_id,
            resource_type,
            circuit,
            outcome_kind,
            outcome_target,
            entered_at: None,
            duration_ms: None,
        }
    }

    /// Overlays values recorded after creation. Fields `v` did not see keep their
    /// current value; timing fields are never touched.
    fn update_from_visitor(&mut self, v: SpanFieldExtractor) {
        fn overlay(slot: &mut Option<String>, incoming: Option<String>) {
            if incoming.is_some() {
                *slot = incoming;
            }
        }
        overlay(&mut self.node_id, v.node_id);
        overlay(&mut self.resource_type, v.resource_type);
        overlay(&mut self.circuit, v.circuit);
        overlay(&mut self.outcome_kind, v.outcome_kind);
        overlay(&mut self.outcome_target, v.outcome_target);
    }
}

/// Field visitor that collects the `ranvier.*` values of a span's attributes or
/// records.
#[derive(Default)]
struct SpanFieldExtractor {
    node_id: Option<String>,
    resource_type: Option<String>,
    circuit: Option<String>,
    outcome_kind: Option<String>,
    outcome_target: Option<String>,
}

impl SpanFieldExtractor {
    /// Creates an extractor with every field unset.
    fn new() -> Self {
        Self::default()
    }
}
1213
1214impl tracing::field::Visit for SpanFieldExtractor {
1215    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1216        match field.name() {
1217            "ranvier.node" => self.node_id = Some(value.to_string()),
1218            "ranvier.resource_type" => self.resource_type = Some(value.to_string()),
1219            "ranvier.circuit" => self.circuit = Some(value.to_string()),
1220            "ranvier.outcome_kind" => self.outcome_kind = Some(value.to_string()),
1221            "ranvier.outcome_target" => self.outcome_target = Some(value.to_string()),
1222            _ => {}
1223        }
1224    }
1225
1226    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1227        // For %Display fields, the Debug impl delegates to Display
1228        let s = format!("{value:?}");
1229        match field.name() {
1230            "ranvier.node" => self.node_id = Some(s),
1231            "ranvier.resource_type" => self.resource_type = Some(s),
1232            "ranvier.circuit" => self.circuit = Some(s),
1233            "ranvier.outcome_kind" => self.outcome_kind = Some(s),
1234            "ranvier.outcome_target" => self.outcome_target = Some(s),
1235            _ => {}
1236        }
1237    }
1238}
1239
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
1246
1247pub struct InspectorLayer;
1248
1249impl<S> Layer<S> for InspectorLayer
1250where
1251    S: Subscriber + for<'a> LookupSpan<'a>,
1252{
1253    fn on_new_span(
1254        &self,
1255        attrs: &tracing::span::Attributes<'_>,
1256        id: &Id,
1257        ctx: Context<'_, S>,
1258    ) {
1259        if let Some(span) = ctx.span(id) {
1260            let name = span.name();
1261            if name == "Node" || name == "Circuit" || name == "NodeRetry" {
1262                let mut extractor = SpanFieldExtractor::new();
1263                attrs.record(&mut extractor);
1264                span.extensions_mut()
1265                    .insert(SpanData::from_visitor(extractor));
1266            }
1267        }
1268    }
1269
1270    fn on_record(
1271        &self,
1272        id: &Id,
1273        values: &tracing::span::Record<'_>,
1274        ctx: Context<'_, S>,
1275    ) {
1276        if let Some(span) = ctx.span(id) {
1277            let name = span.name();
1278            if name == "Node" || name == "Circuit" || name == "NodeRetry" {
1279                let mut extractor = SpanFieldExtractor::new();
1280                values.record(&mut extractor);
1281                if let Some(data) = span.extensions_mut().get_mut::<SpanData>() {
1282                    data.update_from_visitor(extractor);
1283                }
1284            }
1285        }
1286    }
1287
1288    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
1289        let metadata = event.metadata();
1290        if metadata.target() == "ranvier.debugger" {
1291            let mut fields = HashMap::new();
1292            let mut visitor = FieldVisitor {
1293                fields: &mut fields,
1294            };
1295            event.record(&mut visitor);
1296
1297            if let (Some(trace_id), Some(node_id)) =
1298                (fields.get("trace_id"), fields.get("node_id"))
1299            {
1300                let msg = serde_json::json!({
1301                    "type": "node_paused",
1302                    "trace_id": trace_id,
1303                    "node_id": node_id,
1304                    "timestamp": epoch_ms()
1305                })
1306                .to_string();
1307                let _ = get_sender().send(msg);
1308            }
1309            return;
1310        }
1311
1312        if metadata.target().starts_with("ranvier") {
1313            let mut fields = HashMap::new();
1314            let mut visitor = FieldVisitor {
1315                fields: &mut fields,
1316            };
1317            event.record(&mut visitor);
1318
1319            let msg = serde_json::json!({
1320                "type": "event",
1321                "target": metadata.target(),
1322                "level": format!("{}", metadata.level()),
1323                "fields": fields,
1324                "timestamp": epoch_ms()
1325            })
1326            .to_string();
1327            let _ = get_sender().send(msg);
1328        }
1329    }
1330
1331    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
1332        if let Some(span) = ctx.span(id) {
1333            let name = span.name();
1334            if name == "Node" || name == "Circuit" {
1335                let mut extensions = span.extensions_mut();
1336                if let Some(data) = extensions.get_mut::<SpanData>() {
1337                    data.entered_at = Some(Instant::now());
1338
1339                    if name == "Node" {
1340                        let msg = serde_json::json!({
1341                            "type": "node_enter",
1342                            "node_id": data.node_id,
1343                            "resource_type": data.resource_type,
1344                            "timestamp": epoch_ms()
1345                        })
1346                        .to_string();
1347                        let _ = get_sender().send(msg);
1348
1349                        // Register for stall detection
1350                        if let Some(node_id) = &data.node_id {
1351                            let circuit_name = data.circuit.clone().unwrap_or_default();
1352                            stall::register_node(
1353                                format!("{:?}", id),
1354                                node_id.clone(),
1355                                circuit_name,
1356                            );
1357                        }
1358                    } else if name == "Circuit" {
1359                        if let Some(circuit) = &data.circuit {
1360                            if let Ok(mut registry) = get_trace_registry().lock() {
1361                                registry.register(circuit.clone());
1362                            }
1363                        }
1364                    }
1365                }
1366            }
1367        }
1368    }
1369
1370    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
1371        if let Some(span) = ctx.span(id) {
1372            let name = span.name();
1373            if name == "Node" || name == "Circuit" {
1374                let mut extensions = span.extensions_mut();
1375                if let Some(data) = extensions.get_mut::<SpanData>() {
1376                    // Store duration for use in on_close (outcome fields not yet recorded)
1377                    data.duration_ms =
1378                        data.entered_at.map(|t| t.elapsed().as_millis() as u64);
1379                }
1380            }
1381        }
1382    }
1383
1384    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
1385        if let Some(span) = ctx.span(&id) {
1386            let name = span.name();
1387            if name == "Node" {
1388                // Unregister from stall detector
1389                stall::unregister_node(&format!("{:?}", id));
1390
1391                let extensions = span.extensions();
1392                if let Some(data) = extensions.get::<SpanData>() {
1393                    let duration = data.duration_ms.unwrap_or(0);
1394                    let is_error = data.outcome_kind.as_deref() == Some("Fault");
1395
1396                    let msg = serde_json::json!({
1397                        "type": "node_exit",
1398                        "node_id": data.node_id,
1399                        "resource_type": data.resource_type,
1400                        "outcome_type": data.outcome_kind,
1401                        "outcome_target": data.outcome_target,
1402                        "duration_ms": duration,
1403                        "timestamp": epoch_ms()
1404                    })
1405                    .to_string();
1406                    let _ = get_sender().send(msg);
1407
1408                    // Record metrics — resolve parent circuit name
1409                    let circuit_name = span
1410                        .parent()
1411                        .and_then(|p| {
1412                            p.extensions()
1413                                .get::<SpanData>()
1414                                .and_then(|d| d.circuit.clone())
1415                        });
1416                    if let Some(node_id) = &data.node_id {
1417                        metrics::record_global_node_exit(
1418                            circuit_name.as_deref().unwrap_or("default"),
1419                            node_id,
1420                            duration,
1421                            is_error,
1422                        );
1423                    }
1424
1425                    // Record event in ring buffer
1426                    payload::record_event(payload::CapturedEvent {
1427                        timestamp: epoch_ms(),
1428                        event_type: "node_exit".to_string(),
1429                        node_id: data.node_id.clone(),
1430                        circuit: circuit_name,
1431                        duration_ms: Some(duration),
1432                        outcome_type: data.outcome_kind.clone(),
1433                        payload_hash: None,
1434                        payload_json: None,
1435                    });
1436                }
1437            } else if name == "Circuit" {
1438                let extensions = span.extensions();
1439                if let Some(data) = extensions.get::<SpanData>() {
1440                    let msg = serde_json::json!({
1441                        "type": "circuit_exit",
1442                        "circuit": data.circuit,
1443                        "outcome_type": data.outcome_kind,
1444                        "outcome_target": data.outcome_target,
1445                        "duration_ms": data.duration_ms.unwrap_or(0),
1446                        "timestamp": epoch_ms()
1447                    })
1448                    .to_string();
1449                    let _ = get_sender().send(msg);
1450
1451                    // Complete trace in registry
1452                    if let Some(circuit) = &data.circuit {
1453                        if let Ok(mut registry) = get_trace_registry().lock() {
1454                            registry.complete(
1455                                circuit,
1456                                data.outcome_kind.clone(),
1457                                data.duration_ms,
1458                            );
1459                        }
1460                    }
1461
1462                    // Record circuit event
1463                    payload::record_event(payload::CapturedEvent {
1464                        timestamp: epoch_ms(),
1465                        event_type: "circuit_exit".to_string(),
1466                        node_id: None,
1467                        circuit: data.circuit.clone(),
1468                        duration_ms: data.duration_ms,
1469                        outcome_type: data.outcome_kind.clone(),
1470                        payload_hash: None,
1471                        payload_json: None,
1472                    });
1473                }
1474            }
1475        }
1476    }
1477}
1478
1479struct FieldVisitor<'a> {
1480    fields: &'a mut HashMap<String, String>,
1481}
1482
1483impl<'a> tracing::field::Visit for FieldVisitor<'a> {
1484    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1485        self.fields
1486            .insert(field.name().to_string(), format!("{:?}", value));
1487    }
1488
1489    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1490        self.fields
1491            .insert(field.name().to_string(), value.to_string());
1492    }
1493}
1494
1495async fn get_schematic(
1496    headers: HeaderMap,
1497    State(state): State<InspectorState>,
1498) -> Result<Json<Schematic>, (StatusCode, Json<Value>)> {
1499    ensure_public_access(&headers, &state.auth_policy)?;
1500    let schematic = state.schematic.lock().unwrap();
1501    Ok(Json(schematic.clone()))
1502}
1503
1504async fn get_public_projection(
1505    headers: HeaderMap,
1506    State(state): State<InspectorState>,
1507) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1508    ensure_public_access(&headers, &state.auth_policy)?;
1509    if let Some(path) = &state.public_projection_path
1510        && let Ok(v) = read_projection_file(path)
1511    {
1512        return Ok(Json(apply_projection_redaction(
1513            v,
1514            ProjectionSurface::Public,
1515            &state.redaction_policy,
1516        )));
1517    }
1518
1519    let projection = state
1520        .public_projection
1521        .lock()
1522        .ok()
1523        .and_then(|v| v.clone())
1524        .unwrap_or(Value::Null);
1525    Ok(Json(apply_projection_redaction(
1526        projection,
1527        ProjectionSurface::Public,
1528        &state.redaction_policy,
1529    )))
1530}
1531
1532async fn get_internal_projection(
1533    headers: HeaderMap,
1534    State(state): State<InspectorState>,
1535) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1536    ensure_internal_access(&headers, &state.auth_policy)?;
1537    if let Some(path) = &state.internal_projection_path
1538        && let Ok(v) = read_projection_file(path)
1539    {
1540        return Ok(Json(apply_projection_redaction(
1541            v,
1542            ProjectionSurface::Internal,
1543            &state.redaction_policy,
1544        )));
1545    }
1546
1547    let projection = state
1548        .internal_projection
1549        .lock()
1550        .ok()
1551        .and_then(|v| v.clone())
1552        .unwrap_or(Value::Null);
1553    Ok(Json(apply_projection_redaction(
1554        projection,
1555        ProjectionSurface::Internal,
1556        &state.redaction_policy,
1557    )))
1558}
1559
1560fn inspector_envelope(kind: &'static str, data: Value) -> Json<Value> {
1561    Json(serde_json::json!({
1562        "api_version": INSPECTOR_API_VERSION,
1563        "kind": kind,
1564        "data": data
1565    }))
1566}
1567
1568fn load_internal_projection_value(state: &InspectorState) -> Value {
1569    if let Some(path) = &state.internal_projection_path
1570        && let Ok(v) = read_projection_file(path)
1571    {
1572        return v;
1573    }
1574    state
1575        .internal_projection
1576        .lock()
1577        .ok()
1578        .and_then(|v| v.clone())
1579        .unwrap_or(Value::Null)
1580}
1581
1582fn latest_trace_from_projection(projection: &Value) -> Option<Value> {
1583    match projection {
1584        Value::Object(map) => {
1585            if let Some(Value::Array(traces)) = map.get("traces") {
1586                return traces.last().cloned();
1587            }
1588            if map.get("trace_id").is_some() {
1589                return Some(projection.clone());
1590            }
1591            None
1592        }
1593        Value::Array(traces) => traces.last().cloned(),
1594        _ => None,
1595    }
1596}
1597
1598fn find_trace_by_request_id(projection: &Value, request_id: &str) -> Option<Value> {
1599    if request_id.eq_ignore_ascii_case("latest") {
1600        return latest_trace_from_projection(projection);
1601    }
1602
1603    match projection {
1604        Value::Object(map) => {
1605            if map.get("trace_id").and_then(Value::as_str) == Some(request_id) {
1606                return Some(projection.clone());
1607            }
1608            if let Some(Value::Array(traces)) = map.get("traces") {
1609                return traces
1610                    .iter()
1611                    .find(|trace| trace.get("trace_id").and_then(Value::as_str) == Some(request_id))
1612                    .cloned();
1613            }
1614            None
1615        }
1616        Value::Array(traces) => traces
1617            .iter()
1618            .find(|trace| trace.get("trace_id").and_then(Value::as_str) == Some(request_id))
1619            .cloned(),
1620        _ => None,
1621    }
1622}
1623
1624async fn get_inspector_circuits(
1625    headers: HeaderMap,
1626    State(state): State<InspectorState>,
1627) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1628    ensure_internal_access(&headers, &state.auth_policy)?;
1629    let schematic = state.schematic.lock().unwrap();
1630    let transition_count = schematic
1631        .nodes
1632        .iter()
1633        .filter(|node| matches!(&node.kind, NodeKind::Atom))
1634        .count();
1635    let has_capability_rules = schematic.nodes.iter().any(|node| {
1636        node.bus_capability
1637            .as_ref()
1638            .map(|policy| !policy.allow.is_empty() || !policy.deny.is_empty())
1639            .unwrap_or(false)
1640    });
1641
1642    Ok(inspector_envelope(
1643        "inspector.circuits.v1",
1644        serde_json::json!({
1645            "count": 1,
1646            "items": [
1647                {
1648                    "id": schematic.id,
1649                    "name": schematic.name,
1650                    "schema_version": schematic.schema_version,
1651                    "node_count": schematic.nodes.len(),
1652                    "edge_count": schematic.edges.len(),
1653                    "transition_count": transition_count,
1654                    "has_bus_capability_rules": has_capability_rules
1655                }
1656            ]
1657        }),
1658    ))
1659}
1660
1661async fn get_inspector_circuit_by_name(
1662    headers: HeaderMap,
1663    AxPath(name): AxPath<String>,
1664    State(state): State<InspectorState>,
1665) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1666    ensure_internal_access(&headers, &state.auth_policy)?;
1667    let schematic = state.schematic.lock().unwrap().clone();
1668    if name != schematic.name && name != schematic.id {
1669        return Err(policy_error(StatusCode::NOT_FOUND, "circuit_not_found"));
1670    }
1671
1672    let public_projection_loaded = state
1673        .public_projection
1674        .lock()
1675        .ok()
1676        .map(|slot| slot.is_some())
1677        .unwrap_or(false);
1678    let internal_projection_loaded = state
1679        .internal_projection
1680        .lock()
1681        .ok()
1682        .map(|slot| slot.is_some())
1683        .unwrap_or(false);
1684
1685    Ok(inspector_envelope(
1686        "inspector.circuit.v1",
1687        serde_json::json!({
1688            "circuit": {
1689                "id": schematic.id,
1690                "name": schematic.name,
1691                "schema_version": schematic.schema_version,
1692                "node_count": schematic.nodes.len(),
1693                "edge_count": schematic.edges.len(),
1694            },
1695            "runtime_state": {
1696                "public_projection_loaded": public_projection_loaded,
1697                "internal_projection_loaded": internal_projection_loaded,
1698                "public_projection_path": state.public_projection_path,
1699                "internal_projection_path": state.internal_projection_path,
1700            },
1701            "schematic": schematic
1702        }),
1703    ))
1704}
1705
1706async fn get_inspector_bus(
1707    headers: HeaderMap,
1708    State(state): State<InspectorState>,
1709) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1710    ensure_internal_access(&headers, &state.auth_policy)?;
1711    let schematic = state.schematic.lock().unwrap();
1712
1713    let mut resource_types = HashSet::new();
1714    let mut transition_capabilities = Vec::new();
1715
1716    for node in &schematic.nodes {
1717        if !node.resource_type.trim().is_empty() && node.resource_type != "()" {
1718            resource_types.insert(node.resource_type.clone());
1719        }
1720        if !matches!(&node.kind, NodeKind::Atom) {
1721            continue;
1722        }
1723
1724        let mut allow = node
1725            .bus_capability
1726            .as_ref()
1727            .map(|policy| policy.allow.clone())
1728            .unwrap_or_default();
1729        let mut deny = node
1730            .bus_capability
1731            .as_ref()
1732            .map(|policy| policy.deny.clone())
1733            .unwrap_or_default();
1734        allow.sort();
1735        deny.sort();
1736        let access = if allow.is_empty() && deny.is_empty() {
1737            "unrestricted"
1738        } else {
1739            "restricted"
1740        };
1741
1742        transition_capabilities.push(serde_json::json!({
1743            "transition": node.label,
1744            "resource_type": node.resource_type,
1745            "access": access,
1746            "allow": allow,
1747            "deny": deny
1748        }));
1749    }
1750
1751    let mut resources = resource_types.into_iter().collect::<Vec<_>>();
1752    resources.sort();
1753
1754    Ok(inspector_envelope(
1755        "inspector.bus.v1",
1756        serde_json::json!({
1757            "resource_types": resources,
1758            "transition_capabilities": transition_capabilities
1759        }),
1760    ))
1761}
1762
1763async fn get_inspector_timeline_by_request_id(
1764    headers: HeaderMap,
1765    AxPath(request_id): AxPath<String>,
1766    State(state): State<InspectorState>,
1767) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1768    ensure_internal_access(&headers, &state.auth_policy)?;
1769    let projection = load_internal_projection_value(&state);
1770    let trace = find_trace_by_request_id(&projection, &request_id)
1771        .ok_or_else(|| policy_error(StatusCode::NOT_FOUND, "timeline_request_not_found"))?;
1772
1773    Ok(inspector_envelope(
1774        "inspector.timeline.v1",
1775        serde_json::json!({
1776            "request_id": request_id,
1777            "trace": trace
1778        }),
1779    ))
1780}
1781
1782async fn ws_handler(
1783    headers: HeaderMap,
1784    ws: WebSocketUpgrade,
1785    State(state): State<InspectorState>,
1786) -> impl IntoResponse {
1787    if let Err(err) = ensure_internal_access(&headers, &state.auth_policy) {
1788        return err.into_response();
1789    }
1790    ws.on_upgrade(handle_socket)
1791}
1792
1793async fn get_quick_view_html() -> impl IntoResponse {
1794    (
1795        [(header::CONTENT_TYPE, "text/html; charset=utf-8")],
1796        QUICK_VIEW_HTML,
1797    )
1798}
1799
1800async fn get_quick_view_js() -> impl IntoResponse {
1801    (
1802        [(
1803            header::CONTENT_TYPE,
1804            "application/javascript; charset=utf-8",
1805        )],
1806        QUICK_VIEW_JS,
1807    )
1808}
1809
1810async fn get_quick_view_css() -> impl IntoResponse {
1811    (
1812        [(header::CONTENT_TYPE, "text/css; charset=utf-8")],
1813        QUICK_VIEW_CSS,
1814    )
1815}
1816
1817async fn handle_socket(mut socket: WebSocket) {
1818    let mut rx = get_sender().subscribe();
1819
1820    while let Ok(msg) = rx.recv().await {
1821        if socket.send(Message::Text(msg)).await.is_err() {
1822            break;
1823        }
1824    }
1825}
1826
/// Reads a boolean flag from the environment variable `name`.
///
/// `1`, `true`, `on` and `yes` are truthy. Matching is case-insensitive, and
/// surrounding whitespace (for example a trailing `\r` from CRLF env files) is
/// ignored. Any other value is falsy. Returns `default` when the variable is
/// unset or not valid Unicode.
fn env_flag(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        Ok(v) => matches!(
            v.trim().to_ascii_lowercase().as_str(),
            "1" | "true" | "on" | "yes"
        ),
        Err(_) => default,
    }
}
1833
1834fn parse_role(headers: &HeaderMap) -> Result<AccessRole, &'static str> {
1835    let raw = headers
1836        .get("x-ranvier-role")
1837        .ok_or("missing_x_ranvier_role")?
1838        .to_str()
1839        .map_err(|_| "invalid_x_ranvier_role")?
1840        .trim()
1841        .to_ascii_lowercase();
1842
1843    match raw.as_str() {
1844        "viewer" => Ok(AccessRole::Viewer),
1845        "operator" => Ok(AccessRole::Operator),
1846        "admin" => Ok(AccessRole::Admin),
1847        _ => Err("invalid_x_ranvier_role"),
1848    }
1849}
1850
1851fn has_tenant(headers: &HeaderMap) -> bool {
1852    headers
1853        .get("x-ranvier-tenant")
1854        .and_then(|v| v.to_str().ok())
1855        .map(|v| !v.trim().is_empty())
1856        .unwrap_or(false)
1857}
1858
1859fn policy_error(code: StatusCode, message: &'static str) -> (StatusCode, Json<Value>) {
1860    (
1861        code,
1862        Json(serde_json::json!({
1863            "error": message
1864        })),
1865    )
1866}
1867
1868fn ensure_public_access(
1869    headers: &HeaderMap,
1870    policy: &AuthPolicy,
1871) -> Result<(), (StatusCode, Json<Value>)> {
1872    if !policy.enforce_headers {
1873        return Ok(());
1874    }
1875    parse_role(headers)
1876        .map(|_| ())
1877        .map_err(|e| policy_error(StatusCode::UNAUTHORIZED, e))
1878}
1879
1880fn ensure_internal_access(
1881    headers: &HeaderMap,
1882    policy: &AuthPolicy,
1883) -> Result<(), (StatusCode, Json<Value>)> {
1884    if !policy.enforce_headers {
1885        return Ok(());
1886    }
1887
1888    let role = parse_role(headers).map_err(|e| policy_error(StatusCode::UNAUTHORIZED, e))?;
1889    if role == AccessRole::Viewer {
1890        return Err(policy_error(
1891            StatusCode::FORBIDDEN,
1892            "role_forbidden_for_internal_endpoint",
1893        ));
1894    }
1895    if policy.require_tenant_for_internal && !has_tenant(headers) {
1896        return Err(policy_error(
1897            StatusCode::FORBIDDEN,
1898            "missing_x_ranvier_tenant",
1899        ));
1900    }
1901    Ok(())
1902}
1903
1904// --- M201: New API endpoints ---
1905
1906async fn api_get_routes(
1907    headers: HeaderMap,
1908    State(state): State<InspectorState>,
1909) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1910    ensure_internal_access(&headers, &state.auth_policy)?;
1911
1912    let registered = routes::list_routes();
1913
1914    // Also enrich from schematic node schemas
1915    let schematic = state.schematic.lock().unwrap();
1916    let mut route_list: Vec<Value> = registered
1917        .iter()
1918        .map(|r| {
1919            // Find matching node in schematic by label
1920            let node_schemas = schematic.nodes.iter().find(|n| {
1921                Some(r.circuit_name.as_deref().unwrap_or(""))
1922                    == Some(n.label.as_str())
1923            });
1924
1925            serde_json::json!({
1926                "method": r.method,
1927                "path": r.path,
1928                "circuit_name": r.circuit_name,
1929                "input_schema": r.input_schema.clone().or_else(|| node_schemas.and_then(|n| n.input_schema.clone())),
1930                "output_schema": r.output_schema.clone().or_else(|| node_schemas.and_then(|n| n.output_schema.clone())),
1931            })
1932        })
1933        .collect();
1934
1935    // If no routes registered, build from schematic nodes
1936    if route_list.is_empty() {
1937        route_list = schematic
1938            .nodes
1939            .iter()
1940            .filter(|n| n.input_schema.is_some() || n.output_schema.is_some())
1941            .map(|n| {
1942                serde_json::json!({
1943                    "circuit_name": n.label,
1944                    "input_type": n.input_type,
1945                    "output_type": n.output_type,
1946                    "input_schema": n.input_schema,
1947                    "output_schema": n.output_schema,
1948                })
1949            })
1950            .collect();
1951    }
1952
1953    Ok(inspector_envelope(
1954        "inspector.routes.v1",
1955        serde_json::json!({
1956            "count": route_list.len(),
1957            "routes": route_list
1958        }),
1959    ))
1960}
1961
1962async fn api_post_routes_schema(
1963    headers: HeaderMap,
1964    State(state): State<InspectorState>,
1965    Json(body): Json<routes::SchemaLookupRequest>,
1966) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1967    ensure_internal_access(&headers, &state.auth_policy)?;
1968
1969    // Look up from route registry first
1970    if let Some(route) = routes::find_route(&body.method, &body.path) {
1971        if let Some(schema) = route.input_schema {
1972            return Ok(inspector_envelope(
1973                "inspector.route_schema.v1",
1974                serde_json::json!({
1975                    "method": body.method,
1976                    "path": body.path,
1977                    "schema": schema
1978                }),
1979            ));
1980        }
1981    }
1982
1983    // Fallback: look in schematic nodes
1984    let schematic = state.schematic.lock().unwrap();
1985    for node in &schematic.nodes {
1986        if let Some(schema) = &node.input_schema {
1987            return Ok(inspector_envelope(
1988                "inspector.route_schema.v1",
1989                serde_json::json!({
1990                    "method": body.method,
1991                    "path": body.path,
1992                    "schema": schema
1993                }),
1994            ));
1995        }
1996    }
1997
1998    Err(policy_error(StatusCode::NOT_FOUND, "schema_not_found"))
1999}
2000
2001async fn api_post_routes_sample(
2002    headers: HeaderMap,
2003    State(state): State<InspectorState>,
2004    Json(body): Json<routes::SampleRequest>,
2005) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
2006    ensure_internal_access(&headers, &state.auth_policy)?;
2007
2008    // Find the schema for this route
2009    let schema_val = {
2010        if let Some(route) = routes::find_route(&body.method, &body.path) {
2011            route.input_schema
2012        } else {
2013            // Fallback: look in schematic nodes
2014            let schematic = state.schematic.lock().unwrap();
2015            schematic
2016                .nodes
2017                .iter()
2018                .find_map(|n| n.input_schema.clone())
2019        }
2020    };
2021
2022    let Some(schema_val) = schema_val else {
2023        return Err(policy_error(StatusCode::NOT_FOUND, "schema_not_found"));
2024    };
2025
2026    let sample = match body.mode.as_str() {
2027        "random" => schema::generate_sample(&schema_val),
2028        _ => schema::generate_template(&schema_val),
2029    };
2030
2031    Ok(inspector_envelope(
2032        "inspector.route_sample.v1",
2033        serde_json::json!({
2034            "method": body.method,
2035            "path": body.path,
2036            "mode": body.mode,
2037            "sample": sample
2038        }),
2039    ))
2040}
2041
2042async fn api_post_relay(
2043    headers: HeaderMap,
2044    State(state): State<InspectorState>,
2045    Json(body): Json<relay::RelayRequest>,
2046) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
2047    ensure_internal_access(&headers, &state.auth_policy)?;
2048
2049    // Guard: dev mode only
2050    let mode = InspectorMode::from_env();
2051    if mode != InspectorMode::Dev {
2052        return Err(policy_error(
2053            StatusCode::FORBIDDEN,
2054            "relay_only_in_dev_mode",
2055        ));
2056    }
2057
2058    let Some(relay_state) = &state.relay_state else {
2059        return Err((
2060            StatusCode::SERVICE_UNAVAILABLE,
2061            Json(serde_json::json!({
2062                "error": "relay_not_configured",
2063                "message": "No relay target configured. Use Inspector::with_relay_target() to enable."
2064            })),
2065        ));
2066    };
2067
2068    match relay::execute_relay(relay_state, body).await {
2069        Ok(response) => Ok(inspector_envelope(
2070            "inspector.relay.v1",
2071            serde_json::to_value(&response).unwrap_or(Value::Null),
2072        )),
2073        Err(err) => Err((
2074            StatusCode::BAD_GATEWAY,
2075            Json(serde_json::to_value(&err).unwrap_or(Value::Null)),
2076        )),
2077    }
2078}
2079
2080async fn api_get_stored_traces(
2081    headers: HeaderMap,
2082    State(state): State<InspectorState>,
2083) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
2084    ensure_internal_access(&headers, &state.auth_policy)?;
2085    state.bearer_auth.validate(&headers)?;
2086
2087    let Some(store) = &state.trace_store else {
2088        return Ok(inspector_envelope(
2089            "inspector.traces_stored.v1",
2090            serde_json::json!({
2091                "total": 0,
2092                "traces": [],
2093                "note": "No trace store configured"
2094            }),
2095        ));
2096    };
2097
2098    let query = trace_store::TraceQuery::default();
2099    match store.query(query).await {
2100        Ok(traces) => Ok(inspector_envelope(
2101            "inspector.traces_stored.v1",
2102            serde_json::json!({
2103                "total": traces.len(),
2104                "traces": traces
2105            }),
2106        )),
2107        Err(e) => Err((
2108            StatusCode::INTERNAL_SERVER_ERROR,
2109            Json(serde_json::json!({ "error": "trace_store_error", "message": e })),
2110        )),
2111    }
2112}
2113
#[cfg(test)]
mod tests {
    use super::*;
    use ranvier_core::schematic::Schematic;
    use std::time::Duration;

    /// Builds the loopback URL for `path` on the inspector listening at `port`.
    fn endpoint(port: u16, path: &str) -> String {
        format!("http://127.0.0.1:{port}{path}")
    }

    /// Binds an OS-assigned loopback port and returns it with a tokio listener
    /// already bound to it, so the port can be handed to `Inspector::new`
    /// before the server starts.
    fn reserve_listener() -> (u16, tokio::net::TcpListener) {
        let std_listener =
            std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
        std_listener.set_nonblocking(true).expect("set nonblocking");
        let port = std_listener.local_addr().expect("local addr").port();
        let listener =
            tokio::net::TcpListener::from_std(std_listener).expect("tokio listener conversion");
        (port, listener)
    }

    /// Polls `/schematic` up to 30 times (100 ms apart) until the server answers.
    async fn wait_ready(port: u16) {
        let client = reqwest::Client::new();
        for _attempt in 0..30 {
            // Any HTTP response, including 401 under auth enforcement, proves
            // the listener is accepting connections.
            let answered = client
                .get(endpoint(port, "/schematic"))
                .send()
                .await
                .is_ok();
            if answered {
                return;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        panic!("inspector server did not become ready");
    }

    /// Reserves a port, builds an inspector for it with `build`, serves it on a
    /// background task and waits until it responds.
    async fn start_inspector(
        build: impl FnOnce(u16) -> Inspector,
    ) -> (u16, tokio::task::JoinHandle<()>) {
        let (port, listener) = reserve_listener();
        let inspector = build(port);
        let handle = tokio::spawn(async move {
            let _ = inspector.serve_with_listener(listener).await;
        });
        wait_ready(port).await;
        (port, handle)
    }

    /// Sends a GET for `path` with `extra_headers`, panicking with `what` if the
    /// request cannot be sent.
    async fn fetch(
        client: &reqwest::Client,
        port: u16,
        path: &str,
        extra_headers: &[(&str, &str)],
        what: &str,
    ) -> reqwest::Response {
        let mut request = client.get(endpoint(port, path));
        for (name, value) in extra_headers {
            request = request.header(*name, *value);
        }
        request.send().await.expect(what)
    }

    /// Reads a response body and parses it as JSON.
    async fn read_json(response: reqwest::Response, text_what: &str, json_what: &str) -> Value {
        let text = response.text().await.expect(text_what);
        serde_json::from_str(&text).expect(json_what)
    }

    #[test]
    fn redaction_defaults_public_and_internal_surfaces() {
        let policy = TelemetryRedactionPolicy::default();
        let source = serde_json::json!({
            "email": "user@example.com",
            "summary": { "ok": true }
        });

        let public =
            apply_projection_redaction(source.clone(), ProjectionSurface::Public, &policy);
        let internal = apply_projection_redaction(source, ProjectionSurface::Internal, &policy);

        assert_eq!(public["email"], "[REDACTED]");
        assert_eq!(public["summary"]["ok"], true);
        assert_eq!(internal["email"], "user@example.com");
    }

    #[test]
    fn strict_mode_filters_attribute_bag_by_allowlist() {
        let policy = TelemetryRedactionPolicy {
            mode_override: Some(RedactionMode::Strict),
            allow_keys: std::iter::once("ranvier.circuit".to_string()).collect(),
            ..Default::default()
        };

        let source = serde_json::json!({
            "attributes": {
                "ranvier.circuit": "CheckoutCircuit",
                "customer_id": "u-123",
                "api_key": "secret-key"
            }
        });

        let redacted = apply_projection_redaction(source, ProjectionSurface::Internal, &policy);
        let attributes = &redacted["attributes"];
        assert_eq!(attributes["ranvier.circuit"], "CheckoutCircuit");
        assert_eq!(attributes["api_key"], "[REDACTED]");
        assert!(attributes.get("customer_id").is_none());
    }

    #[test]
    fn custom_sensitive_patterns_are_applied() {
        let mut policy = TelemetryRedactionPolicy::default();
        policy.sensitive_patterns.push("tenant_id".to_string());

        let source = serde_json::json!({
            "tenant_id": "team-a",
            "trace_id": "abc123"
        });

        let redacted = apply_projection_redaction(source, ProjectionSurface::Public, &policy);
        assert_eq!(redacted["tenant_id"], "[REDACTED]");
        assert_eq!(redacted["trace_id"], "abc123");
    }

    #[tokio::test]
    async fn dev_mode_exposes_quick_view_and_internal_routes() {
        let (port, handle) = start_inspector(|port| {
            Inspector::new(Schematic::new("dev-test"), port).with_mode("dev")
        })
        .await;
        let client = reqwest::Client::new();

        let quick = fetch(&client, port, "/quick-view", &[], "quick-view request").await;
        let internal = fetch(&client, port, "/trace/internal", &[], "internal request").await;
        let events = fetch(&client, port, "/events", &[], "events request").await;
        let circuits =
            fetch(&client, port, "/inspector/circuits", &[], "circuits request").await;
        let bus = fetch(&client, port, "/inspector/bus", &[], "bus request").await;
        let timeline = fetch(
            &client,
            port,
            "/inspector/timeline/bootstrap",
            &[],
            "timeline request",
        )
        .await;

        assert_eq!(quick.status(), reqwest::StatusCode::OK);
        assert_eq!(internal.status(), reqwest::StatusCode::OK);
        assert_ne!(events.status(), reqwest::StatusCode::NOT_FOUND);
        assert_eq!(circuits.status(), reqwest::StatusCode::OK);
        assert_eq!(bus.status(), reqwest::StatusCode::OK);
        assert_eq!(timeline.status(), reqwest::StatusCode::OK);

        let circuits_json = read_json(circuits, "circuits text", "circuits json").await;
        let bus_json = read_json(bus, "bus text", "bus json").await;
        let timeline_json = read_json(timeline, "timeline text", "timeline json").await;
        assert_eq!(circuits_json["kind"], "inspector.circuits.v1");
        assert_eq!(bus_json["kind"], "inspector.bus.v1");
        assert_eq!(timeline_json["kind"], "inspector.timeline.v1");

        handle.abort();
    }

    #[tokio::test]
    async fn prod_mode_hides_quick_view_and_internal_routes() {
        let (port, handle) = start_inspector(|port| {
            Inspector::new(Schematic::new("prod-test"), port).with_mode("prod")
        })
        .await;
        let client = reqwest::Client::new();

        let quick = fetch(&client, port, "/quick-view", &[], "quick-view request").await;
        let internal = fetch(&client, port, "/trace/internal", &[], "internal request").await;
        let events = fetch(&client, port, "/events", &[], "events request").await;
        let circuits =
            fetch(&client, port, "/inspector/circuits", &[], "circuits request").await;
        let public = fetch(&client, port, "/trace/public", &[], "public request").await;

        assert_eq!(quick.status(), reqwest::StatusCode::NOT_FOUND);
        assert_eq!(internal.status(), reqwest::StatusCode::NOT_FOUND);
        assert_eq!(events.status(), reqwest::StatusCode::NOT_FOUND);
        assert_eq!(circuits.status(), reqwest::StatusCode::NOT_FOUND);
        assert_eq!(public.status(), reqwest::StatusCode::OK);

        handle.abort();
    }

    #[tokio::test]
    async fn timeline_endpoint_returns_not_found_for_unknown_request() {
        let (port, handle) = start_inspector(|port| {
            Inspector::new(Schematic::new("timeline-test"), port).with_mode("dev")
        })
        .await;
        let client = reqwest::Client::new();

        let timeline = fetch(
            &client,
            port,
            "/inspector/timeline/unknown-request",
            &[],
            "timeline request",
        )
        .await;
        assert_eq!(timeline.status(), reqwest::StatusCode::NOT_FOUND);

        handle.abort();
    }

    #[tokio::test]
    async fn auth_enforcement_rejects_missing_role_header() {
        let (port, handle) = start_inspector(|port| {
            Inspector::new(Schematic::new("auth-public"), port)
                .with_mode("dev")
                .with_auth_enforcement(true)
        })
        .await;
        let client = reqwest::Client::new();

        let schematic = fetch(&client, port, "/schematic", &[], "schematic request").await;
        assert_eq!(schematic.status(), reqwest::StatusCode::UNAUTHORIZED);

        handle.abort();
    }

    #[tokio::test]
    async fn auth_enforcement_blocks_viewer_internal_and_requires_tenant() {
        let (port, handle) = start_inspector(|port| {
            Inspector::new(Schematic::new("auth-internal"), port)
                .with_mode("dev")
                .with_auth_enforcement(true)
                .with_require_tenant_for_internal(true)
        })
        .await;
        let client = reqwest::Client::new();

        let viewer_internal = fetch(
            &client,
            port,
            "/trace/internal",
            &[("X-Ranvier-Role", "viewer")],
            "viewer internal request",
        )
        .await;
        assert_eq!(viewer_internal.status(), reqwest::StatusCode::FORBIDDEN);

        let operator_no_tenant = fetch(
            &client,
            port,
            "/trace/internal",
            &[("X-Ranvier-Role", "operator")],
            "operator internal request",
        )
        .await;
        assert_eq!(operator_no_tenant.status(), reqwest::StatusCode::FORBIDDEN);

        let operator_with_tenant = fetch(
            &client,
            port,
            "/trace/internal",
            &[("X-Ranvier-Role", "operator"), ("X-Ranvier-Tenant", "team-a")],
            "operator internal with tenant request",
        )
        .await;
        assert_eq!(operator_with_tenant.status(), reqwest::StatusCode::OK);

        handle.abort();
    }
}