Skip to main content

running_process/broker/server/
admin.rs

1//! Admin verb rendering for the v1 broker.
2
3use std::io::{self, Read, Write};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use interprocess::local_socket::traits::Listener;
7use prost::Message;
8use serde::Serialize;
9use serde_json::json;
10
11use crate::broker::protocol::{
12    read_frame, write_frame, AdminReply, AdminReplyKind, AdminRequest, AdminVerb, Frame, FrameKind,
13    FramingError, PayloadEncoding, ENVELOPE_VERSION, MAX_FRAME_BYTES, MAX_HELLO_BYTES,
14    PROTOCOL_VERSION,
15};
16
17use super::backend_registry::BackendRegistry;
18use super::connection::{bind_local_socket, BrokerConnectionError, LocalSocketCleanup};
19use super::service_def_loader::{service_definition_dir, SERVICE_DEF_DIR_ENV};
20use super::spawn_coordinator::{
21    SpawnBudgetSnapshot, DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, DEFAULT_SPAWN_BUDGET_WINDOW,
22};
23use crate::broker::server::metrics::{MetricKind, BROKER_METRICS};
24
/// Schema version stamped into every admin JSON document.
///
/// Each `render_*_json` function in this module emits it as the
/// `schema_version` field. The value is frozen for the v1 admin schema.
pub const ADMIN_SCHEMA_VERSION: u32 = 1;
27/// Payload protocol value for v1 admin request/reply frames.
28///
29/// Re-exported from the authoritative
30/// [`registry`](crate::broker::protocol::registry), which owns every v1
31/// payload-protocol ID (#375).
32pub use crate::broker::protocol::registry::ADMIN_PAYLOAD_PROTOCOL;
33
/// Archive format reported for the diagnostic bundle.
const DIAGNOSTIC_BUNDLE_FORMAT: &str = "tar.gz";
/// Collection mode reported for the diagnostic bundle.
const DIAGNOSTIC_BUNDLE_MODE: &str = "metadata-only";
/// Names of the redaction rules advertised in diagnose/config output; each
/// one has a matching entry in `diagnostic_redaction_policy_json`.
const DIAGNOSTIC_REDACTIONS: &[&str] = &[
    "home",
    "secret-env",
    "acl-identities",
];
37
38/// Snapshot consumed by admin verb renderers.
39#[derive(Clone, Debug)]
40pub struct AdminSnapshot {
41    /// Broker instance identifier.
42    pub broker_instance: String,
43    /// Broker process id.
44    pub broker_pid: u32,
45    /// Snapshot generation timestamp.
46    pub generated_at_unix_ms: u64,
47    /// Time since broker start.
48    pub uptime: Duration,
49    /// Whether new Hello requests are accepted.
50    pub accepting_hello: bool,
51    /// Open control-plane connections.
52    pub connections_open: u64,
53    /// Known backend rows.
54    pub backends: Vec<AdminBackend>,
55    /// Known spawn budget rows.
56    pub spawn_budgets: Vec<AdminSpawnBudget>,
57    /// Whether the broker self-demoted under fd pressure (#390).
58    pub fd_pressure_demoted: bool,
59    /// Inode usage of the daemon data dir filesystem (#390).
60    pub inode_pressure: AdminInodePressure,
61}
62
/// Inode usage row for `status --json` (#390).
///
/// On Windows (and on Unix filesystems without a fixed inode table)
/// `supported` is false and the counters are absent from the rendered JSON
/// rather than faked. The `Default` value is exactly that unsupported row:
/// `supported == false`, no error, zeroed counters.
#[derive(Debug, Clone, Default)]
pub struct AdminInodePressure {
    /// True when inode accounting applies to the probed filesystem.
    pub supported: bool,
    /// Failure detail when the probe itself errored; `None` otherwise.
    pub error: Option<String>,
    /// Total inode count (only meaningful when `supported`).
    pub total: u64,
    /// Free inode count (only meaningful when `supported`).
    pub free: u64,
}
77
78impl AdminInodePressure {
79    /// Probe the daemon data dir filesystem.
80    pub fn probe() -> Self {
81        match crate::broker::fs_health::daemon_data_dir_inode_usage() {
82            Ok(Some(usage)) => Self {
83                supported: true,
84                error: None,
85                total: usage.total,
86                free: usage.free,
87            },
88            Ok(None) => Self::default(),
89            Err(err) => Self {
90                supported: false,
91                error: Some(err.to_string()),
92                total: 0,
93                free: 0,
94            },
95        }
96    }
97
98    fn to_json(&self) -> serde_json::Value {
99        if self.supported {
100            let usage = crate::broker::fs_health::InodeUsage {
101                total: self.total,
102                free: self.free,
103            };
104            json!({
105                "supported": true,
106                "inodes_total": self.total,
107                "inodes_free": self.free,
108                "inodes_used": usage.used(),
109                "used_ratio": usage.used_ratio(),
110            })
111        } else {
112            json!({
113                "supported": false,
114                "detail": self.error.clone().unwrap_or_else(|| {
115                    "inode accounting not applicable on this filesystem".into()
116                }),
117            })
118        }
119    }
120}
121
122impl AdminSnapshot {
123    /// Local process snapshot used until pipe-backed admin transport lands.
124    pub fn local_not_serving() -> Self {
125        Self {
126            broker_instance: "local".into(),
127            broker_pid: std::process::id(),
128            generated_at_unix_ms: unix_now_ms(),
129            uptime: Duration::ZERO,
130            accepting_hello: false,
131            connections_open: 0,
132            backends: Vec::new(),
133            spawn_budgets: Vec::new(),
134            fd_pressure_demoted: false,
135            inode_pressure: AdminInodePressure::probe(),
136        }
137    }
138
139    /// Build a live snapshot from broker state.
140    pub fn from_registry(
141        broker_instance: impl Into<String>,
142        uptime: Duration,
143        accepting_hello: bool,
144        connections_open: u64,
145        registry: &BackendRegistry,
146        spawn_budgets: &[SpawnBudgetSnapshot],
147    ) -> Self {
148        Self::from_registry_at(
149            broker_instance,
150            std::process::id(),
151            unix_now_ms(),
152            uptime,
153            accepting_hello,
154            connections_open,
155            registry,
156            spawn_budgets,
157        )
158    }
159
160    /// Testable variant of [`Self::from_registry`] with deterministic metadata.
161    #[allow(clippy::too_many_arguments)]
162    pub fn from_registry_at(
163        broker_instance: impl Into<String>,
164        broker_pid: u32,
165        generated_at_unix_ms: u64,
166        uptime: Duration,
167        accepting_hello: bool,
168        connections_open: u64,
169        registry: &BackendRegistry,
170        spawn_budgets: &[SpawnBudgetSnapshot],
171    ) -> Self {
172        Self {
173            broker_instance: broker_instance.into(),
174            broker_pid,
175            generated_at_unix_ms,
176            uptime,
177            accepting_hello,
178            connections_open,
179            backends: registry
180                .iter()
181                .map(|(_key, handle)| AdminBackend {
182                    service_name: handle.service_name.clone(),
183                    service_version: handle.service_version.clone(),
184                    pid: handle.daemon_process.pid,
185                    backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
186                    last_active_unix_ms: handle.daemon_process.started_at_unix_ms,
187                    state: if handle.is_alive() {
188                        "running".into()
189                    } else {
190                        "stale".into()
191                    },
192                    last_hello_unix_ms: 0,
193                    last_error: None,
194                })
195                .collect(),
196            spawn_budgets: spawn_budgets
197                .iter()
198                .map(AdminSpawnBudget::from_snapshot)
199                .collect(),
200            fd_pressure_demoted: false,
201            inode_pressure: AdminInodePressure::probe(),
202        }
203    }
204
205    /// Record the broker's fd-pressure demotion state (#390).
206    pub fn with_fd_pressure_demoted(mut self, demoted: bool) -> Self {
207        self.fd_pressure_demoted = demoted;
208        self
209    }
210
211    /// Override the inode-pressure row (tests / deterministic snapshots).
212    pub fn with_inode_pressure(mut self, inode_pressure: AdminInodePressure) -> Self {
213        self.inode_pressure = inode_pressure;
214        self
215    }
216}
217
/// One backend row as shown in admin output.
#[derive(Debug, Clone)]
pub struct AdminBackend {
    /// Logical name of the service the backend provides.
    pub service_name: String,
    /// Version of that service.
    pub service_version: String,
    /// Process id of the backend.
    pub pid: u32,
    /// Pipe/socket path the backend is reachable on.
    pub backend_pipe: String,
    /// Timestamp of last activity, in Unix milliseconds.
    pub last_active_unix_ms: u64,
    /// Human-readable state label.
    pub state: String,
    /// Timestamp of the last Hello, in Unix milliseconds.
    pub last_hello_unix_ms: u64,
    /// Most recent backend error, if any.
    pub last_error: Option<String>,
}
238
/// One spawn-budget row as shown in admin output.
#[derive(Debug, Clone)]
pub struct AdminSpawnBudget {
    /// Identifier of the broker instance the budget belongs to.
    pub broker_instance: String,
    /// Logical name of the budgeted service.
    pub service_name: String,
    /// Version of that service.
    pub service_version: String,
    /// Spawn attempts already consumed in the active window.
    pub attempts_used: u32,
    /// Spawn attempts still available in the active window.
    pub remaining: u32,
    /// Whether a spawn is in flight right now.
    pub in_flight: bool,
    /// Retry-after hint in milliseconds, present when the budget is exhausted.
    pub retry_after_ms: Option<u64>,
}
257
258impl AdminSpawnBudget {
259    fn from_snapshot(snapshot: &SpawnBudgetSnapshot) -> Self {
260        Self {
261            broker_instance: snapshot.key.instance.id(),
262            service_name: snapshot.key.service_name.clone(),
263            service_version: snapshot.key.service_version.clone(),
264            attempts_used: snapshot.attempts_used,
265            remaining: snapshot.remaining,
266            in_flight: snapshot.in_flight,
267            retry_after_ms: snapshot
268                .retry_after
269                .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)),
270        }
271    }
272}
273
274/// Render `status --json`.
275pub fn render_status_json(snapshot: &AdminSnapshot) -> String {
276    json!({
277        "schema_version": ADMIN_SCHEMA_VERSION,
278        "command": "status",
279        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
280        "broker_instance": snapshot.broker_instance,
281        "broker_pid": snapshot.broker_pid,
282        "uptime_seconds": snapshot.uptime.as_secs_f64(),
283        "accepting_hello": snapshot.accepting_hello,
284        "connections_open": snapshot.connections_open,
285        "fd_pressure": {
286            "demoted": snapshot.fd_pressure_demoted,
287        },
288        "inode_pressure": snapshot.inode_pressure.to_json(),
289        "backends": snapshot.backends.iter().map(|backend| {
290            json!({
291                "service_name": backend.service_name,
292                "service_version": backend.service_version,
293                "pid": backend.pid,
294                "backend_pipe": backend.backend_pipe,
295                "last_active_unix_ms": backend.last_active_unix_ms,
296                "state": backend.state,
297            })
298        }).collect::<Vec<_>>(),
299    })
300    .to_string()
301}
302
303/// Render `dump --json`.
304pub fn render_dump_json(snapshot: &AdminSnapshot) -> String {
305    json!({
306        "schema_version": ADMIN_SCHEMA_VERSION,
307        "command": "dump",
308        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
309        "broker_instance": snapshot.broker_instance,
310        "effective_config": effective_config_json(snapshot),
311        "backend_table": snapshot.backends.iter().map(|backend| {
312            json!({
313                "service_name": backend.service_name,
314                "service_version": backend.service_version,
315                "pid": backend.pid,
316                "backend_pipe": backend.backend_pipe,
317                "state": backend.state,
318            })
319        }).collect::<Vec<_>>(),
320        "spawn_budgets": snapshot.spawn_budgets.iter().map(|budget| {
321            json!({
322                "broker_instance": budget.broker_instance,
323                "service_name": budget.service_name,
324                "service_version": budget.service_version,
325                "attempts_used": budget.attempts_used,
326                "remaining": budget.remaining,
327                "in_flight": budget.in_flight,
328                "retry_after_ms": budget.retry_after_ms,
329            })
330        }).collect::<Vec<_>>(),
331        "recent_lifecycle_events": [],
332    })
333    .to_string()
334}
335
336/// Render `list-instances --json`.
337pub fn render_list_instances_json(snapshot: &AdminSnapshot) -> String {
338    json!({
339        "schema_version": ADMIN_SCHEMA_VERSION,
340        "command": "list-instances",
341        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
342        "instances": [{
343            "broker_instance": snapshot.broker_instance,
344            "pipe": "",
345            "pid": snapshot.broker_pid,
346            "state": if snapshot.accepting_hello { "running" } else { "not-serving" },
347        }],
348    })
349    .to_string()
350}
351
352/// Render `backend-health <service> --json`.
353pub fn render_backend_health_json(snapshot: &AdminSnapshot, service_name: &str) -> String {
354    json!({
355        "schema_version": ADMIN_SCHEMA_VERSION,
356        "command": "backend-health",
357        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
358        "service_name": service_name,
359        "backends": snapshot.backends.iter()
360            .filter(|backend| backend.service_name == service_name)
361            .map(|backend| {
362                json!({
363                    "service_version": backend.service_version,
364                    "pid": backend.pid,
365                    "state": backend.state,
366                    "last_hello_unix_ms": backend.last_hello_unix_ms,
367                    "last_error": backend.last_error,
368                })
369            })
370            .collect::<Vec<_>>(),
371    })
372    .to_string()
373}
374
375/// Render `config --effective --json`.
376pub fn render_config_json(snapshot: &AdminSnapshot) -> String {
377    json!({
378        "schema_version": ADMIN_SCHEMA_VERSION,
379        "command": "config",
380        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
381        "values": effective_config_json(snapshot),
382    })
383    .to_string()
384}
385
386/// Render `diagnose --output <path>` summary JSON.
387pub fn render_diagnose_json(snapshot: &AdminSnapshot, output: &str) -> String {
388    let entries = diagnostic_bundle_entries_json(snapshot);
389    json!({
390        "schema_version": ADMIN_SCHEMA_VERSION,
391        "command": "diagnose",
392        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
393        "output": output,
394        "bundle": {
395            "format": DIAGNOSTIC_BUNDLE_FORMAT,
396            "mode": DIAGNOSTIC_BUNDLE_MODE,
397            "created": false,
398            "entries": entries,
399        },
400        "files": diagnostic_bundle_file_paths(snapshot),
401        "redactions": diagnostic_redaction_names(),
402        "redaction_policy": diagnostic_redaction_policy_json(),
403    })
404    .to_string()
405}
406
407/// Render OpenMetrics text.
408pub fn render_metrics_text(snapshot: &AdminSnapshot) -> String {
409    let mut out = String::new();
410    for metric in BROKER_METRICS {
411        out.push_str("# TYPE ");
412        out.push_str(metric.name);
413        out.push(' ');
414        out.push_str(metric_kind_name(metric.kind));
415        out.push('\n');
416        if metric.labels.is_empty() {
417            out.push_str(metric.name);
418            out.push(' ');
419            out.push_str(&metric_value(metric.name, snapshot));
420            out.push('\n');
421        }
422    }
423    out.push_str("# EOF\n");
424    out
425}
426
/// Body of the health endpoint: always the single line `ok`.
pub fn render_healthz() -> &'static str {
    const HEALTHY_BODY: &str = "ok\n";
    HEALTHY_BODY
}
431
432/// Readiness endpoint body.
433pub fn render_readyz(snapshot: &AdminSnapshot) -> &'static str {
434    if snapshot.accepting_hello {
435        "ready\n"
436    } else {
437        "not ready\n"
438    }
439}
440
441/// Render one typed admin request into a typed admin reply.
442pub fn render_admin_reply(snapshot: &AdminSnapshot, request: &AdminRequest) -> AdminReply {
443    match AdminVerb::try_from(request.verb) {
444        Ok(AdminVerb::Status) => {
445            if request.json {
446                json_reply(render_status_json(snapshot))
447            } else {
448                text_reply(
449                    format!(
450                        "broker_instance: {}\naccepting_hello: {}\n",
451                        snapshot.broker_instance, snapshot.accepting_hello
452                    ),
453                    0,
454                )
455            }
456        }
457        Ok(AdminVerb::Dump) => json_reply(render_dump_json(snapshot)),
458        Ok(AdminVerb::ListInstances) => json_reply(render_list_instances_json(snapshot)),
459        Ok(AdminVerb::Healthz) => text_reply(render_healthz(), 0),
460        Ok(AdminVerb::Readyz) => {
461            let exit_code = if snapshot.accepting_hello { 0 } else { 1 };
462            text_reply(render_readyz(snapshot), exit_code)
463        }
464        Ok(AdminVerb::BackendHealth) => {
465            let service_name = if request.service_name.is_empty() {
466                "unknown"
467            } else {
468                &request.service_name
469            };
470            json_reply(render_backend_health_json(snapshot, service_name))
471        }
472        Ok(AdminVerb::Config) => json_reply(render_config_json(snapshot)),
473        Ok(AdminVerb::Diagnose) => {
474            let output = if request.output_path.is_empty() {
475                "bundle.tar.gz"
476            } else {
477                &request.output_path
478            };
479            json_reply(render_diagnose_json(snapshot, output))
480        }
481        Ok(AdminVerb::Metrics) => AdminReply {
482            kind: AdminReplyKind::Openmetrics as i32,
483            body: render_metrics_text(snapshot),
484            exit_code: 0,
485            content_type: "application/openmetrics-text".into(),
486        },
487        Ok(AdminVerb::Unspecified) | Err(_) => text_reply("unsupported admin verb\n", 2),
488    }
489}
490
491/// Handle one decoded admin frame and return a response frame.
492pub fn handle_admin_frame(
493    frame: Frame,
494    snapshot: &AdminSnapshot,
495) -> Result<Frame, AdminFrameError> {
496    if frame.envelope_version != PROTOCOL_VERSION {
497        return Err(AdminFrameError::UnsupportedEnvelopeVersion(
498            frame.envelope_version,
499        ));
500    }
501    if FrameKind::try_from(frame.kind) != Ok(FrameKind::Request) {
502        return Err(AdminFrameError::UnexpectedKind(frame.kind));
503    }
504    if frame.payload_protocol != ADMIN_PAYLOAD_PROTOCOL {
505        return Err(AdminFrameError::UnexpectedPayloadProtocol(
506            frame.payload_protocol,
507        ));
508    }
509    if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
510        return Err(AdminFrameError::UnsupportedPayloadEncoding(
511            frame.payload_encoding,
512        ));
513    }
514
515    let request =
516        AdminRequest::decode(frame.payload.as_slice()).map_err(AdminFrameError::Decode)?;
517    let reply = render_admin_reply(snapshot, &request);
518    Ok(Frame {
519        envelope_version: PROTOCOL_VERSION,
520        kind: FrameKind::Response as i32,
521        payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
522        payload: reply.encode_to_vec(),
523        request_id: frame.request_id,
524        payload_encoding: PayloadEncoding::None as i32,
525        deadline_unix_ms: 0,
526        traceparent: frame.traceparent,
527        tracestate: frame.tracestate,
528    })
529}
530
531/// Handle one already-accepted broker admin connection.
532///
533/// The connection reads one v1-framed [`Frame`] carrying an [`AdminRequest`],
534/// dispatches through [`handle_admin_frame`], writes one v1-framed response
535/// [`Frame`] carrying an [`AdminReply`], then returns the decoded reply for
536/// tests and callers that need exit-code metadata.
537pub fn handle_admin_connection<S: Read + Write>(
538    stream: &mut S,
539    snapshot: &AdminSnapshot,
540) -> Result<AdminReply, AdminConnectionError> {
541    let request_bytes = read_frame(stream)?;
542    let request_frame =
543        Frame::decode(request_bytes.as_slice()).map_err(AdminConnectionError::DecodeFrame)?;
544    let response_frame = handle_admin_frame(request_frame, snapshot)?;
545    write_frame(stream, &response_frame.encode_to_vec())?;
546    AdminReply::decode(response_frame.payload.as_slice()).map_err(AdminConnectionError::DecodeReply)
547}
548
549/// Run one blocking local-socket accept and serve exactly one admin request.
550///
551/// This is the admin-side counterpart to `serve_one_local_socket` for Hello.
552/// The full long-lived broker loop can reuse [`handle_admin_connection`] after
553/// selecting an admin connection from the shared accept path.
554pub fn serve_one_admin_socket(
555    socket_path: &str,
556    snapshot: &AdminSnapshot,
557) -> Result<AdminReply, AdminConnectionError> {
558    let listener = bind_local_socket(socket_path)?;
559    let cleanup = LocalSocketCleanup(socket_path);
560    let result = (|| {
561        let mut stream = listener.accept()?;
562        handle_admin_connection(&mut stream, snapshot)
563    })();
564    drop(listener);
565    drop(cleanup);
566    result
567}
568
569/// Errors raised by admin frame validation/dispatch.
570#[derive(Debug, thiserror::Error)]
571pub enum AdminFrameError {
572    /// Unsupported frame envelope version.
573    #[error("unsupported admin frame envelope_version {0}")]
574    UnsupportedEnvelopeVersion(u32),
575    /// Admin frames must be requests.
576    #[error("admin frame kind must be REQUEST, got {0}")]
577    UnexpectedKind(i32),
578    /// Admin frame used the wrong payload protocol.
579    #[error("admin frame payload_protocol must be 0xAD01, got {0}")]
580    UnexpectedPayloadProtocol(u32),
581    /// Admin frame payload must be uncompressed.
582    #[error("admin frame payload must not be compressed, got {0}")]
583    UnsupportedPayloadEncoding(i32),
584    /// AdminRequest protobuf decoding failed.
585    #[error(transparent)]
586    Decode(prost::DecodeError),
587}
588
589/// Errors raised while serving a framed admin connection.
590#[derive(Debug, thiserror::Error)]
591pub enum AdminConnectionError {
592    /// v1 framing failed.
593    #[error(transparent)]
594    Framing(#[from] FramingError),
595    /// The request frame could not be decoded.
596    #[error("failed to decode admin request Frame: {0}")]
597    DecodeFrame(prost::DecodeError),
598    /// The request frame failed admin validation or dispatch.
599    #[error(transparent)]
600    AdminFrame(#[from] AdminFrameError),
601    /// The response payload could not be decoded after dispatch.
602    #[error("failed to decode admin reply payload: {0}")]
603    DecodeReply(prost::DecodeError),
604    /// Local socket binding failed.
605    #[error(transparent)]
606    LocalSocket(#[from] BrokerConnectionError),
607    /// Local socket I/O failed.
608    #[error(transparent)]
609    Io(#[from] io::Error),
610}
611
612fn json_reply(body: String) -> AdminReply {
613    AdminReply {
614        kind: AdminReplyKind::Json as i32,
615        body,
616        exit_code: 0,
617        content_type: "application/json".into(),
618    }
619}
620
621fn text_reply(body: impl Into<String>, exit_code: u32) -> AdminReply {
622    AdminReply {
623        kind: AdminReplyKind::Text as i32,
624        body: body.into(),
625        exit_code,
626        content_type: "text/plain".into(),
627    }
628}
629
630fn metric_kind_name(kind: MetricKind) -> &'static str {
631    match kind {
632        MetricKind::Counter => "counter",
633        MetricKind::Gauge => "gauge",
634        MetricKind::Histogram => "histogram",
635    }
636}
637
638fn metric_value(name: &str, snapshot: &AdminSnapshot) -> String {
639    match name {
640        "running_process_broker_v1_connections_open" => snapshot.connections_open.to_string(),
641        "running_process_broker_v1_fd_usage_ratio" => "0".into(),
642        "running_process_broker_v1_uptime_seconds" => snapshot.uptime.as_secs().to_string(),
643        _ => "0".into(),
644    }
645}
646
647fn effective_config_json(snapshot: &AdminSnapshot) -> serde_json::Value {
648    json!({
649        "broker": {
650            "broker_instance": sourced_value(&snapshot.broker_instance, "runtime"),
651            "broker_pid": sourced_value(snapshot.broker_pid, "runtime"),
652            "accepting_hello": sourced_value(snapshot.accepting_hello, "runtime"),
653        },
654        "protocol": {
655            "admin_payload_protocol": sourced_value(format!("0x{ADMIN_PAYLOAD_PROTOCOL:04X}"), "protocol-v1"),
656            "envelope_version": sourced_value(PROTOCOL_VERSION, "protocol-v1"),
657            "framing_version": sourced_value(ENVELOPE_VERSION, "protocol-v1"),
658        },
659        "limits": {
660            "max_frame_bytes": sourced_value(MAX_FRAME_BYTES, "protocol-v1"),
661            "max_hello_bytes": sourced_value(MAX_HELLO_BYTES, "protocol-v1"),
662            "connections_open": sourced_value(snapshot.connections_open, "runtime"),
663        },
664        "paths": {
665            "service_definition_dir": sourced_value(
666                service_definition_dir().display().to_string(),
667                service_definition_dir_source(),
668            ),
669        },
670        "spawn_budget": {
671            "default_attempts_per_window": sourced_value(DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, "default"),
672            "default_window_ms": sourced_value(duration_ms(DEFAULT_SPAWN_BUDGET_WINDOW), "default"),
673            "active_budget_rows": sourced_value(snapshot.spawn_budgets.len(), "runtime"),
674        },
675        "diagnostics": {
676            "bundle_format": sourced_value(DIAGNOSTIC_BUNDLE_FORMAT, "schema-v1"),
677            "bundle_mode": sourced_value(DIAGNOSTIC_BUNDLE_MODE, "schema-v1"),
678            "redactions": sourced_value(diagnostic_redaction_names(), "schema-v1"),
679        },
680    })
681}
682
683fn service_definition_dir_source() -> &'static str {
684    if std::env::var_os(SERVICE_DEF_DIR_ENV).is_some() {
685        "env:RUNNING_PROCESS_SERVICE_DEF_DIR"
686    } else {
687        "platform-default"
688    }
689}
690
691fn diagnostic_bundle_entries_json(snapshot: &AdminSnapshot) -> Vec<serde_json::Value> {
692    vec![
693        diagnostic_bundle_entry("admin/status.json", "json", "status", true, false, None),
694        diagnostic_bundle_entry("admin/dump.json", "json", "dump", true, true, None),
695        diagnostic_bundle_entry(
696            "config/effective.json",
697            "json",
698            "effective-config",
699            true,
700            false,
701            None,
702        ),
703        diagnostic_bundle_entry(
704            "metrics/openmetrics.txt",
705            "openmetrics",
706            "metrics",
707            true,
708            false,
709            None,
710        ),
711        diagnostic_bundle_entry(
712            "events/lifecycle.jsonl",
713            "jsonl",
714            "lifecycle-events",
715            false,
716            true,
717            None,
718        ),
719        diagnostic_bundle_entry(
720            "manifest/backend-manifests.json",
721            "json",
722            "backend-manifest-index",
723            false,
724            true,
725            None,
726        ),
727        diagnostic_bundle_entry(
728            "process/backends.json",
729            "json",
730            "backend-table",
731            true,
732            true,
733            Some(snapshot.backends.len()),
734        ),
735        diagnostic_bundle_entry(
736            "system/summary.json",
737            "json",
738            "host-summary",
739            false,
740            true,
741            None,
742        ),
743    ]
744}
745
746fn diagnostic_bundle_file_paths(snapshot: &AdminSnapshot) -> Vec<String> {
747    diagnostic_bundle_entries_json(snapshot)
748        .into_iter()
749        .filter_map(|entry| {
750            entry
751                .get("path")
752                .and_then(serde_json::Value::as_str)
753                .map(str::to_owned)
754        })
755        .collect()
756}
757
758fn diagnostic_bundle_entry(
759    path: &str,
760    kind: &str,
761    source: &str,
762    required: bool,
763    redacted: bool,
764    record_count: Option<usize>,
765) -> serde_json::Value {
766    let mut entry = json!({
767        "path": path,
768        "kind": kind,
769        "source": source,
770        "required": required,
771        "redacted": redacted,
772    });
773    if let Some(record_count) = record_count {
774        entry["record_count"] = json!(record_count);
775    }
776    entry
777}
778
779fn diagnostic_redaction_names() -> Vec<&'static str> {
780    DIAGNOSTIC_REDACTIONS.to_vec()
781}
782
783fn diagnostic_redaction_policy_json() -> Vec<serde_json::Value> {
784    vec![
785        json!({
786            "name": "home",
787            "replacement": "~",
788        }),
789        json!({
790            "name": "secret-env",
791            "matches": ["KEY", "TOKEN", "SECRET", "PASS"],
792        }),
793        json!({
794            "name": "acl-identities",
795            "replacement": "stable-hash",
796        }),
797    ]
798}
799
800fn sourced_value(value: impl Serialize, source: &'static str) -> serde_json::Value {
801    json!({
802        "value": value,
803        "source": source,
804    })
805}
806
/// Whole milliseconds in `duration`, saturating at `u64::MAX` when the count
/// does not fit in 64 bits.
fn duration_ms(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
810
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. The
/// millisecond count is converted with a checked, saturating conversion
/// (clamped to `u64::MAX`) instead of a truncating `as` cast, matching how
/// the other duration conversions in this module behave.
fn unix_now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        })
}