1use std::io::{self, Read, Write};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use interprocess::local_socket::traits::Listener;
7use prost::Message;
8use serde::Serialize;
9use serde_json::json;
10
11use crate::broker::protocol::{
12 read_frame, write_frame, AdminReply, AdminReplyKind, AdminRequest, AdminVerb, Frame, FrameKind,
13 FramingError, PayloadEncoding, ENVELOPE_VERSION, MAX_FRAME_BYTES, MAX_HELLO_BYTES,
14 PROTOCOL_VERSION,
15};
16
17use super::backend_registry::BackendRegistry;
18use super::connection::{bind_local_socket, BrokerConnectionError, LocalSocketCleanup};
19use super::service_def_loader::{service_definition_dir, SERVICE_DEF_DIR_ENV};
20use super::spawn_coordinator::{
21 SpawnBudgetSnapshot, DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, DEFAULT_SPAWN_BUDGET_WINDOW,
22};
23use crate::broker::server::metrics::{MetricKind, BROKER_METRICS};
24
/// Schema version written as `schema_version` into the JSON documents rendered by this file.
pub const ADMIN_SCHEMA_VERSION: u32 = 1;
/// Re-export of the payload-protocol id that admin frames are required to carry.
pub use crate::broker::protocol::registry::ADMIN_PAYLOAD_PROTOCOL;

