Skip to main content

running_process/broker/server/
admin.rs

1//! Admin verb rendering for the v1 broker.
2
3use std::io::{self, Read, Write};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use interprocess::local_socket::traits::Listener;
7use prost::Message;
8use serde::Serialize;
9use serde_json::json;
10
11use crate::broker::protocol::{
12    read_frame, write_frame, AdminReply, AdminReplyKind, AdminRequest, AdminVerb, Frame, FrameKind,
13    FramingError, PayloadEncoding, ENVELOPE_VERSION, MAX_FRAME_BYTES, MAX_HELLO_BYTES,
14};
15
16use super::backend_registry::BackendRegistry;
17use super::connection::{bind_local_socket, BrokerConnectionError, LocalSocketCleanup};
18use super::service_def_loader::{service_definition_dir, SERVICE_DEF_DIR_ENV};
19use super::spawn_coordinator::{
20    SpawnBudgetSnapshot, DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, DEFAULT_SPAWN_BUDGET_WINDOW,
21};
22use crate::broker::server::metrics::{MetricKind, BROKER_METRICS};
23
24/// Frozen admin JSON schema version.
25pub const ADMIN_SCHEMA_VERSION: u32 = 1;
26/// Payload protocol value for v1 admin request/reply frames.
27pub const ADMIN_PAYLOAD_PROTOCOL: u32 = 0xAD01;
28
29const PROTOCOL_VERSION: u32 = 1;
30const DIAGNOSTIC_BUNDLE_FORMAT: &str = "tar.gz";
31const DIAGNOSTIC_BUNDLE_MODE: &str = "metadata-only";
32const DIAGNOSTIC_REDACTIONS: &[&str] = &["home", "secret-env", "acl-identities"];
33
34/// Snapshot consumed by admin verb renderers.
35#[derive(Clone, Debug)]
36pub struct AdminSnapshot {
37    /// Broker instance identifier.
38    pub broker_instance: String,
39    /// Broker process id.
40    pub broker_pid: u32,
41    /// Snapshot generation timestamp.
42    pub generated_at_unix_ms: u64,
43    /// Time since broker start.
44    pub uptime: Duration,
45    /// Whether new Hello requests are accepted.
46    pub accepting_hello: bool,
47    /// Open control-plane connections.
48    pub connections_open: u64,
49    /// Known backend rows.
50    pub backends: Vec<AdminBackend>,
51    /// Known spawn budget rows.
52    pub spawn_budgets: Vec<AdminSpawnBudget>,
53}
54
55impl AdminSnapshot {
56    /// Local process snapshot used until pipe-backed admin transport lands.
57    pub fn local_not_serving() -> Self {
58        Self {
59            broker_instance: "local".into(),
60            broker_pid: std::process::id(),
61            generated_at_unix_ms: unix_now_ms(),
62            uptime: Duration::ZERO,
63            accepting_hello: false,
64            connections_open: 0,
65            backends: Vec::new(),
66            spawn_budgets: Vec::new(),
67        }
68    }
69
70    /// Build a live snapshot from broker state.
71    pub fn from_registry(
72        broker_instance: impl Into<String>,
73        uptime: Duration,
74        accepting_hello: bool,
75        connections_open: u64,
76        registry: &BackendRegistry,
77        spawn_budgets: &[SpawnBudgetSnapshot],
78    ) -> Self {
79        Self::from_registry_at(
80            broker_instance,
81            std::process::id(),
82            unix_now_ms(),
83            uptime,
84            accepting_hello,
85            connections_open,
86            registry,
87            spawn_budgets,
88        )
89    }
90
91    /// Testable variant of [`Self::from_registry`] with deterministic metadata.
92    #[allow(clippy::too_many_arguments)]
93    pub fn from_registry_at(
94        broker_instance: impl Into<String>,
95        broker_pid: u32,
96        generated_at_unix_ms: u64,
97        uptime: Duration,
98        accepting_hello: bool,
99        connections_open: u64,
100        registry: &BackendRegistry,
101        spawn_budgets: &[SpawnBudgetSnapshot],
102    ) -> Self {
103        Self {
104            broker_instance: broker_instance.into(),
105            broker_pid,
106            generated_at_unix_ms,
107            uptime,
108            accepting_hello,
109            connections_open,
110            backends: registry
111                .iter()
112                .map(|(_key, handle)| AdminBackend {
113                    service_name: handle.service_name.clone(),
114                    service_version: handle.service_version.clone(),
115                    pid: handle.daemon_process.pid,
116                    backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
117                    last_active_unix_ms: handle.daemon_process.started_at_unix_ms,
118                    state: if handle.is_alive() {
119                        "running".into()
120                    } else {
121                        "stale".into()
122                    },
123                    last_hello_unix_ms: 0,
124                    last_error: None,
125                })
126                .collect(),
127            spawn_budgets: spawn_budgets
128                .iter()
129                .map(AdminSpawnBudget::from_snapshot)
130                .collect(),
131        }
132    }
133}
134
135/// Backend row used in admin output.
136#[derive(Clone, Debug)]
137pub struct AdminBackend {
138    /// Logical service name.
139    pub service_name: String,
140    /// Service version.
141    pub service_version: String,
142    /// Backend process id.
143    pub pid: u32,
144    /// Backend pipe/socket path.
145    pub backend_pipe: String,
146    /// Last activity timestamp.
147    pub last_active_unix_ms: u64,
148    /// Human-readable state.
149    pub state: String,
150    /// Last Hello timestamp.
151    pub last_hello_unix_ms: u64,
152    /// Last backend error.
153    pub last_error: Option<String>,
154}
155
156/// Spawn budget row used in admin output.
157#[derive(Clone, Debug)]
158pub struct AdminSpawnBudget {
159    /// Broker instance identifier.
160    pub broker_instance: String,
161    /// Logical service name.
162    pub service_name: String,
163    /// Service version.
164    pub service_version: String,
165    /// Attempts used in the active window.
166    pub attempts_used: u32,
167    /// Attempts remaining in the active window.
168    pub remaining: u32,
169    /// Whether a spawn is currently in flight.
170    pub in_flight: bool,
171    /// Retry-after hint when exhausted.
172    pub retry_after_ms: Option<u64>,
173}
174
175impl AdminSpawnBudget {
176    fn from_snapshot(snapshot: &SpawnBudgetSnapshot) -> Self {
177        Self {
178            broker_instance: snapshot.key.instance.id(),
179            service_name: snapshot.key.service_name.clone(),
180            service_version: snapshot.key.service_version.clone(),
181            attempts_used: snapshot.attempts_used,
182            remaining: snapshot.remaining,
183            in_flight: snapshot.in_flight,
184            retry_after_ms: snapshot
185                .retry_after
186                .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)),
187        }
188    }
189}
190
191/// Render `status --json`.
192pub fn render_status_json(snapshot: &AdminSnapshot) -> String {
193    json!({
194        "schema_version": ADMIN_SCHEMA_VERSION,
195        "command": "status",
196        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
197        "broker_instance": snapshot.broker_instance,
198        "broker_pid": snapshot.broker_pid,
199        "uptime_seconds": snapshot.uptime.as_secs_f64(),
200        "accepting_hello": snapshot.accepting_hello,
201        "connections_open": snapshot.connections_open,
202        "backends": snapshot.backends.iter().map(|backend| {
203            json!({
204                "service_name": backend.service_name,
205                "service_version": backend.service_version,
206                "pid": backend.pid,
207                "backend_pipe": backend.backend_pipe,
208                "last_active_unix_ms": backend.last_active_unix_ms,
209                "state": backend.state,
210            })
211        }).collect::<Vec<_>>(),
212    })
213    .to_string()
214}
215
216/// Render `dump --json`.
217pub fn render_dump_json(snapshot: &AdminSnapshot) -> String {
218    json!({
219        "schema_version": ADMIN_SCHEMA_VERSION,
220        "command": "dump",
221        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
222        "broker_instance": snapshot.broker_instance,
223        "effective_config": effective_config_json(snapshot),
224        "backend_table": snapshot.backends.iter().map(|backend| {
225            json!({
226                "service_name": backend.service_name,
227                "service_version": backend.service_version,
228                "pid": backend.pid,
229                "backend_pipe": backend.backend_pipe,
230                "state": backend.state,
231            })
232        }).collect::<Vec<_>>(),
233        "spawn_budgets": snapshot.spawn_budgets.iter().map(|budget| {
234            json!({
235                "broker_instance": budget.broker_instance,
236                "service_name": budget.service_name,
237                "service_version": budget.service_version,
238                "attempts_used": budget.attempts_used,
239                "remaining": budget.remaining,
240                "in_flight": budget.in_flight,
241                "retry_after_ms": budget.retry_after_ms,
242            })
243        }).collect::<Vec<_>>(),
244        "recent_lifecycle_events": [],
245    })
246    .to_string()
247}
248
249/// Render `list-instances --json`.
250pub fn render_list_instances_json(snapshot: &AdminSnapshot) -> String {
251    json!({
252        "schema_version": ADMIN_SCHEMA_VERSION,
253        "command": "list-instances",
254        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
255        "instances": [{
256            "broker_instance": snapshot.broker_instance,
257            "pipe": "",
258            "pid": snapshot.broker_pid,
259            "state": if snapshot.accepting_hello { "running" } else { "not-serving" },
260        }],
261    })
262    .to_string()
263}
264
265/// Render `backend-health <service> --json`.
266pub fn render_backend_health_json(snapshot: &AdminSnapshot, service_name: &str) -> String {
267    json!({
268        "schema_version": ADMIN_SCHEMA_VERSION,
269        "command": "backend-health",
270        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
271        "service_name": service_name,
272        "backends": snapshot.backends.iter()
273            .filter(|backend| backend.service_name == service_name)
274            .map(|backend| {
275                json!({
276                    "service_version": backend.service_version,
277                    "pid": backend.pid,
278                    "state": backend.state,
279                    "last_hello_unix_ms": backend.last_hello_unix_ms,
280                    "last_error": backend.last_error,
281                })
282            })
283            .collect::<Vec<_>>(),
284    })
285    .to_string()
286}
287
288/// Render `config --effective --json`.
289pub fn render_config_json(snapshot: &AdminSnapshot) -> String {
290    json!({
291        "schema_version": ADMIN_SCHEMA_VERSION,
292        "command": "config",
293        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
294        "values": effective_config_json(snapshot),
295    })
296    .to_string()
297}
298
299/// Render `diagnose --output <path>` summary JSON.
300pub fn render_diagnose_json(snapshot: &AdminSnapshot, output: &str) -> String {
301    let entries = diagnostic_bundle_entries_json(snapshot);
302    json!({
303        "schema_version": ADMIN_SCHEMA_VERSION,
304        "command": "diagnose",
305        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
306        "output": output,
307        "bundle": {
308            "format": DIAGNOSTIC_BUNDLE_FORMAT,
309            "mode": DIAGNOSTIC_BUNDLE_MODE,
310            "created": false,
311            "entries": entries,
312        },
313        "files": diagnostic_bundle_file_paths(snapshot),
314        "redactions": diagnostic_redaction_names(),
315        "redaction_policy": diagnostic_redaction_policy_json(),
316    })
317    .to_string()
318}
319
320/// Render OpenMetrics text.
321pub fn render_metrics_text(snapshot: &AdminSnapshot) -> String {
322    let mut out = String::new();
323    for metric in BROKER_METRICS {
324        out.push_str("# TYPE ");
325        out.push_str(metric.name);
326        out.push(' ');
327        out.push_str(metric_kind_name(metric.kind));
328        out.push('\n');
329        if metric.labels.is_empty() {
330            out.push_str(metric.name);
331            out.push(' ');
332            out.push_str(&metric_value(metric.name, snapshot));
333            out.push('\n');
334        }
335    }
336    out.push_str("# EOF\n");
337    out
338}
339
340/// Health endpoint body.
341pub fn render_healthz() -> &'static str {
342    "ok\n"
343}
344
345/// Readiness endpoint body.
346pub fn render_readyz(snapshot: &AdminSnapshot) -> &'static str {
347    if snapshot.accepting_hello {
348        "ready\n"
349    } else {
350        "not ready\n"
351    }
352}
353
354/// Render one typed admin request into a typed admin reply.
355pub fn render_admin_reply(snapshot: &AdminSnapshot, request: &AdminRequest) -> AdminReply {
356    match AdminVerb::try_from(request.verb) {
357        Ok(AdminVerb::Status) => {
358            if request.json {
359                json_reply(render_status_json(snapshot))
360            } else {
361                text_reply(
362                    format!(
363                        "broker_instance: {}\naccepting_hello: {}\n",
364                        snapshot.broker_instance, snapshot.accepting_hello
365                    ),
366                    0,
367                )
368            }
369        }
370        Ok(AdminVerb::Dump) => json_reply(render_dump_json(snapshot)),
371        Ok(AdminVerb::ListInstances) => json_reply(render_list_instances_json(snapshot)),
372        Ok(AdminVerb::Healthz) => text_reply(render_healthz(), 0),
373        Ok(AdminVerb::Readyz) => {
374            let exit_code = if snapshot.accepting_hello { 0 } else { 1 };
375            text_reply(render_readyz(snapshot), exit_code)
376        }
377        Ok(AdminVerb::BackendHealth) => {
378            let service_name = if request.service_name.is_empty() {
379                "unknown"
380            } else {
381                &request.service_name
382            };
383            json_reply(render_backend_health_json(snapshot, service_name))
384        }
385        Ok(AdminVerb::Config) => json_reply(render_config_json(snapshot)),
386        Ok(AdminVerb::Diagnose) => {
387            let output = if request.output_path.is_empty() {
388                "bundle.tar.gz"
389            } else {
390                &request.output_path
391            };
392            json_reply(render_diagnose_json(snapshot, output))
393        }
394        Ok(AdminVerb::Metrics) => AdminReply {
395            kind: AdminReplyKind::Openmetrics as i32,
396            body: render_metrics_text(snapshot),
397            exit_code: 0,
398            content_type: "application/openmetrics-text".into(),
399        },
400        Ok(AdminVerb::Unspecified) | Err(_) => text_reply("unsupported admin verb\n", 2),
401    }
402}
403
404/// Handle one decoded admin frame and return a response frame.
405pub fn handle_admin_frame(
406    frame: Frame,
407    snapshot: &AdminSnapshot,
408) -> Result<Frame, AdminFrameError> {
409    if frame.envelope_version != PROTOCOL_VERSION {
410        return Err(AdminFrameError::UnsupportedEnvelopeVersion(
411            frame.envelope_version,
412        ));
413    }
414    if FrameKind::try_from(frame.kind) != Ok(FrameKind::Request) {
415        return Err(AdminFrameError::UnexpectedKind(frame.kind));
416    }
417    if frame.payload_protocol != ADMIN_PAYLOAD_PROTOCOL {
418        return Err(AdminFrameError::UnexpectedPayloadProtocol(
419            frame.payload_protocol,
420        ));
421    }
422    if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
423        return Err(AdminFrameError::UnsupportedPayloadEncoding(
424            frame.payload_encoding,
425        ));
426    }
427
428    let request =
429        AdminRequest::decode(frame.payload.as_slice()).map_err(AdminFrameError::Decode)?;
430    let reply = render_admin_reply(snapshot, &request);
431    Ok(Frame {
432        envelope_version: PROTOCOL_VERSION,
433        kind: FrameKind::Response as i32,
434        payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
435        payload: reply.encode_to_vec(),
436        request_id: frame.request_id,
437        payload_encoding: PayloadEncoding::None as i32,
438        deadline_unix_ms: 0,
439        traceparent: frame.traceparent,
440        tracestate: frame.tracestate,
441    })
442}
443
444/// Handle one already-accepted broker admin connection.
445///
446/// The connection reads one v1-framed [`Frame`] carrying an [`AdminRequest`],
447/// dispatches through [`handle_admin_frame`], writes one v1-framed response
448/// [`Frame`] carrying an [`AdminReply`], then returns the decoded reply for
449/// tests and callers that need exit-code metadata.
450pub fn handle_admin_connection<S: Read + Write>(
451    stream: &mut S,
452    snapshot: &AdminSnapshot,
453) -> Result<AdminReply, AdminConnectionError> {
454    let request_bytes = read_frame(stream)?;
455    let request_frame =
456        Frame::decode(request_bytes.as_slice()).map_err(AdminConnectionError::DecodeFrame)?;
457    let response_frame = handle_admin_frame(request_frame, snapshot)?;
458    write_frame(stream, &response_frame.encode_to_vec())?;
459    AdminReply::decode(response_frame.payload.as_slice()).map_err(AdminConnectionError::DecodeReply)
460}
461
462/// Run one blocking local-socket accept and serve exactly one admin request.
463///
464/// This is the admin-side counterpart to `serve_one_local_socket` for Hello.
465/// The full long-lived broker loop can reuse [`handle_admin_connection`] after
466/// selecting an admin connection from the shared accept path.
467pub fn serve_one_admin_socket(
468    socket_path: &str,
469    snapshot: &AdminSnapshot,
470) -> Result<AdminReply, AdminConnectionError> {
471    let listener = bind_local_socket(socket_path)?;
472    let cleanup = LocalSocketCleanup(socket_path);
473    let result = (|| {
474        let mut stream = listener.accept()?;
475        handle_admin_connection(&mut stream, snapshot)
476    })();
477    drop(listener);
478    drop(cleanup);
479    result
480}
481
482/// Errors raised by admin frame validation/dispatch.
483#[derive(Debug, thiserror::Error)]
484pub enum AdminFrameError {
485    /// Unsupported frame envelope version.
486    #[error("unsupported admin frame envelope_version {0}")]
487    UnsupportedEnvelopeVersion(u32),
488    /// Admin frames must be requests.
489    #[error("admin frame kind must be REQUEST, got {0}")]
490    UnexpectedKind(i32),
491    /// Admin frame used the wrong payload protocol.
492    #[error("admin frame payload_protocol must be 0xAD01, got {0}")]
493    UnexpectedPayloadProtocol(u32),
494    /// Admin frame payload must be uncompressed.
495    #[error("admin frame payload must not be compressed, got {0}")]
496    UnsupportedPayloadEncoding(i32),
497    /// AdminRequest protobuf decoding failed.
498    #[error(transparent)]
499    Decode(prost::DecodeError),
500}
501
502/// Errors raised while serving a framed admin connection.
503#[derive(Debug, thiserror::Error)]
504pub enum AdminConnectionError {
505    /// v1 framing failed.
506    #[error(transparent)]
507    Framing(#[from] FramingError),
508    /// The request frame could not be decoded.
509    #[error("failed to decode admin request Frame: {0}")]
510    DecodeFrame(prost::DecodeError),
511    /// The request frame failed admin validation or dispatch.
512    #[error(transparent)]
513    AdminFrame(#[from] AdminFrameError),
514    /// The response payload could not be decoded after dispatch.
515    #[error("failed to decode admin reply payload: {0}")]
516    DecodeReply(prost::DecodeError),
517    /// Local socket binding failed.
518    #[error(transparent)]
519    LocalSocket(#[from] BrokerConnectionError),
520    /// Local socket I/O failed.
521    #[error(transparent)]
522    Io(#[from] io::Error),
523}
524
525fn json_reply(body: String) -> AdminReply {
526    AdminReply {
527        kind: AdminReplyKind::Json as i32,
528        body,
529        exit_code: 0,
530        content_type: "application/json".into(),
531    }
532}
533
534fn text_reply(body: impl Into<String>, exit_code: u32) -> AdminReply {
535    AdminReply {
536        kind: AdminReplyKind::Text as i32,
537        body: body.into(),
538        exit_code,
539        content_type: "text/plain".into(),
540    }
541}
542
543fn metric_kind_name(kind: MetricKind) -> &'static str {
544    match kind {
545        MetricKind::Counter => "counter",
546        MetricKind::Gauge => "gauge",
547        MetricKind::Histogram => "histogram",
548    }
549}
550
551fn metric_value(name: &str, snapshot: &AdminSnapshot) -> String {
552    match name {
553        "running_process_broker_v1_connections_open" => snapshot.connections_open.to_string(),
554        "running_process_broker_v1_fd_usage_ratio" => "0".into(),
555        "running_process_broker_v1_uptime_seconds" => snapshot.uptime.as_secs().to_string(),
556        _ => "0".into(),
557    }
558}
559
560fn effective_config_json(snapshot: &AdminSnapshot) -> serde_json::Value {
561    json!({
562        "broker": {
563            "broker_instance": sourced_value(&snapshot.broker_instance, "runtime"),
564            "broker_pid": sourced_value(snapshot.broker_pid, "runtime"),
565            "accepting_hello": sourced_value(snapshot.accepting_hello, "runtime"),
566        },
567        "protocol": {
568            "admin_payload_protocol": sourced_value(format!("0x{ADMIN_PAYLOAD_PROTOCOL:04X}"), "protocol-v1"),
569            "envelope_version": sourced_value(PROTOCOL_VERSION, "protocol-v1"),
570            "framing_version": sourced_value(ENVELOPE_VERSION, "protocol-v1"),
571        },
572        "limits": {
573            "max_frame_bytes": sourced_value(MAX_FRAME_BYTES, "protocol-v1"),
574            "max_hello_bytes": sourced_value(MAX_HELLO_BYTES, "protocol-v1"),
575            "connections_open": sourced_value(snapshot.connections_open, "runtime"),
576        },
577        "paths": {
578            "service_definition_dir": sourced_value(
579                service_definition_dir().display().to_string(),
580                service_definition_dir_source(),
581            ),
582        },
583        "spawn_budget": {
584            "default_attempts_per_window": sourced_value(DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, "default"),
585            "default_window_ms": sourced_value(duration_ms(DEFAULT_SPAWN_BUDGET_WINDOW), "default"),
586            "active_budget_rows": sourced_value(snapshot.spawn_budgets.len(), "runtime"),
587        },
588        "diagnostics": {
589            "bundle_format": sourced_value(DIAGNOSTIC_BUNDLE_FORMAT, "schema-v1"),
590            "bundle_mode": sourced_value(DIAGNOSTIC_BUNDLE_MODE, "schema-v1"),
591            "redactions": sourced_value(diagnostic_redaction_names(), "schema-v1"),
592        },
593    })
594}
595
596fn service_definition_dir_source() -> &'static str {
597    if std::env::var_os(SERVICE_DEF_DIR_ENV).is_some() {
598        "env:RUNNING_PROCESS_SERVICE_DEF_DIR"
599    } else {
600        "platform-default"
601    }
602}
603
604fn diagnostic_bundle_entries_json(snapshot: &AdminSnapshot) -> Vec<serde_json::Value> {
605    vec![
606        diagnostic_bundle_entry("admin/status.json", "json", "status", true, false, None),
607        diagnostic_bundle_entry("admin/dump.json", "json", "dump", true, true, None),
608        diagnostic_bundle_entry(
609            "config/effective.json",
610            "json",
611            "effective-config",
612            true,
613            false,
614            None,
615        ),
616        diagnostic_bundle_entry(
617            "metrics/openmetrics.txt",
618            "openmetrics",
619            "metrics",
620            true,
621            false,
622            None,
623        ),
624        diagnostic_bundle_entry(
625            "events/lifecycle.jsonl",
626            "jsonl",
627            "lifecycle-events",
628            false,
629            true,
630            None,
631        ),
632        diagnostic_bundle_entry(
633            "manifest/backend-manifests.json",
634            "json",
635            "backend-manifest-index",
636            false,
637            true,
638            None,
639        ),
640        diagnostic_bundle_entry(
641            "process/backends.json",
642            "json",
643            "backend-table",
644            true,
645            true,
646            Some(snapshot.backends.len()),
647        ),
648        diagnostic_bundle_entry(
649            "system/summary.json",
650            "json",
651            "host-summary",
652            false,
653            true,
654            None,
655        ),
656    ]
657}
658
659fn diagnostic_bundle_file_paths(snapshot: &AdminSnapshot) -> Vec<String> {
660    diagnostic_bundle_entries_json(snapshot)
661        .into_iter()
662        .filter_map(|entry| {
663            entry
664                .get("path")
665                .and_then(serde_json::Value::as_str)
666                .map(str::to_owned)
667        })
668        .collect()
669}
670
671fn diagnostic_bundle_entry(
672    path: &str,
673    kind: &str,
674    source: &str,
675    required: bool,
676    redacted: bool,
677    record_count: Option<usize>,
678) -> serde_json::Value {
679    let mut entry = json!({
680        "path": path,
681        "kind": kind,
682        "source": source,
683        "required": required,
684        "redacted": redacted,
685    });
686    if let Some(record_count) = record_count {
687        entry["record_count"] = json!(record_count);
688    }
689    entry
690}
691
692fn diagnostic_redaction_names() -> Vec<&'static str> {
693    DIAGNOSTIC_REDACTIONS.to_vec()
694}
695
696fn diagnostic_redaction_policy_json() -> Vec<serde_json::Value> {
697    vec![
698        json!({
699            "name": "home",
700            "replacement": "~",
701        }),
702        json!({
703            "name": "secret-env",
704            "matches": ["KEY", "TOKEN", "SECRET", "PASS"],
705        }),
706        json!({
707            "name": "acl-identities",
708            "replacement": "stable-hash",
709        }),
710    ]
711}
712
713fn sourced_value(value: impl Serialize, source: &'static str) -> serde_json::Value {
714    json!({
715        "value": value,
716        "source": source,
717    })
718}
719
720fn duration_ms(duration: Duration) -> u64 {
721    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
722}
723
724fn unix_now_ms() -> u64 {
725    SystemTime::now()
726        .duration_since(UNIX_EPOCH)
727        .map(|duration| duration.as_millis() as u64)
728        .unwrap_or(0)
729}