1pub mod alert;
2pub mod auth;
3pub mod breakpoint;
4pub mod metrics;
5pub mod payload;
6pub mod prometheus;
7pub mod relay;
8pub mod routes;
9pub mod schema;
10pub mod stall;
11pub mod trace_store;
12
13use async_trait::async_trait;
14use axum::{
15 Json, Router,
16 extract::{
17 Path as AxPath, State,
18 ws::{Message, WebSocket, WebSocketUpgrade},
19 },
20 http::{HeaderMap, StatusCode, header},
21 response::IntoResponse,
22 routing::get,
23};
24use ranvier_core::event::DlqReader;
25use ranvier_core::prelude::DebugControl;
26use ranvier_core::schematic::{NodeKind, Schematic};
27use serde::Deserialize;
28use serde_json::Value;
29use std::collections::{HashMap, HashSet};
30use std::fs;
31use std::net::SocketAddr;
32use std::sync::{Arc, Mutex, OnceLock};
33use std::time::{Instant, SystemTime, UNIX_EPOCH};
34
35#[async_trait]
36pub trait StateInspector: Send + Sync {
37 async fn get_state(&self, trace_id: &str) -> Option<Value>;
38 async fn force_resume(
39 &self,
40 trace_id: &str,
41 target_node: &str,
42 payload_override: Option<Value>,
43 ) -> Result<(), String>;
44}
45use tokio::sync::broadcast;
46use tower_http::cors::CorsLayer;
47use tracing::{Event, Id, Subscriber};
48use tracing_subscriber::{Layer, layer::Context, registry::LookupSpan};
49
50static EVENT_CHANNEL: OnceLock<broadcast::Sender<String>> = OnceLock::new();
51static TRACE_REGISTRY: OnceLock<Arc<Mutex<ActiveTraceRegistry>>> = OnceLock::new();
52static PAYLOAD_POLICY: OnceLock<payload::PayloadCapturePolicy> = OnceLock::new();
53const QUICK_VIEW_HTML: &str = include_str!("quick_view/index.html");
54const QUICK_VIEW_JS: &str = include_str!("quick_view/app.js");
55const QUICK_VIEW_CSS: &str = include_str!("quick_view/styles.css");
56const INSPECTOR_API_VERSION: &str = "1.0";
57
58#[derive(Clone, Debug, serde::Serialize)]
59pub struct TraceRecord {
60 pub trace_id: String,
61 pub circuit: String,
62 pub status: TraceStatus,
63 pub started_at: u64,
64 pub finished_at: Option<u64>,
65 pub duration_ms: Option<u64>,
66 pub outcome_type: Option<String>,
67}
68
69#[derive(Clone, Debug, serde::Serialize, Eq, PartialEq)]
70#[serde(rename_all = "lowercase")]
71pub enum TraceStatus {
72 Active,
73 Completed,
74 Faulted,
75}
76
77pub struct ActiveTraceRegistry {
78 active: HashMap<String, TraceRecord>,
79 recent: Vec<TraceRecord>,
80 max_recent: usize,
81}
82
83impl ActiveTraceRegistry {
84 fn new() -> Self {
85 Self {
86 active: HashMap::new(),
87 recent: Vec::new(),
88 max_recent: 100,
89 }
90 }
91
92 fn register(&mut self, circuit: String) {
93 let trace_id = format!(
94 "{}-{}",
95 circuit.replace(' ', "_").to_lowercase(),
96 epoch_ms()
97 );
98 self.active.insert(
99 trace_id.clone(),
100 TraceRecord {
101 trace_id,
102 circuit,
103 status: TraceStatus::Active,
104 started_at: epoch_ms(),
105 finished_at: None,
106 duration_ms: None,
107 outcome_type: None,
108 },
109 );
110 }
111
112 fn complete(
113 &mut self,
114 circuit: &str,
115 outcome_type: Option<String>,
116 duration_ms: Option<u64>,
117 ) {
118 let key = self
120 .active
121 .iter()
122 .filter(|(_, r)| r.circuit == circuit)
123 .max_by_key(|(_, r)| r.started_at)
124 .map(|(k, _)| k.clone());
125
126 if let Some(key) = key {
127 if let Some(mut record) = self.active.remove(&key) {
128 record.finished_at = Some(epoch_ms());
129 record.duration_ms = duration_ms;
130 record.outcome_type = outcome_type.clone();
131 record.status = if outcome_type.as_deref() == Some("Fault") {
132 TraceStatus::Faulted
133 } else {
134 TraceStatus::Completed
135 };
136 self.recent.push(record);
137 if self.recent.len() > self.max_recent {
138 self.recent.remove(0);
139 }
140 }
141 }
142 }
143
144 fn list_all(&self) -> Vec<TraceRecord> {
145 let mut result: Vec<TraceRecord> = self.active.values().cloned().collect();
146 result.extend(self.recent.iter().cloned());
147 result.sort_by(|a, b| b.started_at.cmp(&a.started_at));
148 result
149 }
150
151 pub fn active_count(&self) -> usize {
153 self.active.len()
154 }
155}
156
157pub fn get_trace_registry() -> Arc<Mutex<ActiveTraceRegistry>> {
158 TRACE_REGISTRY
159 .get_or_init(|| Arc::new(Mutex::new(ActiveTraceRegistry::new())))
160 .clone()
161}
162
163#[derive(Clone, Copy, Debug, Eq, PartialEq)]
164enum InspectorMode {
165 Dev,
166 Prod,
167}
168
169impl InspectorMode {
170 fn from_env() -> Self {
171 match std::env::var("RANVIER_MODE")
172 .unwrap_or_else(|_| "dev".to_string())
173 .to_ascii_lowercase()
174 .as_str()
175 {
176 "prod" | "production" => Self::Prod,
177 _ => Self::Dev,
178 }
179 }
180
181 fn from_str(mode: &str) -> Self {
182 match mode.to_ascii_lowercase().as_str() {
183 "prod" | "production" => Self::Prod,
184 _ => Self::Dev,
185 }
186 }
187}
188
189#[derive(Clone, Copy, Debug)]
190struct SurfacePolicy {
191 expose_internal: bool,
192 expose_events: bool,
193 expose_quick_view: bool,
194}
195
196impl SurfacePolicy {
197 fn for_mode(mode: InspectorMode) -> Self {
198 match mode {
199 InspectorMode::Dev => Self {
200 expose_internal: true,
201 expose_events: true,
202 expose_quick_view: true,
203 },
204 InspectorMode::Prod => Self {
205 expose_internal: false,
206 expose_events: false,
207 expose_quick_view: false,
208 },
209 }
210 }
211}
212
213#[derive(Clone, Copy, Debug, Eq, PartialEq)]
214enum AccessRole {
215 Viewer,
216 Operator,
217 Admin,
218}
219
220#[derive(Clone, Copy, Debug)]
221struct AuthPolicy {
222 enforce_headers: bool,
223 require_tenant_for_internal: bool,
224}
225
226impl AuthPolicy {
227 fn default() -> Self {
228 Self {
229 enforce_headers: false,
230 require_tenant_for_internal: false,
231 }
232 }
233
234 fn from_env() -> Self {
235 Self {
236 enforce_headers: env_flag("RANVIER_AUTH_ENFORCE", false),
237 require_tenant_for_internal: env_flag("RANVIER_AUTH_REQUIRE_TENANT_INTERNAL", false),
238 }
239 }
240}
241
242#[derive(Clone, Copy, Debug, Eq, PartialEq)]
243enum ProjectionSurface {
244 Public,
245 Internal,
246}
247
248#[derive(Clone, Copy, Debug, Eq, PartialEq)]
249enum RedactionMode {
250 Off,
251 Public,
252 Strict,
253}
254
255impl RedactionMode {
256 fn parse(raw: &str) -> Option<Self> {
257 match raw.trim().to_ascii_lowercase().as_str() {
258 "off" => Some(Self::Off),
259 "public" => Some(Self::Public),
260 "strict" => Some(Self::Strict),
261 _ => None,
262 }
263 }
264}
265
266#[derive(Clone, Debug)]
267struct TelemetryRedactionPolicy {
268 mode_override: Option<RedactionMode>,
269 sensitive_patterns: Vec<String>,
270 allow_keys: HashSet<String>,
271}
272
273impl Default for TelemetryRedactionPolicy {
274 fn default() -> Self {
275 Self {
276 mode_override: None,
277 sensitive_patterns: default_sensitive_patterns(),
278 allow_keys: HashSet::new(),
279 }
280 }
281}
282
283impl TelemetryRedactionPolicy {
284 fn from_env() -> Self {
285 let mut policy = Self::default();
286
287 if let Ok(raw_mode) = std::env::var("RANVIER_TELEMETRY_REDACT_MODE") {
288 match RedactionMode::parse(&raw_mode) {
289 Some(mode) => policy.mode_override = Some(mode),
290 None => tracing::warn!(
291 "Invalid RANVIER_TELEMETRY_REDACT_MODE='{}' (expected: off|public|strict)",
292 raw_mode
293 ),
294 }
295 }
296
297 if let Ok(extra) = std::env::var("RANVIER_TELEMETRY_REDACT_KEYS") {
298 for key in parse_csv_lower(&extra) {
299 if !policy.sensitive_patterns.contains(&key) {
300 policy.sensitive_patterns.push(key);
301 }
302 }
303 }
304
305 if let Ok(allow) = std::env::var("RANVIER_TELEMETRY_ALLOW_KEYS") {
306 policy.allow_keys.extend(parse_csv_lower(&allow));
307 }
308
309 policy
310 }
311
312 fn mode_for_surface(&self, surface: ProjectionSurface) -> RedactionMode {
313 if let Some(mode) = self.mode_override {
314 return mode;
315 }
316
317 match surface {
318 ProjectionSurface::Public => RedactionMode::Public,
319 ProjectionSurface::Internal => RedactionMode::Off,
320 }
321 }
322
323 fn is_sensitive_key(&self, key: &str) -> bool {
324 let lowered = key.to_ascii_lowercase();
325 self.sensitive_patterns
326 .iter()
327 .any(|pattern| lowered.contains(pattern))
328 }
329}
330
331fn get_sender() -> &'static broadcast::Sender<String> {
332 EVENT_CHANNEL.get_or_init(|| {
333 let (tx, _rx) = broadcast::channel(100);
334 tx
335 })
336}
337
338pub struct Inspector {
340 port: u16,
341 schematic: Arc<Mutex<Schematic>>,
342 public_projection: Arc<Mutex<Option<Value>>>,
343 internal_projection: Arc<Mutex<Option<Value>>>,
344 public_projection_path: Option<String>,
345 internal_projection_path: Option<String>,
346 surface_policy: SurfacePolicy,
347 auth_policy: AuthPolicy,
348 redaction_policy: TelemetryRedactionPolicy,
349 state_inspector: Option<Arc<dyn StateInspector>>,
350 dlq_reader: Option<Arc<dyn DlqReader>>,
351 payload_policy: payload::PayloadCapturePolicy,
352 relay_state: Option<relay::RelayState>,
353 bearer_auth: auth::BearerAuth,
354 trace_store: Option<Arc<dyn trace_store::TraceStore>>,
355 alert_dispatcher: Option<Arc<alert::AlertDispatcher>>,
356}
357
358impl Inspector {
359 pub fn new(schematic: Schematic, port: u16) -> Self {
360 get_sender();
362 let public_projection = default_public_projection(&schematic);
363 let internal_projection = default_internal_projection(&schematic);
364
365 Self {
366 port,
367 schematic: Arc::new(Mutex::new(schematic)),
368 public_projection: Arc::new(Mutex::new(Some(public_projection))),
369 internal_projection: Arc::new(Mutex::new(Some(internal_projection))),
370 public_projection_path: None,
371 internal_projection_path: None,
372 surface_policy: SurfacePolicy::for_mode(InspectorMode::Dev),
373 auth_policy: AuthPolicy::default(),
374 redaction_policy: TelemetryRedactionPolicy::from_env(),
375 state_inspector: None,
376 dlq_reader: None,
377 payload_policy: payload::PayloadCapturePolicy::from_env(),
378 relay_state: None,
379 bearer_auth: auth::BearerAuth::default(),
380 trace_store: None,
381 alert_dispatcher: None,
382 }
383 }
384
385 pub fn with_relay_target(mut self, target_url: impl Into<String>) -> Self {
394 let config = relay::RelayConfig::new(target_url);
395 self.relay_state = Some(relay::RelayState::new(config));
396 self
397 }
398
399 pub fn with_routes(self, route_infos: Vec<routes::RouteInfo>) -> Self {
403 routes::register_routes(route_infos);
404 self
405 }
406
407 pub fn with_dlq_reader(mut self, reader: Arc<dyn DlqReader>) -> Self {
408 self.dlq_reader = Some(reader);
409 self
410 }
411
412 pub fn with_payload_capture(mut self, policy: payload::PayloadCapturePolicy) -> Self {
413 self.payload_policy = policy;
414 self
415 }
416
417 pub fn with_state_inspector(mut self, inspector: Arc<dyn StateInspector>) -> Self {
418 self.state_inspector = Some(inspector);
419 self
420 }
421
422 pub fn with_public_projection(self, projection: Value) -> Self {
424 if let Ok(mut slot) = self.public_projection.lock() {
425 *slot = Some(projection);
426 }
427 self
428 }
429
430 pub fn with_internal_projection(self, projection: Value) -> Self {
432 if let Ok(mut slot) = self.internal_projection.lock() {
433 *slot = Some(projection);
434 }
435 self
436 }
437
438 pub fn with_projection_files_from_env(self) -> Self {
444 let mut inspector = self;
445
446 if let Ok(path) = std::env::var("RANVIER_TRACE_PUBLIC_PATH") {
447 inspector.public_projection_path = Some(path.clone());
448 match read_projection_file(&path) {
449 Ok(v) => inspector = inspector.with_public_projection(v),
450 Err(e) => tracing::warn!("Failed to load public projection from {}: {}", path, e),
451 }
452 }
453
454 if let Ok(path) = std::env::var("RANVIER_TRACE_INTERNAL_PATH") {
455 inspector.internal_projection_path = Some(path.clone());
456 match read_projection_file(&path) {
457 Ok(v) => inspector = inspector.with_internal_projection(v),
458 Err(e) => tracing::warn!("Failed to load internal projection from {}: {}", path, e),
459 }
460 }
461
462 inspector
463 }
464
465 pub fn with_mode_from_env(mut self) -> Self {
470 let mode = InspectorMode::from_env();
471 self.surface_policy = SurfacePolicy::for_mode(mode);
472 self
473 }
474
475 pub fn with_mode(mut self, mode: &str) -> Self {
481 let parsed = InspectorMode::from_str(mode);
482 self.surface_policy = SurfacePolicy::for_mode(parsed);
483 self
484 }
485
486 pub fn with_auth_policy_from_env(mut self) -> Self {
491 self.auth_policy = AuthPolicy::from_env();
492 self
493 }
494
495 pub fn with_auth_enforcement(mut self, enabled: bool) -> Self {
497 self.auth_policy.enforce_headers = enabled;
498 self
499 }
500
501 pub fn with_require_tenant_for_internal(mut self, required: bool) -> Self {
503 self.auth_policy.require_tenant_for_internal = required;
504 self
505 }
506
507 pub fn with_redaction_policy_from_env(mut self) -> Self {
514 self.redaction_policy = TelemetryRedactionPolicy::from_env();
515 self
516 }
517
518 pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
520 self.bearer_auth = auth::BearerAuth { token: Some(token.into()) };
521 self
522 }
523
524 pub fn with_bearer_token_from_env(mut self) -> Self {
526 self.bearer_auth = auth::BearerAuth::from_env();
527 self
528 }
529
530 pub fn with_trace_store(mut self, store: Arc<dyn trace_store::TraceStore>) -> Self {
532 self.trace_store = Some(store);
533 self
534 }
535
536 pub fn with_alert_dispatcher(mut self, dispatcher: Arc<alert::AlertDispatcher>) -> Self {
538 self.alert_dispatcher = Some(dispatcher);
539 self
540 }
541
542 pub async fn serve(self) -> Result<(), std::io::Error> {
543 let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
544 let listener = tokio::net::TcpListener::bind(addr).await?;
545 self.serve_with_listener(listener).await
546 }
547
548 pub async fn serve_with_listener(
549 self,
550 listener: tokio::net::TcpListener,
551 ) -> Result<(), std::io::Error> {
552 let state = InspectorState {
553 schematic: self.schematic.clone(),
554 public_projection: self.public_projection.clone(),
555 internal_projection: self.internal_projection.clone(),
556 public_projection_path: self.public_projection_path.clone(),
557 internal_projection_path: self.internal_projection_path.clone(),
558 auth_policy: self.auth_policy,
559 redaction_policy: self.redaction_policy.clone(),
560 state_inspector: self.state_inspector,
561 dlq_reader: self.dlq_reader,
562 relay_state: self.relay_state,
563 bearer_auth: self.bearer_auth,
564 trace_store: self.trace_store,
565 alert_dispatcher: self.alert_dispatcher,
566 };
567
568 PAYLOAD_POLICY.get_or_init(|| self.payload_policy);
570
571 let mut app = Router::new()
572 .route("/schematic", get(get_schematic))
573 .route("/trace/public", get(get_public_projection))
574 .route("/debug/resume/:trace_id", get(debug_resume))
575 .route("/debug/step/:trace_id", get(debug_step))
576 .route("/debug/pause/:trace_id", get(debug_pause))
577 .route("/api/v1/state/:trace_id", get(api_get_state))
578 .route(
579 "/api/v1/state/:trace_id/resume",
580 axum::routing::post(api_post_resume),
581 )
582 .route("/metrics", get(prometheus_metrics_handler))
583 .layer(CorsLayer::permissive());
584
585 if self.surface_policy.expose_internal {
586 app = app
587 .route("/trace/internal", get(get_internal_projection))
588 .route("/inspector/circuits", get(get_inspector_circuits))
589 .route(
590 "/inspector/circuits/:name",
591 get(get_inspector_circuit_by_name),
592 )
593 .route("/inspector/bus", get(get_inspector_bus))
594 .route(
595 "/inspector/timeline/:request_id",
596 get(get_inspector_timeline_by_request_id),
597 )
598 .route("/api/v1/traces", get(api_get_traces))
599 .route("/api/v1/metrics", get(api_get_metrics_all))
600 .route("/api/v1/metrics/:circuit", get(api_get_metrics))
601 .route("/api/v1/events", get(api_get_events))
602 .route("/api/v1/dlq", get(api_get_dlq))
603 .route(
604 "/api/v1/breakpoints",
605 get(api_get_breakpoints).post(api_post_breakpoint),
606 )
607 .route(
608 "/api/v1/breakpoints/:bp_id",
609 axum::routing::delete(api_delete_breakpoint)
610 .patch(api_patch_breakpoint),
611 )
612 .route("/api/v1/stalls", get(api_get_stalls))
613 .route("/api/v1/routes", get(api_get_routes))
614 .route(
615 "/api/v1/routes/schema",
616 axum::routing::post(api_post_routes_schema),
617 )
618 .route(
619 "/api/v1/routes/sample",
620 axum::routing::post(api_post_routes_sample),
621 )
622 .route(
623 "/api/v1/relay",
624 axum::routing::post(api_post_relay),
625 )
626 .route("/api/v1/traces/stored", get(api_get_stored_traces));
627 }
628
629 if self.surface_policy.expose_events {
630 app = app.route("/events", get(ws_handler));
631 }
632
633 if self.surface_policy.expose_quick_view {
634 app = app
635 .route("/quick-view", get(get_quick_view_html))
636 .route("/quick-view/app.js", get(get_quick_view_js))
637 .route("/quick-view/styles.css", get(get_quick_view_css));
638 }
639
640 let app = app.with_state(state);
641 let addr = listener.local_addr()?;
642 tracing::info!("Ranvier Inspector listening on http://{}", addr);
643
644 if self.surface_policy.expose_events {
646 let broadcast_interval = std::env::var("RANVIER_INSPECTOR_METRICS_INTERVAL_MS")
647 .ok()
648 .and_then(|v| v.parse::<u64>().ok())
649 .unwrap_or(2000);
650 tokio::spawn(async move {
651 let mut interval =
652 tokio::time::interval(std::time::Duration::from_millis(broadcast_interval));
653 loop {
654 interval.tick().await;
655 let snapshots = metrics::snapshot_all();
656 if !snapshots.is_empty() {
657 let msg = serde_json::json!({
658 "type": "metrics",
659 "circuits": snapshots,
660 "timestamp": epoch_ms()
661 })
662 .to_string();
663 let _ = get_sender().send(msg);
664 }
665
666 let stalls = stall::detect_stalls();
668 if !stalls.is_empty() {
669 let msg = serde_json::json!({
670 "type": "stall_detected",
671 "stalls": stalls,
672 "timestamp": epoch_ms()
673 })
674 .to_string();
675 let _ = get_sender().send(msg);
676 }
677 }
678 });
679 }
680
681 axum::serve(listener, app).await
682 }
683}
684
685fn default_public_projection(schematic: &Schematic) -> Value {
686 serde_json::json!({
687 "service_name": schematic.name,
688 "window_start": "1970-01-01T00:00:00Z",
689 "window_end": "1970-01-01T00:00:00Z",
690 "overall_status": "operational",
691 "circuits": [
692 {
693 "name": schematic.name,
694 "status": "operational",
695 "success_rate": 1.0,
696 "error_rate": 0.0,
697 "p95_latency_ms": 0.0
698 }
699 ]
700 })
701}
702
703fn default_internal_projection(schematic: &Schematic) -> Value {
704 let nodes = schematic
705 .nodes
706 .iter()
707 .map(|n| {
708 serde_json::json!({
709 "node_id": n.id,
710 "label": n.label,
711 "kind": node_kind_name(&n.kind),
712 "entered_at": "1970-01-01T00:00:00Z",
713 "exited_at": "1970-01-01T00:00:00Z",
714 "latency_ms": 0.0,
715 "outcome_type": "Next",
716 "branch_id": Value::Null,
717 "error_code": Value::Null,
718 "error_category": Value::Null
719 })
720 })
721 .collect::<Vec<_>>();
722
723 serde_json::json!({
724 "trace_id": "bootstrap",
725 "circuit_id": schematic.id,
726 "started_at": "1970-01-01T00:00:00Z",
727 "finished_at": "1970-01-01T00:00:00Z",
728 "nodes": nodes,
729 "summary": {
730 "node_count": schematic.nodes.len(),
731 "fault_count": 0,
732 "branch_count": 0
733 }
734 })
735}
736
737fn node_kind_name(kind: &NodeKind) -> &'static str {
738 match kind {
739 NodeKind::Ingress => "Ingress",
740 NodeKind::Atom => "Atom",
741 NodeKind::Synapse => "Synapse",
742 NodeKind::Egress => "Egress",
743 NodeKind::Subgraph(_) => "Subgraph",
744 NodeKind::FanOut => "FanOut",
745 NodeKind::FanIn => "FanIn",
746 }
747}
748
749fn read_projection_file(path: &str) -> Result<Value, String> {
750 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
751 serde_json::from_str::<Value>(&content).map_err(|e| e.to_string())
752}
753
754fn default_sensitive_patterns() -> Vec<String> {
755 vec![
756 "password".to_string(),
757 "secret".to_string(),
758 "token".to_string(),
759 "authorization".to_string(),
760 "cookie".to_string(),
761 "session".to_string(),
762 "api_key".to_string(),
763 "credit_card".to_string(),
764 "ssn".to_string(),
765 "email".to_string(),
766 "phone".to_string(),
767 ]
768}
769
770fn parse_csv_lower(raw: &str) -> Vec<String> {
771 raw.split(',')
772 .map(|s| s.trim().to_ascii_lowercase())
773 .filter(|s| !s.is_empty())
774 .collect()
775}
776
777fn apply_projection_redaction(
778 projection: Value,
779 surface: ProjectionSurface,
780 policy: &TelemetryRedactionPolicy,
781) -> Value {
782 let mode = policy.mode_for_surface(surface);
783 redact_json_value(projection, mode, policy, None)
784}
785
786fn redact_json_value(
787 value: Value,
788 mode: RedactionMode,
789 policy: &TelemetryRedactionPolicy,
790 parent_key: Option<&str>,
791) -> Value {
792 if mode == RedactionMode::Off {
793 return value;
794 }
795
796 match value {
797 Value::Object(map) => {
798 let mut out = serde_json::Map::with_capacity(map.len());
799 let strict_attribute_bag =
800 mode == RedactionMode::Strict && parent_key.is_some_and(is_attribute_bag_key);
801
802 for (key, child) in map {
803 let lowered = key.to_ascii_lowercase();
804
805 if strict_attribute_bag && !policy.allow_keys.contains(&lowered) {
806 if policy.is_sensitive_key(&lowered) {
807 out.insert(key, Value::String("[REDACTED]".to_string()));
808 }
809 continue;
810 }
811
812 if policy.is_sensitive_key(&lowered) {
813 out.insert(key, Value::String("[REDACTED]".to_string()));
814 continue;
815 }
816
817 out.insert(
818 key.clone(),
819 redact_json_value(child, mode, policy, Some(&key)),
820 );
821 }
822 Value::Object(out)
823 }
824 Value::Array(values) => Value::Array(
825 values
826 .into_iter()
827 .map(|v| redact_json_value(v, mode, policy, parent_key))
828 .collect(),
829 ),
830 other => other,
831 }
832}
833
834fn is_attribute_bag_key(key: &str) -> bool {
835 let lowered = key.to_ascii_lowercase();
836 lowered == "attributes" || lowered.ends_with("_attributes")
837}
838
839async fn debug_resume(AxPath(trace_id): AxPath<String>) -> impl IntoResponse {
840 if let Some(debug) = get_debug_control_for_trace(&trace_id) {
841 debug.resume();
842 StatusCode::OK
843 } else {
844 StatusCode::NOT_FOUND
845 }
846}
847
848async fn debug_step(AxPath(trace_id): AxPath<String>) -> impl IntoResponse {
849 if let Some(debug) = get_debug_control_for_trace(&trace_id) {
850 debug.step();
851 StatusCode::OK
852 } else {
853 StatusCode::NOT_FOUND
854 }
855}
856
857async fn debug_pause(AxPath(trace_id): AxPath<String>) -> impl IntoResponse {
858 if let Some(debug) = get_debug_control_for_trace(&trace_id) {
859 debug.pause();
860 StatusCode::OK
861 } else {
862 StatusCode::NOT_FOUND
863 }
864}
865
866async fn api_get_state(
867 AxPath(trace_id): AxPath<String>,
868 State(state): State<InspectorState>,
869) -> impl IntoResponse {
870 if let Some(inspector) = state.state_inspector.as_ref()
871 && let Some(val) = inspector.get_state(&trace_id).await
872 {
873 return Json(val).into_response();
874 }
875 StatusCode::NOT_FOUND.into_response()
876}
877
878#[derive(Deserialize)]
879struct ResumePayload {
880 target_node: String,
881 #[allow(dead_code)]
882 force: bool,
883 payload_override: Option<Value>,
884}
885
886async fn api_post_resume(
887 AxPath(trace_id): AxPath<String>,
888 State(state): State<InspectorState>,
889 Json(payload): Json<ResumePayload>,
890) -> impl IntoResponse {
891 if let Some(inspector) = state.state_inspector.as_ref() {
892 match inspector
893 .force_resume(&trace_id, &payload.target_node, payload.payload_override)
894 .await
895 {
896 Ok(_) => StatusCode::OK.into_response(),
897 Err(e) => (StatusCode::BAD_REQUEST, e).into_response(),
898 }
899 } else {
900 (
901 StatusCode::SERVICE_UNAVAILABLE,
902 "State inspector not configured",
903 )
904 .into_response()
905 }
906}
907
908async fn api_get_traces(
909 headers: HeaderMap,
910 State(state): State<InspectorState>,
911) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
912 ensure_internal_access(&headers, &state.auth_policy)?;
913 let traces = get_trace_registry()
914 .lock()
915 .map(|r| r.list_all())
916 .unwrap_or_default();
917 Ok(inspector_envelope(
918 "inspector.traces.v1",
919 serde_json::json!({
920 "count": traces.len(),
921 "traces": traces
922 }),
923 ))
924}
925
926async fn api_get_metrics(
927 headers: HeaderMap,
928 AxPath(circuit): AxPath<String>,
929 State(state): State<InspectorState>,
930) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
931 ensure_internal_access(&headers, &state.auth_policy)?;
932 match metrics::snapshot_circuit(&circuit) {
933 Some(snap) => Ok(inspector_envelope(
934 "inspector.metrics.v1",
935 serde_json::json!(snap),
936 )),
937 None => Err((
938 StatusCode::NOT_FOUND,
939 Json(serde_json::json!({ "error": "No metrics for circuit", "circuit": circuit })),
940 )),
941 }
942}
943
944async fn api_get_metrics_all(
945 headers: HeaderMap,
946 State(state): State<InspectorState>,
947) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
948 ensure_internal_access(&headers, &state.auth_policy)?;
949 let snapshots = metrics::snapshot_all();
950 Ok(inspector_envelope(
951 "inspector.metrics.v1",
952 serde_json::json!({
953 "count": snapshots.len(),
954 "circuits": snapshots
955 }),
956 ))
957}
958
959async fn prometheus_metrics_handler(
965 headers: HeaderMap,
966 State(state): State<InspectorState>,
967) -> Result<impl IntoResponse, (StatusCode, Json<Value>)> {
968 state.bearer_auth.validate(&headers)?;
969 let body = prometheus::render();
970 Ok((
971 [(header::CONTENT_TYPE, prometheus::CONTENT_TYPE)],
972 body,
973 ))
974}
975
976async fn api_get_events(
977 headers: HeaderMap,
978 State(state): State<InspectorState>,
979) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
980 ensure_internal_access(&headers, &state.auth_policy)?;
981 let events = payload::list_events(200);
982 Ok(inspector_envelope(
983 "inspector.events.v1",
984 serde_json::json!({
985 "count": events.len(),
986 "events": events
987 }),
988 ))
989}
990
991async fn api_get_dlq(
992 headers: HeaderMap,
993 State(state): State<InspectorState>,
994) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
995 ensure_internal_access(&headers, &state.auth_policy)?;
996 match &state.dlq_reader {
997 Some(reader) => {
998 let letters = reader
999 .list_dead_letters(None, 100)
1000 .await
1001 .unwrap_or_default();
1002 let count = reader.count_dead_letters().await.unwrap_or(0);
1003 Ok(inspector_envelope(
1004 "inspector.dlq.v1",
1005 serde_json::json!({
1006 "total": count,
1007 "items": letters
1008 }),
1009 ))
1010 }
1011 None => Ok(inspector_envelope(
1012 "inspector.dlq.v1",
1013 serde_json::json!({
1014 "total": 0,
1015 "items": [],
1016 "note": "No DLQ reader configured"
1017 }),
1018 )),
1019 }
1020}
1021
1022async fn api_get_breakpoints(
1023 headers: HeaderMap,
1024 State(state): State<InspectorState>,
1025) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1026 ensure_internal_access(&headers, &state.auth_policy)?;
1027 let bps = breakpoint::list_breakpoints();
1028 Ok(inspector_envelope(
1029 "inspector.breakpoints.v1",
1030 serde_json::json!({
1031 "count": bps.len(),
1032 "breakpoints": bps
1033 }),
1034 ))
1035}
1036
1037#[derive(Deserialize)]
1038struct CreateBreakpointPayload {
1039 node_id: String,
1040 condition: Option<String>,
1041}
1042
1043async fn api_post_breakpoint(
1044 headers: HeaderMap,
1045 State(state): State<InspectorState>,
1046 Json(body): Json<CreateBreakpointPayload>,
1047) -> Result<(StatusCode, Json<Value>), (StatusCode, Json<Value>)> {
1048 ensure_internal_access(&headers, &state.auth_policy)?;
1049 let bp = breakpoint::add_breakpoint(body.node_id, body.condition);
1050 Ok((
1051 StatusCode::CREATED,
1052 inspector_envelope("inspector.breakpoint.v1", serde_json::json!(bp)),
1053 ))
1054}
1055
1056async fn api_delete_breakpoint(
1057 headers: HeaderMap,
1058 AxPath(bp_id): AxPath<String>,
1059 State(state): State<InspectorState>,
1060) -> Result<StatusCode, (StatusCode, Json<Value>)> {
1061 ensure_internal_access(&headers, &state.auth_policy)?;
1062 if breakpoint::remove_breakpoint(&bp_id) {
1063 Ok(StatusCode::NO_CONTENT)
1064 } else {
1065 Err(policy_error(StatusCode::NOT_FOUND, "breakpoint_not_found"))
1066 }
1067}
1068
1069#[derive(Deserialize)]
1070struct PatchBreakpointPayload {
1071 enabled: Option<bool>,
1072 condition: Option<Option<String>>,
1073}
1074
1075async fn api_patch_breakpoint(
1076 headers: HeaderMap,
1077 AxPath(bp_id): AxPath<String>,
1078 State(state): State<InspectorState>,
1079 Json(body): Json<PatchBreakpointPayload>,
1080) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1081 ensure_internal_access(&headers, &state.auth_policy)?;
1082 match breakpoint::update_breakpoint(&bp_id, body.enabled, body.condition) {
1083 Some(bp) => Ok(inspector_envelope(
1084 "inspector.breakpoint.v1",
1085 serde_json::json!(bp),
1086 )),
1087 None => Err(policy_error(StatusCode::NOT_FOUND, "breakpoint_not_found")),
1088 }
1089}
1090
1091async fn api_get_stalls(
1092 headers: HeaderMap,
1093 State(state): State<InspectorState>,
1094) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1095 ensure_internal_access(&headers, &state.auth_policy)?;
1096 let stalls = stall::detect_stalls();
1097 Ok(inspector_envelope(
1098 "inspector.stalls.v1",
1099 serde_json::json!({
1100 "count": stalls.len(),
1101 "stalls": stalls
1102 }),
1103 ))
1104}
1105
1106static DEBUG_REGISTRY: OnceLock<Arc<Mutex<HashMap<String, DebugControl>>>> = OnceLock::new();
1107
1108fn get_debug_registry() -> Arc<Mutex<HashMap<String, DebugControl>>> {
1109 DEBUG_REGISTRY
1110 .get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
1111 .clone()
1112}
1113
1114pub fn get_debug_control_for_trace(trace_id: &str) -> Option<DebugControl> {
1115 get_debug_registry().lock().unwrap().get(trace_id).cloned()
1116}
1117
1118pub fn register_debug_control(trace_id: String, control: DebugControl) {
1119 get_debug_registry()
1120 .lock()
1121 .unwrap()
1122 .insert(trace_id, control);
1123}
1124
1125pub fn unregister_debug_control(trace_id: &str) {
1126 get_debug_registry().lock().unwrap().remove(trace_id);
1127}
1128
1129#[derive(Clone)]
1130struct InspectorState {
1131 schematic: Arc<Mutex<Schematic>>,
1132 public_projection: Arc<Mutex<Option<Value>>>,
1133 internal_projection: Arc<Mutex<Option<Value>>>,
1134 public_projection_path: Option<String>,
1135 internal_projection_path: Option<String>,
1136 auth_policy: AuthPolicy,
1137 redaction_policy: TelemetryRedactionPolicy,
1138 state_inspector: Option<Arc<dyn StateInspector>>,
1139 dlq_reader: Option<Arc<dyn DlqReader>>,
1140 relay_state: Option<relay::RelayState>,
1141 bearer_auth: auth::BearerAuth,
1142 trace_store: Option<Arc<dyn trace_store::TraceStore>>,
1143 #[allow(dead_code)]
1144 alert_dispatcher: Option<Arc<alert::AlertDispatcher>>,
1145}
1146
1147pub fn layer() -> InspectorLayer {
1148 InspectorLayer
1149}
1150
1151struct SpanData {
1153 node_id: Option<String>,
1154 resource_type: Option<String>,
1155 circuit: Option<String>,
1156 outcome_kind: Option<String>,
1157 outcome_target: Option<String>,
1158 entered_at: Option<Instant>,
1159 duration_ms: Option<u64>,
1160}
1161
1162impl SpanData {
1163 fn from_visitor(v: SpanFieldExtractor) -> Self {
1164 Self {
1165 node_id: v.node_id,
1166 resource_type: v.resource_type,
1167 circuit: v.circuit,
1168 outcome_kind: v.outcome_kind,
1169 outcome_target: v.outcome_target,
1170 entered_at: None,
1171 duration_ms: None,
1172 }
1173 }
1174
1175 fn update_from_visitor(&mut self, v: SpanFieldExtractor) {
1176 if let Some(val) = v.node_id {
1177 self.node_id = Some(val);
1178 }
1179 if let Some(val) = v.resource_type {
1180 self.resource_type = Some(val);
1181 }
1182 if let Some(val) = v.circuit {
1183 self.circuit = Some(val);
1184 }
1185 if let Some(val) = v.outcome_kind {
1186 self.outcome_kind = Some(val);
1187 }
1188 if let Some(val) = v.outcome_target {
1189 self.outcome_target = Some(val);
1190 }
1191 }
1192}
1193
1194struct SpanFieldExtractor {
1195 node_id: Option<String>,
1196 resource_type: Option<String>,
1197 circuit: Option<String>,
1198 outcome_kind: Option<String>,
1199 outcome_target: Option<String>,
1200}
1201
1202impl SpanFieldExtractor {
1203 fn new() -> Self {
1204 Self {
1205 node_id: None,
1206 resource_type: None,
1207 circuit: None,
1208 outcome_kind: None,
1209 outcome_target: None,
1210 }
1211 }
1212}
1213
1214impl tracing::field::Visit for SpanFieldExtractor {
1215 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1216 match field.name() {
1217 "ranvier.node" => self.node_id = Some(value.to_string()),
1218 "ranvier.resource_type" => self.resource_type = Some(value.to_string()),
1219 "ranvier.circuit" => self.circuit = Some(value.to_string()),
1220 "ranvier.outcome_kind" => self.outcome_kind = Some(value.to_string()),
1221 "ranvier.outcome_target" => self.outcome_target = Some(value.to_string()),
1222 _ => {}
1223 }
1224 }
1225
1226 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1227 let s = format!("{value:?}");
1229 match field.name() {
1230 "ranvier.node" => self.node_id = Some(s),
1231 "ranvier.resource_type" => self.resource_type = Some(s),
1232 "ranvier.circuit" => self.circuit = Some(s),
1233 "ranvier.outcome_kind" => self.outcome_kind = Some(s),
1234 "ranvier.outcome_target" => self.outcome_target = Some(s),
1235 _ => {}
1236 }
1237 }
1238}
1239
1240fn epoch_ms() -> u64 {
1241 SystemTime::now()
1242 .duration_since(UNIX_EPOCH)
1243 .unwrap_or_default()
1244 .as_millis() as u64
1245}
1246
1247pub struct InspectorLayer;
1248
1249impl<S> Layer<S> for InspectorLayer
1250where
1251 S: Subscriber + for<'a> LookupSpan<'a>,
1252{
1253 fn on_new_span(
1254 &self,
1255 attrs: &tracing::span::Attributes<'_>,
1256 id: &Id,
1257 ctx: Context<'_, S>,
1258 ) {
1259 if let Some(span) = ctx.span(id) {
1260 let name = span.name();
1261 if name == "Node" || name == "Circuit" || name == "NodeRetry" {
1262 let mut extractor = SpanFieldExtractor::new();
1263 attrs.record(&mut extractor);
1264 span.extensions_mut()
1265 .insert(SpanData::from_visitor(extractor));
1266 }
1267 }
1268 }
1269
1270 fn on_record(
1271 &self,
1272 id: &Id,
1273 values: &tracing::span::Record<'_>,
1274 ctx: Context<'_, S>,
1275 ) {
1276 if let Some(span) = ctx.span(id) {
1277 let name = span.name();
1278 if name == "Node" || name == "Circuit" || name == "NodeRetry" {
1279 let mut extractor = SpanFieldExtractor::new();
1280 values.record(&mut extractor);
1281 if let Some(data) = span.extensions_mut().get_mut::<SpanData>() {
1282 data.update_from_visitor(extractor);
1283 }
1284 }
1285 }
1286 }
1287
1288 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
1289 let metadata = event.metadata();
1290 if metadata.target() == "ranvier.debugger" {
1291 let mut fields = HashMap::new();
1292 let mut visitor = FieldVisitor {
1293 fields: &mut fields,
1294 };
1295 event.record(&mut visitor);
1296
1297 if let (Some(trace_id), Some(node_id)) =
1298 (fields.get("trace_id"), fields.get("node_id"))
1299 {
1300 let msg = serde_json::json!({
1301 "type": "node_paused",
1302 "trace_id": trace_id,
1303 "node_id": node_id,
1304 "timestamp": epoch_ms()
1305 })
1306 .to_string();
1307 let _ = get_sender().send(msg);
1308 }
1309 return;
1310 }
1311
1312 if metadata.target().starts_with("ranvier") {
1313 let mut fields = HashMap::new();
1314 let mut visitor = FieldVisitor {
1315 fields: &mut fields,
1316 };
1317 event.record(&mut visitor);
1318
1319 let msg = serde_json::json!({
1320 "type": "event",
1321 "target": metadata.target(),
1322 "level": format!("{}", metadata.level()),
1323 "fields": fields,
1324 "timestamp": epoch_ms()
1325 })
1326 .to_string();
1327 let _ = get_sender().send(msg);
1328 }
1329 }
1330
1331 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
1332 if let Some(span) = ctx.span(id) {
1333 let name = span.name();
1334 if name == "Node" || name == "Circuit" {
1335 let mut extensions = span.extensions_mut();
1336 if let Some(data) = extensions.get_mut::<SpanData>() {
1337 data.entered_at = Some(Instant::now());
1338
1339 if name == "Node" {
1340 let msg = serde_json::json!({
1341 "type": "node_enter",
1342 "node_id": data.node_id,
1343 "resource_type": data.resource_type,
1344 "timestamp": epoch_ms()
1345 })
1346 .to_string();
1347 let _ = get_sender().send(msg);
1348
1349 if let Some(node_id) = &data.node_id {
1351 let circuit_name = data.circuit.clone().unwrap_or_default();
1352 stall::register_node(
1353 format!("{:?}", id),
1354 node_id.clone(),
1355 circuit_name,
1356 );
1357 }
1358 } else if name == "Circuit" {
1359 if let Some(circuit) = &data.circuit {
1360 if let Ok(mut registry) = get_trace_registry().lock() {
1361 registry.register(circuit.clone());
1362 }
1363 }
1364 }
1365 }
1366 }
1367 }
1368 }
1369
1370 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
1371 if let Some(span) = ctx.span(id) {
1372 let name = span.name();
1373 if name == "Node" || name == "Circuit" {
1374 let mut extensions = span.extensions_mut();
1375 if let Some(data) = extensions.get_mut::<SpanData>() {
1376 data.duration_ms =
1378 data.entered_at.map(|t| t.elapsed().as_millis() as u64);
1379 }
1380 }
1381 }
1382 }
1383
1384 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
1385 if let Some(span) = ctx.span(&id) {
1386 let name = span.name();
1387 if name == "Node" {
1388 stall::unregister_node(&format!("{:?}", id));
1390
1391 let extensions = span.extensions();
1392 if let Some(data) = extensions.get::<SpanData>() {
1393 let duration = data.duration_ms.unwrap_or(0);
1394 let is_error = data.outcome_kind.as_deref() == Some("Fault");
1395
1396 let msg = serde_json::json!({
1397 "type": "node_exit",
1398 "node_id": data.node_id,
1399 "resource_type": data.resource_type,
1400 "outcome_type": data.outcome_kind,
1401 "outcome_target": data.outcome_target,
1402 "duration_ms": duration,
1403 "timestamp": epoch_ms()
1404 })
1405 .to_string();
1406 let _ = get_sender().send(msg);
1407
1408 let circuit_name = span
1410 .parent()
1411 .and_then(|p| {
1412 p.extensions()
1413 .get::<SpanData>()
1414 .and_then(|d| d.circuit.clone())
1415 });
1416 if let Some(node_id) = &data.node_id {
1417 metrics::record_global_node_exit(
1418 circuit_name.as_deref().unwrap_or("default"),
1419 node_id,
1420 duration,
1421 is_error,
1422 );
1423 }
1424
1425 payload::record_event(payload::CapturedEvent {
1427 timestamp: epoch_ms(),
1428 event_type: "node_exit".to_string(),
1429 node_id: data.node_id.clone(),
1430 circuit: circuit_name,
1431 duration_ms: Some(duration),
1432 outcome_type: data.outcome_kind.clone(),
1433 payload_hash: None,
1434 payload_json: None,
1435 });
1436 }
1437 } else if name == "Circuit" {
1438 let extensions = span.extensions();
1439 if let Some(data) = extensions.get::<SpanData>() {
1440 let msg = serde_json::json!({
1441 "type": "circuit_exit",
1442 "circuit": data.circuit,
1443 "outcome_type": data.outcome_kind,
1444 "outcome_target": data.outcome_target,
1445 "duration_ms": data.duration_ms.unwrap_or(0),
1446 "timestamp": epoch_ms()
1447 })
1448 .to_string();
1449 let _ = get_sender().send(msg);
1450
1451 if let Some(circuit) = &data.circuit {
1453 if let Ok(mut registry) = get_trace_registry().lock() {
1454 registry.complete(
1455 circuit,
1456 data.outcome_kind.clone(),
1457 data.duration_ms,
1458 );
1459 }
1460 }
1461
1462 payload::record_event(payload::CapturedEvent {
1464 timestamp: epoch_ms(),
1465 event_type: "circuit_exit".to_string(),
1466 node_id: None,
1467 circuit: data.circuit.clone(),
1468 duration_ms: data.duration_ms,
1469 outcome_type: data.outcome_kind.clone(),
1470 payload_hash: None,
1471 payload_json: None,
1472 });
1473 }
1474 }
1475 }
1476 }
1477}
1478
1479struct FieldVisitor<'a> {
1480 fields: &'a mut HashMap<String, String>,
1481}
1482
1483impl<'a> tracing::field::Visit for FieldVisitor<'a> {
1484 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1485 self.fields
1486 .insert(field.name().to_string(), format!("{:?}", value));
1487 }
1488
1489 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1490 self.fields
1491 .insert(field.name().to_string(), value.to_string());
1492 }
1493}
1494
1495async fn get_schematic(
1496 headers: HeaderMap,
1497 State(state): State<InspectorState>,
1498) -> Result<Json<Schematic>, (StatusCode, Json<Value>)> {
1499 ensure_public_access(&headers, &state.auth_policy)?;
1500 let schematic = state.schematic.lock().unwrap();
1501 Ok(Json(schematic.clone()))
1502}
1503
1504async fn get_public_projection(
1505 headers: HeaderMap,
1506 State(state): State<InspectorState>,
1507) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1508 ensure_public_access(&headers, &state.auth_policy)?;
1509 if let Some(path) = &state.public_projection_path
1510 && let Ok(v) = read_projection_file(path)
1511 {
1512 return Ok(Json(apply_projection_redaction(
1513 v,
1514 ProjectionSurface::Public,
1515 &state.redaction_policy,
1516 )));
1517 }
1518
1519 let projection = state
1520 .public_projection
1521 .lock()
1522 .ok()
1523 .and_then(|v| v.clone())
1524 .unwrap_or(Value::Null);
1525 Ok(Json(apply_projection_redaction(
1526 projection,
1527 ProjectionSurface::Public,
1528 &state.redaction_policy,
1529 )))
1530}
1531
1532async fn get_internal_projection(
1533 headers: HeaderMap,
1534 State(state): State<InspectorState>,
1535) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1536 ensure_internal_access(&headers, &state.auth_policy)?;
1537 if let Some(path) = &state.internal_projection_path
1538 && let Ok(v) = read_projection_file(path)
1539 {
1540 return Ok(Json(apply_projection_redaction(
1541 v,
1542 ProjectionSurface::Internal,
1543 &state.redaction_policy,
1544 )));
1545 }
1546
1547 let projection = state
1548 .internal_projection
1549 .lock()
1550 .ok()
1551 .and_then(|v| v.clone())
1552 .unwrap_or(Value::Null);
1553 Ok(Json(apply_projection_redaction(
1554 projection,
1555 ProjectionSurface::Internal,
1556 &state.redaction_policy,
1557 )))
1558}
1559
1560fn inspector_envelope(kind: &'static str, data: Value) -> Json<Value> {
1561 Json(serde_json::json!({
1562 "api_version": INSPECTOR_API_VERSION,
1563 "kind": kind,
1564 "data": data
1565 }))
1566}
1567
1568fn load_internal_projection_value(state: &InspectorState) -> Value {
1569 if let Some(path) = &state.internal_projection_path
1570 && let Ok(v) = read_projection_file(path)
1571 {
1572 return v;
1573 }
1574 state
1575 .internal_projection
1576 .lock()
1577 .ok()
1578 .and_then(|v| v.clone())
1579 .unwrap_or(Value::Null)
1580}
1581
1582fn latest_trace_from_projection(projection: &Value) -> Option<Value> {
1583 match projection {
1584 Value::Object(map) => {
1585 if let Some(Value::Array(traces)) = map.get("traces") {
1586 return traces.last().cloned();
1587 }
1588 if map.get("trace_id").is_some() {
1589 return Some(projection.clone());
1590 }
1591 None
1592 }
1593 Value::Array(traces) => traces.last().cloned(),
1594 _ => None,
1595 }
1596}
1597
1598fn find_trace_by_request_id(projection: &Value, request_id: &str) -> Option<Value> {
1599 if request_id.eq_ignore_ascii_case("latest") {
1600 return latest_trace_from_projection(projection);
1601 }
1602
1603 match projection {
1604 Value::Object(map) => {
1605 if map.get("trace_id").and_then(Value::as_str) == Some(request_id) {
1606 return Some(projection.clone());
1607 }
1608 if let Some(Value::Array(traces)) = map.get("traces") {
1609 return traces
1610 .iter()
1611 .find(|trace| trace.get("trace_id").and_then(Value::as_str) == Some(request_id))
1612 .cloned();
1613 }
1614 None
1615 }
1616 Value::Array(traces) => traces
1617 .iter()
1618 .find(|trace| trace.get("trace_id").and_then(Value::as_str) == Some(request_id))
1619 .cloned(),
1620 _ => None,
1621 }
1622}
1623
1624async fn get_inspector_circuits(
1625 headers: HeaderMap,
1626 State(state): State<InspectorState>,
1627) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1628 ensure_internal_access(&headers, &state.auth_policy)?;
1629 let schematic = state.schematic.lock().unwrap();
1630 let transition_count = schematic
1631 .nodes
1632 .iter()
1633 .filter(|node| matches!(&node.kind, NodeKind::Atom))
1634 .count();
1635 let has_capability_rules = schematic.nodes.iter().any(|node| {
1636 node.bus_capability
1637 .as_ref()
1638 .map(|policy| !policy.allow.is_empty() || !policy.deny.is_empty())
1639 .unwrap_or(false)
1640 });
1641
1642 Ok(inspector_envelope(
1643 "inspector.circuits.v1",
1644 serde_json::json!({
1645 "count": 1,
1646 "items": [
1647 {
1648 "id": schematic.id,
1649 "name": schematic.name,
1650 "schema_version": schematic.schema_version,
1651 "node_count": schematic.nodes.len(),
1652 "edge_count": schematic.edges.len(),
1653 "transition_count": transition_count,
1654 "has_bus_capability_rules": has_capability_rules
1655 }
1656 ]
1657 }),
1658 ))
1659}
1660
1661async fn get_inspector_circuit_by_name(
1662 headers: HeaderMap,
1663 AxPath(name): AxPath<String>,
1664 State(state): State<InspectorState>,
1665) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1666 ensure_internal_access(&headers, &state.auth_policy)?;
1667 let schematic = state.schematic.lock().unwrap().clone();
1668 if name != schematic.name && name != schematic.id {
1669 return Err(policy_error(StatusCode::NOT_FOUND, "circuit_not_found"));
1670 }
1671
1672 let public_projection_loaded = state
1673 .public_projection
1674 .lock()
1675 .ok()
1676 .map(|slot| slot.is_some())
1677 .unwrap_or(false);
1678 let internal_projection_loaded = state
1679 .internal_projection
1680 .lock()
1681 .ok()
1682 .map(|slot| slot.is_some())
1683 .unwrap_or(false);
1684
1685 Ok(inspector_envelope(
1686 "inspector.circuit.v1",
1687 serde_json::json!({
1688 "circuit": {
1689 "id": schematic.id,
1690 "name": schematic.name,
1691 "schema_version": schematic.schema_version,
1692 "node_count": schematic.nodes.len(),
1693 "edge_count": schematic.edges.len(),
1694 },
1695 "runtime_state": {
1696 "public_projection_loaded": public_projection_loaded,
1697 "internal_projection_loaded": internal_projection_loaded,
1698 "public_projection_path": state.public_projection_path,
1699 "internal_projection_path": state.internal_projection_path,
1700 },
1701 "schematic": schematic
1702 }),
1703 ))
1704}
1705
1706async fn get_inspector_bus(
1707 headers: HeaderMap,
1708 State(state): State<InspectorState>,
1709) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1710 ensure_internal_access(&headers, &state.auth_policy)?;
1711 let schematic = state.schematic.lock().unwrap();
1712
1713 let mut resource_types = HashSet::new();
1714 let mut transition_capabilities = Vec::new();
1715
1716 for node in &schematic.nodes {
1717 if !node.resource_type.trim().is_empty() && node.resource_type != "()" {
1718 resource_types.insert(node.resource_type.clone());
1719 }
1720 if !matches!(&node.kind, NodeKind::Atom) {
1721 continue;
1722 }
1723
1724 let mut allow = node
1725 .bus_capability
1726 .as_ref()
1727 .map(|policy| policy.allow.clone())
1728 .unwrap_or_default();
1729 let mut deny = node
1730 .bus_capability
1731 .as_ref()
1732 .map(|policy| policy.deny.clone())
1733 .unwrap_or_default();
1734 allow.sort();
1735 deny.sort();
1736 let access = if allow.is_empty() && deny.is_empty() {
1737 "unrestricted"
1738 } else {
1739 "restricted"
1740 };
1741
1742 transition_capabilities.push(serde_json::json!({
1743 "transition": node.label,
1744 "resource_type": node.resource_type,
1745 "access": access,
1746 "allow": allow,
1747 "deny": deny
1748 }));
1749 }
1750
1751 let mut resources = resource_types.into_iter().collect::<Vec<_>>();
1752 resources.sort();
1753
1754 Ok(inspector_envelope(
1755 "inspector.bus.v1",
1756 serde_json::json!({
1757 "resource_types": resources,
1758 "transition_capabilities": transition_capabilities
1759 }),
1760 ))
1761}
1762
1763async fn get_inspector_timeline_by_request_id(
1764 headers: HeaderMap,
1765 AxPath(request_id): AxPath<String>,
1766 State(state): State<InspectorState>,
1767) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1768 ensure_internal_access(&headers, &state.auth_policy)?;
1769 let projection = load_internal_projection_value(&state);
1770 let trace = find_trace_by_request_id(&projection, &request_id)
1771 .ok_or_else(|| policy_error(StatusCode::NOT_FOUND, "timeline_request_not_found"))?;
1772
1773 Ok(inspector_envelope(
1774 "inspector.timeline.v1",
1775 serde_json::json!({
1776 "request_id": request_id,
1777 "trace": trace
1778 }),
1779 ))
1780}
1781
1782async fn ws_handler(
1783 headers: HeaderMap,
1784 ws: WebSocketUpgrade,
1785 State(state): State<InspectorState>,
1786) -> impl IntoResponse {
1787 if let Err(err) = ensure_internal_access(&headers, &state.auth_policy) {
1788 return err.into_response();
1789 }
1790 ws.on_upgrade(handle_socket)
1791}
1792
1793async fn get_quick_view_html() -> impl IntoResponse {
1794 (
1795 [(header::CONTENT_TYPE, "text/html; charset=utf-8")],
1796 QUICK_VIEW_HTML,
1797 )
1798}
1799
1800async fn get_quick_view_js() -> impl IntoResponse {
1801 (
1802 [(
1803 header::CONTENT_TYPE,
1804 "application/javascript; charset=utf-8",
1805 )],
1806 QUICK_VIEW_JS,
1807 )
1808}
1809
1810async fn get_quick_view_css() -> impl IntoResponse {
1811 (
1812 [(header::CONTENT_TYPE, "text/css; charset=utf-8")],
1813 QUICK_VIEW_CSS,
1814 )
1815}
1816
1817async fn handle_socket(mut socket: WebSocket) {
1818 let mut rx = get_sender().subscribe();
1819
1820 while let Ok(msg) = rx.recv().await {
1821 if socket.send(Message::Text(msg)).await.is_err() {
1822 break;
1823 }
1824 }
1825}
1826
1827fn env_flag(name: &str, default: bool) -> bool {
1828 match std::env::var(name) {
1829 Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
1830 Err(_) => default,
1831 }
1832}
1833
1834fn parse_role(headers: &HeaderMap) -> Result<AccessRole, &'static str> {
1835 let raw = headers
1836 .get("x-ranvier-role")
1837 .ok_or("missing_x_ranvier_role")?
1838 .to_str()
1839 .map_err(|_| "invalid_x_ranvier_role")?
1840 .trim()
1841 .to_ascii_lowercase();
1842
1843 match raw.as_str() {
1844 "viewer" => Ok(AccessRole::Viewer),
1845 "operator" => Ok(AccessRole::Operator),
1846 "admin" => Ok(AccessRole::Admin),
1847 _ => Err("invalid_x_ranvier_role"),
1848 }
1849}
1850
1851fn has_tenant(headers: &HeaderMap) -> bool {
1852 headers
1853 .get("x-ranvier-tenant")
1854 .and_then(|v| v.to_str().ok())
1855 .map(|v| !v.trim().is_empty())
1856 .unwrap_or(false)
1857}
1858
1859fn policy_error(code: StatusCode, message: &'static str) -> (StatusCode, Json<Value>) {
1860 (
1861 code,
1862 Json(serde_json::json!({
1863 "error": message
1864 })),
1865 )
1866}
1867
1868fn ensure_public_access(
1869 headers: &HeaderMap,
1870 policy: &AuthPolicy,
1871) -> Result<(), (StatusCode, Json<Value>)> {
1872 if !policy.enforce_headers {
1873 return Ok(());
1874 }
1875 parse_role(headers)
1876 .map(|_| ())
1877 .map_err(|e| policy_error(StatusCode::UNAUTHORIZED, e))
1878}
1879
1880fn ensure_internal_access(
1881 headers: &HeaderMap,
1882 policy: &AuthPolicy,
1883) -> Result<(), (StatusCode, Json<Value>)> {
1884 if !policy.enforce_headers {
1885 return Ok(());
1886 }
1887
1888 let role = parse_role(headers).map_err(|e| policy_error(StatusCode::UNAUTHORIZED, e))?;
1889 if role == AccessRole::Viewer {
1890 return Err(policy_error(
1891 StatusCode::FORBIDDEN,
1892 "role_forbidden_for_internal_endpoint",
1893 ));
1894 }
1895 if policy.require_tenant_for_internal && !has_tenant(headers) {
1896 return Err(policy_error(
1897 StatusCode::FORBIDDEN,
1898 "missing_x_ranvier_tenant",
1899 ));
1900 }
1901 Ok(())
1902}
1903
1904async fn api_get_routes(
1907 headers: HeaderMap,
1908 State(state): State<InspectorState>,
1909) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1910 ensure_internal_access(&headers, &state.auth_policy)?;
1911
1912 let registered = routes::list_routes();
1913
1914 let schematic = state.schematic.lock().unwrap();
1916 let mut route_list: Vec<Value> = registered
1917 .iter()
1918 .map(|r| {
1919 let node_schemas = schematic.nodes.iter().find(|n| {
1921 Some(r.circuit_name.as_deref().unwrap_or(""))
1922 == Some(n.label.as_str())
1923 });
1924
1925 serde_json::json!({
1926 "method": r.method,
1927 "path": r.path,
1928 "circuit_name": r.circuit_name,
1929 "input_schema": r.input_schema.clone().or_else(|| node_schemas.and_then(|n| n.input_schema.clone())),
1930 "output_schema": r.output_schema.clone().or_else(|| node_schemas.and_then(|n| n.output_schema.clone())),
1931 })
1932 })
1933 .collect();
1934
1935 if route_list.is_empty() {
1937 route_list = schematic
1938 .nodes
1939 .iter()
1940 .filter(|n| n.input_schema.is_some() || n.output_schema.is_some())
1941 .map(|n| {
1942 serde_json::json!({
1943 "circuit_name": n.label,
1944 "input_type": n.input_type,
1945 "output_type": n.output_type,
1946 "input_schema": n.input_schema,
1947 "output_schema": n.output_schema,
1948 })
1949 })
1950 .collect();
1951 }
1952
1953 Ok(inspector_envelope(
1954 "inspector.routes.v1",
1955 serde_json::json!({
1956 "count": route_list.len(),
1957 "routes": route_list
1958 }),
1959 ))
1960}
1961
1962async fn api_post_routes_schema(
1963 headers: HeaderMap,
1964 State(state): State<InspectorState>,
1965 Json(body): Json<routes::SchemaLookupRequest>,
1966) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
1967 ensure_internal_access(&headers, &state.auth_policy)?;
1968
1969 if let Some(route) = routes::find_route(&body.method, &body.path) {
1971 if let Some(schema) = route.input_schema {
1972 return Ok(inspector_envelope(
1973 "inspector.route_schema.v1",
1974 serde_json::json!({
1975 "method": body.method,
1976 "path": body.path,
1977 "schema": schema
1978 }),
1979 ));
1980 }
1981 }
1982
1983 let schematic = state.schematic.lock().unwrap();
1985 for node in &schematic.nodes {
1986 if let Some(schema) = &node.input_schema {
1987 return Ok(inspector_envelope(
1988 "inspector.route_schema.v1",
1989 serde_json::json!({
1990 "method": body.method,
1991 "path": body.path,
1992 "schema": schema
1993 }),
1994 ));
1995 }
1996 }
1997
1998 Err(policy_error(StatusCode::NOT_FOUND, "schema_not_found"))
1999}
2000
2001async fn api_post_routes_sample(
2002 headers: HeaderMap,
2003 State(state): State<InspectorState>,
2004 Json(body): Json<routes::SampleRequest>,
2005) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
2006 ensure_internal_access(&headers, &state.auth_policy)?;
2007
2008 let schema_val = {
2010 if let Some(route) = routes::find_route(&body.method, &body.path) {
2011 route.input_schema
2012 } else {
2013 let schematic = state.schematic.lock().unwrap();
2015 schematic
2016 .nodes
2017 .iter()
2018 .find_map(|n| n.input_schema.clone())
2019 }
2020 };
2021
2022 let Some(schema_val) = schema_val else {
2023 return Err(policy_error(StatusCode::NOT_FOUND, "schema_not_found"));
2024 };
2025
2026 let sample = match body.mode.as_str() {
2027 "random" => schema::generate_sample(&schema_val),
2028 _ => schema::generate_template(&schema_val),
2029 };
2030
2031 Ok(inspector_envelope(
2032 "inspector.route_sample.v1",
2033 serde_json::json!({
2034 "method": body.method,
2035 "path": body.path,
2036 "mode": body.mode,
2037 "sample": sample
2038 }),
2039 ))
2040}
2041
2042async fn api_post_relay(
2043 headers: HeaderMap,
2044 State(state): State<InspectorState>,
2045 Json(body): Json<relay::RelayRequest>,
2046) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
2047 ensure_internal_access(&headers, &state.auth_policy)?;
2048
2049 let mode = InspectorMode::from_env();
2051 if mode != InspectorMode::Dev {
2052 return Err(policy_error(
2053 StatusCode::FORBIDDEN,
2054 "relay_only_in_dev_mode",
2055 ));
2056 }
2057
2058 let Some(relay_state) = &state.relay_state else {
2059 return Err((
2060 StatusCode::SERVICE_UNAVAILABLE,
2061 Json(serde_json::json!({
2062 "error": "relay_not_configured",
2063 "message": "No relay target configured. Use Inspector::with_relay_target() to enable."
2064 })),
2065 ));
2066 };
2067
2068 match relay::execute_relay(relay_state, body).await {
2069 Ok(response) => Ok(inspector_envelope(
2070 "inspector.relay.v1",
2071 serde_json::to_value(&response).unwrap_or(Value::Null),
2072 )),
2073 Err(err) => Err((
2074 StatusCode::BAD_GATEWAY,
2075 Json(serde_json::to_value(&err).unwrap_or(Value::Null)),
2076 )),
2077 }
2078}
2079
2080async fn api_get_stored_traces(
2081 headers: HeaderMap,
2082 State(state): State<InspectorState>,
2083) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
2084 ensure_internal_access(&headers, &state.auth_policy)?;
2085 state.bearer_auth.validate(&headers)?;
2086
2087 let Some(store) = &state.trace_store else {
2088 return Ok(inspector_envelope(
2089 "inspector.traces_stored.v1",
2090 serde_json::json!({
2091 "total": 0,
2092 "traces": [],
2093 "note": "No trace store configured"
2094 }),
2095 ));
2096 };
2097
2098 let query = trace_store::TraceQuery::default();
2099 match store.query(query).await {
2100 Ok(traces) => Ok(inspector_envelope(
2101 "inspector.traces_stored.v1",
2102 serde_json::json!({
2103 "total": traces.len(),
2104 "traces": traces
2105 }),
2106 )),
2107 Err(e) => Err((
2108 StatusCode::INTERNAL_SERVER_ERROR,
2109 Json(serde_json::json!({ "error": "trace_store_error", "message": e })),
2110 )),
2111 }
2112}
2113
2114#[cfg(test)]
2115mod tests {
2116 use super::*;
2117 use ranvier_core::schematic::Schematic;
2118 use std::time::Duration;
2119
2120 fn reserve_listener() -> (u16, tokio::net::TcpListener) {
2121 let std_listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
2122 std_listener.set_nonblocking(true).expect("set nonblocking");
2123 let port = std_listener.local_addr().expect("local addr").port();
2124 let listener =
2125 tokio::net::TcpListener::from_std(std_listener).expect("tokio listener conversion");
2126 (port, listener)
2127 }
2128
2129 async fn wait_ready(port: u16) {
2130 let client = reqwest::Client::new();
2131 for _ in 0..30 {
2132 if client
2133 .get(format!("http://127.0.0.1:{port}/schematic"))
2134 .send()
2135 .await
2136 .map(|_| true)
2137 .unwrap_or(false)
2138 {
2139 return;
2140 }
2141 tokio::time::sleep(Duration::from_millis(100)).await;
2142 }
2143 panic!("inspector server did not become ready");
2144 }
2145
2146 #[test]
2147 fn redaction_defaults_public_and_internal_surfaces() {
2148 let policy = TelemetryRedactionPolicy::default();
2149 let src = serde_json::json!({
2150 "email": "user@example.com",
2151 "summary": { "ok": true }
2152 });
2153
2154 let public = apply_projection_redaction(src.clone(), ProjectionSurface::Public, &policy);
2155 let internal = apply_projection_redaction(src, ProjectionSurface::Internal, &policy);
2156
2157 assert_eq!(public["email"], "[REDACTED]");
2158 assert_eq!(public["summary"]["ok"], true);
2159 assert_eq!(internal["email"], "user@example.com");
2160 }
2161
2162 #[test]
2163 fn strict_mode_filters_attribute_bag_by_allowlist() {
2164 let policy = TelemetryRedactionPolicy {
2165 mode_override: Some(RedactionMode::Strict),
2166 allow_keys: {
2167 let mut keys = std::collections::HashSet::new();
2168 keys.insert("ranvier.circuit".to_string());
2169 keys
2170 },
2171 ..Default::default()
2172 };
2173
2174 let src = serde_json::json!({
2175 "attributes": {
2176 "ranvier.circuit": "CheckoutCircuit",
2177 "customer_id": "u-123",
2178 "api_key": "secret-key"
2179 }
2180 });
2181
2182 let out = apply_projection_redaction(src, ProjectionSurface::Internal, &policy);
2183 assert_eq!(out["attributes"]["ranvier.circuit"], "CheckoutCircuit");
2184 assert_eq!(out["attributes"]["api_key"], "[REDACTED]");
2185 assert!(out["attributes"].get("customer_id").is_none());
2186 }
2187
2188 #[test]
2189 fn custom_sensitive_patterns_are_applied() {
2190 let mut policy = TelemetryRedactionPolicy::default();
2191 policy.sensitive_patterns.push("tenant_id".to_string());
2192
2193 let src = serde_json::json!({
2194 "tenant_id": "team-a",
2195 "trace_id": "abc123"
2196 });
2197
2198 let out = apply_projection_redaction(src, ProjectionSurface::Public, &policy);
2199 assert_eq!(out["tenant_id"], "[REDACTED]");
2200 assert_eq!(out["trace_id"], "abc123");
2201 }
2202
2203 #[tokio::test]
2204 async fn dev_mode_exposes_quick_view_and_internal_routes() {
2205 let (port, listener) = reserve_listener();
2206 let inspector = Inspector::new(Schematic::new("dev-test"), port).with_mode("dev");
2207 let handle = tokio::spawn(async move {
2208 let _ = inspector.serve_with_listener(listener).await;
2209 });
2210 wait_ready(port).await;
2211
2212 let client = reqwest::Client::new();
2213 let quick = client
2214 .get(format!("http://127.0.0.1:{port}/quick-view"))
2215 .send()
2216 .await
2217 .expect("quick-view request");
2218 let internal = client
2219 .get(format!("http://127.0.0.1:{port}/trace/internal"))
2220 .send()
2221 .await
2222 .expect("internal request");
2223 let events = client
2224 .get(format!("http://127.0.0.1:{port}/events"))
2225 .send()
2226 .await
2227 .expect("events request");
2228 let circuits = client
2229 .get(format!("http://127.0.0.1:{port}/inspector/circuits"))
2230 .send()
2231 .await
2232 .expect("circuits request");
2233 let bus = client
2234 .get(format!("http://127.0.0.1:{port}/inspector/bus"))
2235 .send()
2236 .await
2237 .expect("bus request");
2238 let timeline = client
2239 .get(format!(
2240 "http://127.0.0.1:{port}/inspector/timeline/bootstrap"
2241 ))
2242 .send()
2243 .await
2244 .expect("timeline request");
2245
2246 assert_eq!(quick.status(), reqwest::StatusCode::OK);
2247 assert_eq!(internal.status(), reqwest::StatusCode::OK);
2248 assert_ne!(events.status(), reqwest::StatusCode::NOT_FOUND);
2249 assert_eq!(circuits.status(), reqwest::StatusCode::OK);
2250 assert_eq!(bus.status(), reqwest::StatusCode::OK);
2251 assert_eq!(timeline.status(), reqwest::StatusCode::OK);
2252 let circuits_json: Value =
2253 serde_json::from_str(&circuits.text().await.expect("circuits text"))
2254 .expect("circuits json");
2255 let bus_json: Value =
2256 serde_json::from_str(&bus.text().await.expect("bus text")).expect("bus json");
2257 let timeline_json: Value =
2258 serde_json::from_str(&timeline.text().await.expect("timeline text"))
2259 .expect("timeline json");
2260 assert_eq!(circuits_json["kind"], "inspector.circuits.v1");
2261 assert_eq!(bus_json["kind"], "inspector.bus.v1");
2262 assert_eq!(timeline_json["kind"], "inspector.timeline.v1");
2263
2264 handle.abort();
2265 }
2266
2267 #[tokio::test]
2268 async fn prod_mode_hides_quick_view_and_internal_routes() {
2269 let (port, listener) = reserve_listener();
2270 let inspector = Inspector::new(Schematic::new("prod-test"), port).with_mode("prod");
2271 let handle = tokio::spawn(async move {
2272 let _ = inspector.serve_with_listener(listener).await;
2273 });
2274 wait_ready(port).await;
2275
2276 let client = reqwest::Client::new();
2277 let quick = client
2278 .get(format!("http://127.0.0.1:{port}/quick-view"))
2279 .send()
2280 .await
2281 .expect("quick-view request");
2282 let internal = client
2283 .get(format!("http://127.0.0.1:{port}/trace/internal"))
2284 .send()
2285 .await
2286 .expect("internal request");
2287 let events = client
2288 .get(format!("http://127.0.0.1:{port}/events"))
2289 .send()
2290 .await
2291 .expect("events request");
2292 let circuits = client
2293 .get(format!("http://127.0.0.1:{port}/inspector/circuits"))
2294 .send()
2295 .await
2296 .expect("circuits request");
2297 let public = client
2298 .get(format!("http://127.0.0.1:{port}/trace/public"))
2299 .send()
2300 .await
2301 .expect("public request");
2302
2303 assert_eq!(quick.status(), reqwest::StatusCode::NOT_FOUND);
2304 assert_eq!(internal.status(), reqwest::StatusCode::NOT_FOUND);
2305 assert_eq!(events.status(), reqwest::StatusCode::NOT_FOUND);
2306 assert_eq!(circuits.status(), reqwest::StatusCode::NOT_FOUND);
2307 assert_eq!(public.status(), reqwest::StatusCode::OK);
2308
2309 handle.abort();
2310 }
2311
2312 #[tokio::test]
2313 async fn timeline_endpoint_returns_not_found_for_unknown_request() {
2314 let (port, listener) = reserve_listener();
2315 let inspector = Inspector::new(Schematic::new("timeline-test"), port).with_mode("dev");
2316 let handle = tokio::spawn(async move {
2317 let _ = inspector.serve_with_listener(listener).await;
2318 });
2319 wait_ready(port).await;
2320
2321 let client = reqwest::Client::new();
2322 let timeline = client
2323 .get(format!(
2324 "http://127.0.0.1:{port}/inspector/timeline/unknown-request"
2325 ))
2326 .send()
2327 .await
2328 .expect("timeline request");
2329 assert_eq!(timeline.status(), reqwest::StatusCode::NOT_FOUND);
2330
2331 handle.abort();
2332 }
2333
2334 #[tokio::test]
2335 async fn auth_enforcement_rejects_missing_role_header() {
2336 let (port, listener) = reserve_listener();
2337 let inspector = Inspector::new(Schematic::new("auth-public"), port)
2338 .with_mode("dev")
2339 .with_auth_enforcement(true);
2340 let handle = tokio::spawn(async move {
2341 let _ = inspector.serve_with_listener(listener).await;
2342 });
2343 wait_ready(port).await;
2344
2345 let client = reqwest::Client::new();
2346 let schematic = client
2347 .get(format!("http://127.0.0.1:{port}/schematic"))
2348 .send()
2349 .await
2350 .expect("schematic request");
2351 assert_eq!(schematic.status(), reqwest::StatusCode::UNAUTHORIZED);
2352
2353 handle.abort();
2354 }
2355
2356 #[tokio::test]
2357 async fn auth_enforcement_blocks_viewer_internal_and_requires_tenant() {
2358 let (port, listener) = reserve_listener();
2359 let inspector = Inspector::new(Schematic::new("auth-internal"), port)
2360 .with_mode("dev")
2361 .with_auth_enforcement(true)
2362 .with_require_tenant_for_internal(true);
2363 let handle = tokio::spawn(async move {
2364 let _ = inspector.serve_with_listener(listener).await;
2365 });
2366 wait_ready(port).await;
2367
2368 let client = reqwest::Client::new();
2369
2370 let viewer_internal = client
2371 .get(format!("http://127.0.0.1:{port}/trace/internal"))
2372 .header("X-Ranvier-Role", "viewer")
2373 .send()
2374 .await
2375 .expect("viewer internal request");
2376 assert_eq!(viewer_internal.status(), reqwest::StatusCode::FORBIDDEN);
2377
2378 let operator_no_tenant = client
2379 .get(format!("http://127.0.0.1:{port}/trace/internal"))
2380 .header("X-Ranvier-Role", "operator")
2381 .send()
2382 .await
2383 .expect("operator internal request");
2384 assert_eq!(operator_no_tenant.status(), reqwest::StatusCode::FORBIDDEN);
2385
2386 let operator_with_tenant = client
2387 .get(format!("http://127.0.0.1:{port}/trace/internal"))
2388 .header("X-Ranvier-Role", "operator")
2389 .header("X-Ranvier-Tenant", "team-a")
2390 .send()
2391 .await
2392 .expect("operator internal with tenant request");
2393 assert_eq!(operator_with_tenant.status(), reqwest::StatusCode::OK);
2394
2395 handle.abort();
2396 }
2397}