/// Bundle format name reported by the `diagnose` and effective-config documents.
const DIAGNOSTIC_BUNDLE_FORMAT: &str = "tar.gz";
/// Bundle mode name reported by the `diagnose` and effective-config documents.
const DIAGNOSTIC_BUNDLE_MODE: &str = "metadata-only";
/// Names of the redaction rules advertised in diagnostic output.
const DIAGNOSTIC_REDACTIONS: &[&str] = &["home", "secret-env", "acl-identities"];
37
/// Point-in-time view of broker state that every admin renderer in this file reads from.
#[derive(Clone, Debug)]
pub struct AdminSnapshot {
    /// Identifier of the broker instance the snapshot describes.
    pub broker_instance: String,
    /// Process id reported as `broker_pid`.
    pub broker_pid: u32,
    /// Time the snapshot was generated, in milliseconds since the Unix epoch.
    pub generated_at_unix_ms: u64,
    /// Broker uptime; rendered as `uptime_seconds` and as the uptime metric.
    pub uptime: Duration,
    /// Drives `readyz` and the `running` / `not-serving` instance state.
    pub accepting_hello: bool,
    /// Count rendered as `connections_open`.
    pub connections_open: u64,
    /// One row per backend known to the registry.
    pub backends: Vec<AdminBackend>,
    /// One row per spawn-budget snapshot.
    pub spawn_budgets: Vec<AdminSpawnBudget>,
    /// Rendered as `fd_pressure.demoted` in the status document.
    pub fd_pressure_demoted: bool,
    /// Inode accounting rendered as `inode_pressure` in the status document.
    pub inode_pressure: AdminInodePressure,
}
62
/// Inode accounting result carried inside an [`AdminSnapshot`].
///
/// The `Default` value (`supported == false`, no error, zero counts) is what
/// `probe` returns when the usage query yields no data.
#[derive(Clone, Debug, Default)]
pub struct AdminInodePressure {
    /// `true` only when the probe returned usage figures.
    pub supported: bool,
    /// Stringified probe error, when the probe failed.
    pub error: Option<String>,
    /// Total inode count; `0` when unsupported.
    pub total: u64,
    /// Free inode count; `0` when unsupported.
    pub free: u64,
}
77
78impl AdminInodePressure {
79 pub fn probe() -> Self {
81 match crate::broker::fs_health::daemon_data_dir_inode_usage() {
82 Ok(Some(usage)) => Self {
83 supported: true,
84 error: None,
85 total: usage.total,
86 free: usage.free,
87 },
88 Ok(None) => Self::default(),
89 Err(err) => Self {
90 supported: false,
91 error: Some(err.to_string()),
92 total: 0,
93 free: 0,
94 },
95 }
96 }
97
98 fn to_json(&self) -> serde_json::Value {
99 if self.supported {
100 let usage = crate::broker::fs_health::InodeUsage {
101 total: self.total,
102 free: self.free,
103 };
104 json!({
105 "supported": true,
106 "inodes_total": self.total,
107 "inodes_free": self.free,
108 "inodes_used": usage.used(),
109 "used_ratio": usage.used_ratio(),
110 })
111 } else {
112 json!({
113 "supported": false,
114 "detail": self.error.clone().unwrap_or_else(|| {
115 "inode accounting not applicable on this filesystem".into()
116 }),
117 })
118 }
119 }
120}
121
122impl AdminSnapshot {
123 pub fn local_not_serving() -> Self {
125 Self {
126 broker_instance: "local".into(),
127 broker_pid: std::process::id(),
128 generated_at_unix_ms: unix_now_ms(),
129 uptime: Duration::ZERO,
130 accepting_hello: false,
131 connections_open: 0,
132 backends: Vec::new(),
133 spawn_budgets: Vec::new(),
134 fd_pressure_demoted: false,
135 inode_pressure: AdminInodePressure::probe(),
136 }
137 }
138
139 pub fn from_registry(
141 broker_instance: impl Into<String>,
142 uptime: Duration,
143 accepting_hello: bool,
144 connections_open: u64,
145 registry: &BackendRegistry,
146 spawn_budgets: &[SpawnBudgetSnapshot],
147 ) -> Self {
148 Self::from_registry_at(
149 broker_instance,
150 std::process::id(),
151 unix_now_ms(),
152 uptime,
153 accepting_hello,
154 connections_open,
155 registry,
156 spawn_budgets,
157 )
158 }
159
160 #[allow(clippy::too_many_arguments)]
162 pub fn from_registry_at(
163 broker_instance: impl Into<String>,
164 broker_pid: u32,
165 generated_at_unix_ms: u64,
166 uptime: Duration,
167 accepting_hello: bool,
168 connections_open: u64,
169 registry: &BackendRegistry,
170 spawn_budgets: &[SpawnBudgetSnapshot],
171 ) -> Self {
172 Self {
173 broker_instance: broker_instance.into(),
174 broker_pid,
175 generated_at_unix_ms,
176 uptime,
177 accepting_hello,
178 connections_open,
179 backends: registry
180 .iter()
181 .map(|(_key, handle)| AdminBackend {
182 service_name: handle.service_name.clone(),
183 service_version: handle.service_version.clone(),
184 pid: handle.daemon_process.pid,
185 backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
186 last_active_unix_ms: handle.daemon_process.started_at_unix_ms,
187 state: if handle.is_alive() {
188 "running".into()
189 } else {
190 "stale".into()
191 },
192 last_hello_unix_ms: 0,
193 last_error: None,
194 })
195 .collect(),
196 spawn_budgets: spawn_budgets
197 .iter()
198 .map(AdminSpawnBudget::from_snapshot)
199 .collect(),
200 fd_pressure_demoted: false,
201 inode_pressure: AdminInodePressure::probe(),
202 }
203 }
204
205 pub fn with_fd_pressure_demoted(mut self, demoted: bool) -> Self {
207 self.fd_pressure_demoted = demoted;
208 self
209 }
210
211 pub fn with_inode_pressure(mut self, inode_pressure: AdminInodePressure) -> Self {
213 self.inode_pressure = inode_pressure;
214 self
215 }
216}
217
/// One backend row inside an [`AdminSnapshot`].
#[derive(Clone, Debug)]
pub struct AdminBackend {
    /// Service name; `backend-health` filters rows on this field.
    pub service_name: String,
    /// Service version string.
    pub service_version: String,
    /// Process id of the backend daemon.
    pub pid: u32,
    /// Path of the backend's IPC endpoint.
    pub backend_pipe: String,
    /// Rendered as `last_active_unix_ms`; `from_registry_at` fills it from the daemon start time.
    pub last_active_unix_ms: u64,
    /// `"running"` or `"stale"` when built by `from_registry_at`.
    pub state: String,
    /// Rendered by `backend-health`; `from_registry_at` sets it to `0`.
    pub last_hello_unix_ms: u64,
    /// Rendered by `backend-health`; `from_registry_at` sets it to `None`.
    pub last_error: Option<String>,
}
238
/// One spawn-budget row inside an [`AdminSnapshot`], rendered by the `dump` document.
#[derive(Clone, Debug)]
pub struct AdminSpawnBudget {
    /// Id of the broker instance the budget key belongs to.
    pub broker_instance: String,
    /// Service name from the budget key.
    pub service_name: String,
    /// Service version from the budget key.
    pub service_version: String,
    /// Copied from the source snapshot's `attempts_used`.
    pub attempts_used: u32,
    /// Copied from the source snapshot's `remaining`.
    pub remaining: u32,
    /// Copied from the source snapshot's `in_flight`.
    pub in_flight: bool,
    /// Source `retry_after` in milliseconds (saturating at `u64::MAX`), if any.
    pub retry_after_ms: Option<u64>,
}
257
258impl AdminSpawnBudget {
259 fn from_snapshot(snapshot: &SpawnBudgetSnapshot) -> Self {
260 Self {
261 broker_instance: snapshot.key.instance.id(),
262 service_name: snapshot.key.service_name.clone(),
263 service_version: snapshot.key.service_version.clone(),
264 attempts_used: snapshot.attempts_used,
265 remaining: snapshot.remaining,
266 in_flight: snapshot.in_flight,
267 retry_after_ms: snapshot
268 .retry_after
269 .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)),
270 }
271 }
272}
273
274pub fn render_status_json(snapshot: &AdminSnapshot) -> String {
276 json!({
277 "schema_version": ADMIN_SCHEMA_VERSION,
278 "command": "status",
279 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
280 "broker_instance": snapshot.broker_instance,
281 "broker_pid": snapshot.broker_pid,
282 "uptime_seconds": snapshot.uptime.as_secs_f64(),
283 "accepting_hello": snapshot.accepting_hello,
284 "connections_open": snapshot.connections_open,
285 "fd_pressure": {
286 "demoted": snapshot.fd_pressure_demoted,
287 },
288 "inode_pressure": snapshot.inode_pressure.to_json(),
289 "backends": snapshot.backends.iter().map(|backend| {
290 json!({
291 "service_name": backend.service_name,
292 "service_version": backend.service_version,
293 "pid": backend.pid,
294 "backend_pipe": backend.backend_pipe,
295 "last_active_unix_ms": backend.last_active_unix_ms,
296 "state": backend.state,
297 })
298 }).collect::<Vec<_>>(),
299 })
300 .to_string()
301}
302
303pub fn render_dump_json(snapshot: &AdminSnapshot) -> String {
305 json!({
306 "schema_version": ADMIN_SCHEMA_VERSION,
307 "command": "dump",
308 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
309 "broker_instance": snapshot.broker_instance,
310 "effective_config": effective_config_json(snapshot),
311 "backend_table": snapshot.backends.iter().map(|backend| {
312 json!({
313 "service_name": backend.service_name,
314 "service_version": backend.service_version,
315 "pid": backend.pid,
316 "backend_pipe": backend.backend_pipe,
317 "state": backend.state,
318 })
319 }).collect::<Vec<_>>(),
320 "spawn_budgets": snapshot.spawn_budgets.iter().map(|budget| {
321 json!({
322 "broker_instance": budget.broker_instance,
323 "service_name": budget.service_name,
324 "service_version": budget.service_version,
325 "attempts_used": budget.attempts_used,
326 "remaining": budget.remaining,
327 "in_flight": budget.in_flight,
328 "retry_after_ms": budget.retry_after_ms,
329 })
330 }).collect::<Vec<_>>(),
331 "recent_lifecycle_events": [],
332 })
333 .to_string()
334}
335
336pub fn render_list_instances_json(snapshot: &AdminSnapshot) -> String {
338 json!({
339 "schema_version": ADMIN_SCHEMA_VERSION,
340 "command": "list-instances",
341 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
342 "instances": [{
343 "broker_instance": snapshot.broker_instance,
344 "pipe": "",
345 "pid": snapshot.broker_pid,
346 "state": if snapshot.accepting_hello { "running" } else { "not-serving" },
347 }],
348 })
349 .to_string()
350}
351
352pub fn render_backend_health_json(snapshot: &AdminSnapshot, service_name: &str) -> String {
354 json!({
355 "schema_version": ADMIN_SCHEMA_VERSION,
356 "command": "backend-health",
357 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
358 "service_name": service_name,
359 "backends": snapshot.backends.iter()
360 .filter(|backend| backend.service_name == service_name)
361 .map(|backend| {
362 json!({
363 "service_version": backend.service_version,
364 "pid": backend.pid,
365 "state": backend.state,
366 "last_hello_unix_ms": backend.last_hello_unix_ms,
367 "last_error": backend.last_error,
368 })
369 })
370 .collect::<Vec<_>>(),
371 })
372 .to_string()
373}
374
375pub fn render_config_json(snapshot: &AdminSnapshot) -> String {
377 json!({
378 "schema_version": ADMIN_SCHEMA_VERSION,
379 "command": "config",
380 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
381 "values": effective_config_json(snapshot),
382 })
383 .to_string()
384}
385
386pub fn render_diagnose_json(snapshot: &AdminSnapshot, output: &str) -> String {
388 let entries = diagnostic_bundle_entries_json(snapshot);
389 json!({
390 "schema_version": ADMIN_SCHEMA_VERSION,
391 "command": "diagnose",
392 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
393 "output": output,
394 "bundle": {
395 "format": DIAGNOSTIC_BUNDLE_FORMAT,
396 "mode": DIAGNOSTIC_BUNDLE_MODE,
397 "created": false,
398 "entries": entries,
399 },
400 "files": diagnostic_bundle_file_paths(snapshot),
401 "redactions": diagnostic_redaction_names(),
402 "redaction_policy": diagnostic_redaction_policy_json(),
403 })
404 .to_string()
405}
406
407pub fn render_metrics_text(snapshot: &AdminSnapshot) -> String {
409 let mut out = String::new();
410 for metric in BROKER_METRICS {
411 out.push_str("# TYPE ");
412 out.push_str(metric.name);
413 out.push(' ');
414 out.push_str(metric_kind_name(metric.kind));
415 out.push('\n');
416 if metric.labels.is_empty() {
417 out.push_str(metric.name);
418 out.push(' ');
419 out.push_str(&metric_value(metric.name, snapshot));
420 out.push('\n');
421 }
422 }
423 out.push_str("# EOF\n");
424 out
425}
426
/// Body of the liveness reply: always `"ok\n"`, independent of broker state.
pub fn render_healthz() -> &'static str {
    const HEALTHY_BODY: &str = "ok\n";
    HEALTHY_BODY
}
431
432pub fn render_readyz(snapshot: &AdminSnapshot) -> &'static str {
434 if snapshot.accepting_hello {
435 "ready\n"
436 } else {
437 "not ready\n"
438 }
439}
440
441pub fn render_admin_reply(snapshot: &AdminSnapshot, request: &AdminRequest) -> AdminReply {
443 match AdminVerb::try_from(request.verb) {
444 Ok(AdminVerb::Status) => {
445 if request.json {
446 json_reply(render_status_json(snapshot))
447 } else {
448 text_reply(
449 format!(
450 "broker_instance: {}\naccepting_hello: {}\n",
451 snapshot.broker_instance, snapshot.accepting_hello
452 ),
453 0,
454 )
455 }
456 }
457 Ok(AdminVerb::Dump) => json_reply(render_dump_json(snapshot)),
458 Ok(AdminVerb::ListInstances) => json_reply(render_list_instances_json(snapshot)),
459 Ok(AdminVerb::Healthz) => text_reply(render_healthz(), 0),
460 Ok(AdminVerb::Readyz) => {
461 let exit_code = if snapshot.accepting_hello { 0 } else { 1 };
462 text_reply(render_readyz(snapshot), exit_code)
463 }
464 Ok(AdminVerb::BackendHealth) => {
465 let service_name = if request.service_name.is_empty() {
466 "unknown"
467 } else {
468 &request.service_name
469 };
470 json_reply(render_backend_health_json(snapshot, service_name))
471 }
472 Ok(AdminVerb::Config) => json_reply(render_config_json(snapshot)),
473 Ok(AdminVerb::Diagnose) => {
474 let output = if request.output_path.is_empty() {
475 "bundle.tar.gz"
476 } else {
477 &request.output_path
478 };
479 json_reply(render_diagnose_json(snapshot, output))
480 }
481 Ok(AdminVerb::Metrics) => AdminReply {
482 kind: AdminReplyKind::Openmetrics as i32,
483 body: render_metrics_text(snapshot),
484 exit_code: 0,
485 content_type: "application/openmetrics-text".into(),
486 },
487 Ok(AdminVerb::Unspecified) | Err(_) => text_reply("unsupported admin verb\n", 2),
488 }
489}
490
491pub fn handle_admin_frame(
493 frame: Frame,
494 snapshot: &AdminSnapshot,
495) -> Result<Frame, AdminFrameError> {
496 if frame.envelope_version != PROTOCOL_VERSION {
497 return Err(AdminFrameError::UnsupportedEnvelopeVersion(
498 frame.envelope_version,
499 ));
500 }
501 if FrameKind::try_from(frame.kind) != Ok(FrameKind::Request) {
502 return Err(AdminFrameError::UnexpectedKind(frame.kind));
503 }
504 if frame.payload_protocol != ADMIN_PAYLOAD_PROTOCOL {
505 return Err(AdminFrameError::UnexpectedPayloadProtocol(
506 frame.payload_protocol,
507 ));
508 }
509 if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
510 return Err(AdminFrameError::UnsupportedPayloadEncoding(
511 frame.payload_encoding,
512 ));
513 }
514
515 let request =
516 AdminRequest::decode(frame.payload.as_slice()).map_err(AdminFrameError::Decode)?;
517 let reply = render_admin_reply(snapshot, &request);
518 Ok(Frame {
519 envelope_version: PROTOCOL_VERSION,
520 kind: FrameKind::Response as i32,
521 payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
522 payload: reply.encode_to_vec(),
523 request_id: frame.request_id,
524 payload_encoding: PayloadEncoding::None as i32,
525 deadline_unix_ms: 0,
526 traceparent: frame.traceparent,
527 tracestate: frame.tracestate,
528 })
529}
530
531pub fn handle_admin_connection<S: Read + Write>(
538 stream: &mut S,
539 snapshot: &AdminSnapshot,
540) -> Result<AdminReply, AdminConnectionError> {
541 let request_bytes = read_frame(stream)?;
542 let request_frame =
543 Frame::decode(request_bytes.as_slice()).map_err(AdminConnectionError::DecodeFrame)?;
544 let response_frame = handle_admin_frame(request_frame, snapshot)?;
545 write_frame(stream, &response_frame.encode_to_vec())?;
546 AdminReply::decode(response_frame.payload.as_slice()).map_err(AdminConnectionError::DecodeReply)
547}
548
549pub fn serve_one_admin_socket(
555 socket_path: &str,
556 snapshot: &AdminSnapshot,
557) -> Result<AdminReply, AdminConnectionError> {
558 let listener = bind_local_socket(socket_path)?;
559 let cleanup = LocalSocketCleanup(socket_path);
560 let result = (|| {
561 let mut stream = listener.accept()?;
562 handle_admin_connection(&mut stream, snapshot)
563 })();
564 drop(listener);
565 drop(cleanup);
566 result
567}
568
/// Reasons `handle_admin_frame` rejects an incoming admin frame.
#[derive(Debug, thiserror::Error)]
pub enum AdminFrameError {
    /// The frame's `envelope_version` differs from `PROTOCOL_VERSION`; carries the received value.
    #[error("unsupported admin frame envelope_version {0}")]
    UnsupportedEnvelopeVersion(u32),
    /// The frame kind is not `FrameKind::Request`; carries the raw kind value.
    #[error("admin frame kind must be REQUEST, got {0}")]
    UnexpectedKind(i32),
    /// The payload protocol is not `ADMIN_PAYLOAD_PROTOCOL`; carries the received value.
    #[error("admin frame payload_protocol must be 0xAD01, got {0}")]
    UnexpectedPayloadProtocol(u32),
    /// The payload encoding is not `PayloadEncoding::None`; carries the raw encoding value.
    #[error("admin frame payload must not be compressed, got {0}")]
    UnsupportedPayloadEncoding(i32),
    /// The payload could not be decoded as an `AdminRequest`.
    #[error(transparent)]
    Decode(prost::DecodeError),
}
588
/// Failures returned by `handle_admin_connection` and `serve_one_admin_socket`.
#[derive(Debug, thiserror::Error)]
pub enum AdminConnectionError {
    /// A `FramingError` surfaced by the framing layer (converted via `?`).
    #[error(transparent)]
    Framing(#[from] FramingError),
    /// The bytes read from the stream did not decode as a `Frame`.
    #[error("failed to decode admin request Frame: {0}")]
    DecodeFrame(prost::DecodeError),
    /// The request frame was rejected by `handle_admin_frame`.
    #[error(transparent)]
    AdminFrame(#[from] AdminFrameError),
    /// The response payload did not decode back into an `AdminReply`.
    #[error("failed to decode admin reply payload: {0}")]
    DecodeReply(prost::DecodeError),
    /// A `BrokerConnectionError` from the local-socket layer (converted via `?`).
    #[error(transparent)]
    LocalSocket(#[from] BrokerConnectionError),
    /// Any other I/O error (converted via `?`).
    #[error(transparent)]
    Io(#[from] io::Error),
}
611
612fn json_reply(body: String) -> AdminReply {
613 AdminReply {
614 kind: AdminReplyKind::Json as i32,
615 body,
616 exit_code: 0,
617 content_type: "application/json".into(),
618 }
619}
620
621fn text_reply(body: impl Into<String>, exit_code: u32) -> AdminReply {
622 AdminReply {
623 kind: AdminReplyKind::Text as i32,
624 body: body.into(),
625 exit_code,
626 content_type: "text/plain".into(),
627 }
628}
629
630fn metric_kind_name(kind: MetricKind) -> &'static str {
631 match kind {
632 MetricKind::Counter => "counter",
633 MetricKind::Gauge => "gauge",
634 MetricKind::Histogram => "histogram",
635 }
636}
637
638fn metric_value(name: &str, snapshot: &AdminSnapshot) -> String {
639 match name {
640 "running_process_broker_v1_connections_open" => snapshot.connections_open.to_string(),
641 "running_process_broker_v1_fd_usage_ratio" => "0".into(),
642 "running_process_broker_v1_uptime_seconds" => snapshot.uptime.as_secs().to_string(),
643 _ => "0".into(),
644 }
645}
646
647fn effective_config_json(snapshot: &AdminSnapshot) -> serde_json::Value {
648 json!({
649 "broker": {
650 "broker_instance": sourced_value(&snapshot.broker_instance, "runtime"),
651 "broker_pid": sourced_value(snapshot.broker_pid, "runtime"),
652 "accepting_hello": sourced_value(snapshot.accepting_hello, "runtime"),
653 },
654 "protocol": {
655 "admin_payload_protocol": sourced_value(format!("0x{ADMIN_PAYLOAD_PROTOCOL:04X}"), "protocol-v1"),
656 "envelope_version": sourced_value(PROTOCOL_VERSION, "protocol-v1"),
657 "framing_version": sourced_value(ENVELOPE_VERSION, "protocol-v1"),
658 },
659 "limits": {
660 "max_frame_bytes": sourced_value(MAX_FRAME_BYTES, "protocol-v1"),
661 "max_hello_bytes": sourced_value(MAX_HELLO_BYTES, "protocol-v1"),
662 "connections_open": sourced_value(snapshot.connections_open, "runtime"),
663 },
664 "paths": {
665 "service_definition_dir": sourced_value(
666 service_definition_dir().display().to_string(),
667 service_definition_dir_source(),
668 ),
669 },
670 "spawn_budget": {
671 "default_attempts_per_window": sourced_value(DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, "default"),
672 "default_window_ms": sourced_value(duration_ms(DEFAULT_SPAWN_BUDGET_WINDOW), "default"),
673 "active_budget_rows": sourced_value(snapshot.spawn_budgets.len(), "runtime"),
674 },
675 "diagnostics": {
676 "bundle_format": sourced_value(DIAGNOSTIC_BUNDLE_FORMAT, "schema-v1"),
677 "bundle_mode": sourced_value(DIAGNOSTIC_BUNDLE_MODE, "schema-v1"),
678 "redactions": sourced_value(diagnostic_redaction_names(), "schema-v1"),
679 },
680 })
681}
682
683fn service_definition_dir_source() -> &'static str {
684 if std::env::var_os(SERVICE_DEF_DIR_ENV).is_some() {
685 "env:RUNNING_PROCESS_SERVICE_DEF_DIR"
686 } else {
687 "platform-default"
688 }
689}
690
691fn diagnostic_bundle_entries_json(snapshot: &AdminSnapshot) -> Vec<serde_json::Value> {
692 vec![
693 diagnostic_bundle_entry("admin/status.json", "json", "status", true, false, None),
694 diagnostic_bundle_entry("admin/dump.json", "json", "dump", true, true, None),
695 diagnostic_bundle_entry(
696 "config/effective.json",
697 "json",
698 "effective-config",
699 true,
700 false,
701 None,
702 ),
703 diagnostic_bundle_entry(
704 "metrics/openmetrics.txt",
705 "openmetrics",
706 "metrics",
707 true,
708 false,
709 None,
710 ),
711 diagnostic_bundle_entry(
712 "events/lifecycle.jsonl",
713 "jsonl",
714 "lifecycle-events",
715 false,
716 true,
717 None,
718 ),
719 diagnostic_bundle_entry(
720 "manifest/backend-manifests.json",
721 "json",
722 "backend-manifest-index",
723 false,
724 true,
725 None,
726 ),
727 diagnostic_bundle_entry(
728 "process/backends.json",
729 "json",
730 "backend-table",
731 true,
732 true,
733 Some(snapshot.backends.len()),
734 ),
735 diagnostic_bundle_entry(
736 "system/summary.json",
737 "json",
738 "host-summary",
739 false,
740 true,
741 None,
742 ),
743 ]
744}
745
746fn diagnostic_bundle_file_paths(snapshot: &AdminSnapshot) -> Vec<String> {
747 diagnostic_bundle_entries_json(snapshot)
748 .into_iter()
749 .filter_map(|entry| {
750 entry
751 .get("path")
752 .and_then(serde_json::Value::as_str)
753 .map(str::to_owned)
754 })
755 .collect()
756}
757
758fn diagnostic_bundle_entry(
759 path: &str,
760 kind: &str,
761 source: &str,
762 required: bool,
763 redacted: bool,
764 record_count: Option<usize>,
765) -> serde_json::Value {
766 let mut entry = json!({
767 "path": path,
768 "kind": kind,
769 "source": source,
770 "required": required,
771 "redacted": redacted,
772 });
773 if let Some(record_count) = record_count {
774 entry["record_count"] = json!(record_count);
775 }
776 entry
777}
778
779fn diagnostic_redaction_names() -> Vec<&'static str> {
780 DIAGNOSTIC_REDACTIONS.to_vec()
781}
782
783fn diagnostic_redaction_policy_json() -> Vec<serde_json::Value> {
784 vec![
785 json!({
786 "name": "home",
787 "replacement": "~",
788 }),
789 json!({
790 "name": "secret-env",
791 "matches": ["KEY", "TOKEN", "SECRET", "PASS"],
792 }),
793 json!({
794 "name": "acl-identities",
795 "replacement": "stable-hash",
796 }),
797 ]
798}
799
800fn sourced_value(value: impl Serialize, source: &'static str) -> serde_json::Value {
801 json!({
802 "value": value,
803 "source": source,
804 })
805}
806
/// Whole milliseconds in `duration`, saturating at `u64::MAX` when the
/// 128-bit millisecond count does not fit in 64 bits.
fn duration_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
810
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. The
/// 128-bit millisecond count saturates at `u64::MAX` instead of being
/// truncated by an `as` cast, matching how other durations in this file are
/// narrowed.
fn unix_now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}