1use std::io::{self, Read, Write};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use interprocess::local_socket::traits::Listener;
7use prost::Message;
8use serde::Serialize;
9use serde_json::json;
10
11use crate::broker::protocol::{
12 read_frame, write_frame, AdminReply, AdminReplyKind, AdminRequest, AdminVerb, Frame, FrameKind,
13 FramingError, PayloadEncoding, ENVELOPE_VERSION, MAX_FRAME_BYTES, MAX_HELLO_BYTES,
14};
15
16use super::backend_registry::BackendRegistry;
17use super::connection::{bind_local_socket, BrokerConnectionError, LocalSocketCleanup};
18use super::service_def_loader::{service_definition_dir, SERVICE_DEF_DIR_ENV};
19use super::spawn_coordinator::{
20 SpawnBudgetSnapshot, DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, DEFAULT_SPAWN_BUDGET_WINDOW,
21};
22use crate::broker::server::metrics::{MetricKind, BROKER_METRICS};
23
/// Value stamped into every admin JSON document as `schema_version`.
pub const ADMIN_SCHEMA_VERSION: u32 = 1;
/// `payload_protocol` identifier that admin request and response frames must carry.
pub const ADMIN_PAYLOAD_PROTOCOL: u32 = 0xad01;

/// Envelope version the admin endpoint accepts on requests and stamps on responses.
const PROTOCOL_VERSION: u32 = 1;
/// Archive format advertised for diagnostic bundles.
const DIAGNOSTIC_BUNDLE_FORMAT: &str = "tar.gz";
/// Content mode advertised for diagnostic bundles.
const DIAGNOSTIC_BUNDLE_MODE: &str = "metadata-only";
/// Names of the redaction rules reported alongside diagnostic bundles.
const DIAGNOSTIC_REDACTIONS: &[&str] = &["home", "secret-env", "acl-identities"];
33
/// Point-in-time view of broker state that every admin renderer in this module reads from.
#[derive(Clone, Debug)]
pub struct AdminSnapshot {
    /// Identifier of the broker instance (`"local"` when built by `local_not_serving`).
    pub broker_instance: String,
    /// PID of the broker process; the constructors fill it from `std::process::id()`.
    pub broker_pid: u32,
    /// Capture time in milliseconds since the Unix epoch.
    pub generated_at_unix_ms: u64,
    /// Broker uptime as supplied by the caller (`Duration::ZERO` when not serving).
    pub uptime: Duration,
    /// Whether the broker accepts hellos; drives `readyz` and the reported instance state.
    pub accepting_hello: bool,
    /// Count of open connections as supplied by the caller.
    pub connections_open: u64,
    /// One row per entry in the backend registry the snapshot was built from.
    pub backends: Vec<AdminBackend>,
    /// One row per spawn-budget snapshot the caller passed in.
    pub spawn_budgets: Vec<AdminSpawnBudget>,
}
54
55impl AdminSnapshot {
56 pub fn local_not_serving() -> Self {
58 Self {
59 broker_instance: "local".into(),
60 broker_pid: std::process::id(),
61 generated_at_unix_ms: unix_now_ms(),
62 uptime: Duration::ZERO,
63 accepting_hello: false,
64 connections_open: 0,
65 backends: Vec::new(),
66 spawn_budgets: Vec::new(),
67 }
68 }
69
70 pub fn from_registry(
72 broker_instance: impl Into<String>,
73 uptime: Duration,
74 accepting_hello: bool,
75 connections_open: u64,
76 registry: &BackendRegistry,
77 spawn_budgets: &[SpawnBudgetSnapshot],
78 ) -> Self {
79 Self::from_registry_at(
80 broker_instance,
81 std::process::id(),
82 unix_now_ms(),
83 uptime,
84 accepting_hello,
85 connections_open,
86 registry,
87 spawn_budgets,
88 )
89 }
90
91 #[allow(clippy::too_many_arguments)]
93 pub fn from_registry_at(
94 broker_instance: impl Into<String>,
95 broker_pid: u32,
96 generated_at_unix_ms: u64,
97 uptime: Duration,
98 accepting_hello: bool,
99 connections_open: u64,
100 registry: &BackendRegistry,
101 spawn_budgets: &[SpawnBudgetSnapshot],
102 ) -> Self {
103 Self {
104 broker_instance: broker_instance.into(),
105 broker_pid,
106 generated_at_unix_ms,
107 uptime,
108 accepting_hello,
109 connections_open,
110 backends: registry
111 .iter()
112 .map(|(_key, handle)| AdminBackend {
113 service_name: handle.service_name.clone(),
114 service_version: handle.service_version.clone(),
115 pid: handle.daemon_process.pid,
116 backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
117 last_active_unix_ms: handle.daemon_process.started_at_unix_ms,
118 state: if handle.is_alive() {
119 "running".into()
120 } else {
121 "stale".into()
122 },
123 last_hello_unix_ms: 0,
124 last_error: None,
125 })
126 .collect(),
127 spawn_budgets: spawn_budgets
128 .iter()
129 .map(AdminSpawnBudget::from_snapshot)
130 .collect(),
131 }
132 }
133}
134
/// One backend row as reported by the admin `status`, `dump`, and `backend-health` views.
///
/// Rows compare by value so tests and callers can diff snapshots directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AdminBackend {
    /// Service the backend serves.
    pub service_name: String,
    /// Version of that service.
    pub service_version: String,
    /// PID of the backend daemon process.
    pub pid: u32,
    /// IPC endpoint path of the backend.
    pub backend_pipe: String,
    /// Milliseconds since the Unix epoch. `AdminSnapshot::from_registry_at` currently fills it
    /// with the daemon start time.
    pub last_active_unix_ms: u64,
    /// `"running"` or `"stale"` when built by `AdminSnapshot::from_registry_at`.
    pub state: String,
    /// Milliseconds since the Unix epoch of the last hello; `0` when not tracked.
    pub last_hello_unix_ms: u64,
    /// Most recent error message for this backend, if any.
    pub last_error: Option<String>,
}
155
/// One spawn-budget row as reported by the admin `dump` view.
///
/// Rows compare by value so tests and callers can diff snapshots directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AdminSpawnBudget {
    /// Broker instance identifier the budget belongs to.
    pub broker_instance: String,
    /// Service the budget applies to.
    pub service_name: String,
    /// Service version the budget applies to.
    pub service_version: String,
    /// Spawn attempts already consumed.
    pub attempts_used: u32,
    /// Spawn attempts still available.
    pub remaining: u32,
    /// Whether a spawn is currently in progress.
    pub in_flight: bool,
    /// Milliseconds to wait before retrying, when a retry delay applies.
    pub retry_after_ms: Option<u64>,
}
174
175impl AdminSpawnBudget {
176 fn from_snapshot(snapshot: &SpawnBudgetSnapshot) -> Self {
177 Self {
178 broker_instance: snapshot.key.instance.id(),
179 service_name: snapshot.key.service_name.clone(),
180 service_version: snapshot.key.service_version.clone(),
181 attempts_used: snapshot.attempts_used,
182 remaining: snapshot.remaining,
183 in_flight: snapshot.in_flight,
184 retry_after_ms: snapshot
185 .retry_after
186 .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)),
187 }
188 }
189}
190
191pub fn render_status_json(snapshot: &AdminSnapshot) -> String {
193 json!({
194 "schema_version": ADMIN_SCHEMA_VERSION,
195 "command": "status",
196 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
197 "broker_instance": snapshot.broker_instance,
198 "broker_pid": snapshot.broker_pid,
199 "uptime_seconds": snapshot.uptime.as_secs_f64(),
200 "accepting_hello": snapshot.accepting_hello,
201 "connections_open": snapshot.connections_open,
202 "backends": snapshot.backends.iter().map(|backend| {
203 json!({
204 "service_name": backend.service_name,
205 "service_version": backend.service_version,
206 "pid": backend.pid,
207 "backend_pipe": backend.backend_pipe,
208 "last_active_unix_ms": backend.last_active_unix_ms,
209 "state": backend.state,
210 })
211 }).collect::<Vec<_>>(),
212 })
213 .to_string()
214}
215
216pub fn render_dump_json(snapshot: &AdminSnapshot) -> String {
218 json!({
219 "schema_version": ADMIN_SCHEMA_VERSION,
220 "command": "dump",
221 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
222 "broker_instance": snapshot.broker_instance,
223 "effective_config": effective_config_json(snapshot),
224 "backend_table": snapshot.backends.iter().map(|backend| {
225 json!({
226 "service_name": backend.service_name,
227 "service_version": backend.service_version,
228 "pid": backend.pid,
229 "backend_pipe": backend.backend_pipe,
230 "state": backend.state,
231 })
232 }).collect::<Vec<_>>(),
233 "spawn_budgets": snapshot.spawn_budgets.iter().map(|budget| {
234 json!({
235 "broker_instance": budget.broker_instance,
236 "service_name": budget.service_name,
237 "service_version": budget.service_version,
238 "attempts_used": budget.attempts_used,
239 "remaining": budget.remaining,
240 "in_flight": budget.in_flight,
241 "retry_after_ms": budget.retry_after_ms,
242 })
243 }).collect::<Vec<_>>(),
244 "recent_lifecycle_events": [],
245 })
246 .to_string()
247}
248
249pub fn render_list_instances_json(snapshot: &AdminSnapshot) -> String {
251 json!({
252 "schema_version": ADMIN_SCHEMA_VERSION,
253 "command": "list-instances",
254 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
255 "instances": [{
256 "broker_instance": snapshot.broker_instance,
257 "pipe": "",
258 "pid": snapshot.broker_pid,
259 "state": if snapshot.accepting_hello { "running" } else { "not-serving" },
260 }],
261 })
262 .to_string()
263}
264
265pub fn render_backend_health_json(snapshot: &AdminSnapshot, service_name: &str) -> String {
267 json!({
268 "schema_version": ADMIN_SCHEMA_VERSION,
269 "command": "backend-health",
270 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
271 "service_name": service_name,
272 "backends": snapshot.backends.iter()
273 .filter(|backend| backend.service_name == service_name)
274 .map(|backend| {
275 json!({
276 "service_version": backend.service_version,
277 "pid": backend.pid,
278 "state": backend.state,
279 "last_hello_unix_ms": backend.last_hello_unix_ms,
280 "last_error": backend.last_error,
281 })
282 })
283 .collect::<Vec<_>>(),
284 })
285 .to_string()
286}
287
288pub fn render_config_json(snapshot: &AdminSnapshot) -> String {
290 json!({
291 "schema_version": ADMIN_SCHEMA_VERSION,
292 "command": "config",
293 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
294 "values": effective_config_json(snapshot),
295 })
296 .to_string()
297}
298
299pub fn render_diagnose_json(snapshot: &AdminSnapshot, output: &str) -> String {
301 let entries = diagnostic_bundle_entries_json(snapshot);
302 json!({
303 "schema_version": ADMIN_SCHEMA_VERSION,
304 "command": "diagnose",
305 "generated_at_unix_ms": snapshot.generated_at_unix_ms,
306 "output": output,
307 "bundle": {
308 "format": DIAGNOSTIC_BUNDLE_FORMAT,
309 "mode": DIAGNOSTIC_BUNDLE_MODE,
310 "created": false,
311 "entries": entries,
312 },
313 "files": diagnostic_bundle_file_paths(snapshot),
314 "redactions": diagnostic_redaction_names(),
315 "redaction_policy": diagnostic_redaction_policy_json(),
316 })
317 .to_string()
318}
319
320pub fn render_metrics_text(snapshot: &AdminSnapshot) -> String {
322 let mut out = String::new();
323 for metric in BROKER_METRICS {
324 out.push_str("# TYPE ");
325 out.push_str(metric.name);
326 out.push(' ');
327 out.push_str(metric_kind_name(metric.kind));
328 out.push('\n');
329 if metric.labels.is_empty() {
330 out.push_str(metric.name);
331 out.push(' ');
332 out.push_str(&metric_value(metric.name, snapshot));
333 out.push('\n');
334 }
335 }
336 out.push_str("# EOF\n");
337 out
338}
339
/// Liveness body: always `"ok\n"`, regardless of broker state.
pub fn render_healthz() -> &'static str {
    const LIVENESS_BODY: &str = "ok\n";
    LIVENESS_BODY
}
344
345pub fn render_readyz(snapshot: &AdminSnapshot) -> &'static str {
347 if snapshot.accepting_hello {
348 "ready\n"
349 } else {
350 "not ready\n"
351 }
352}
353
354pub fn render_admin_reply(snapshot: &AdminSnapshot, request: &AdminRequest) -> AdminReply {
356 match AdminVerb::try_from(request.verb) {
357 Ok(AdminVerb::Status) => {
358 if request.json {
359 json_reply(render_status_json(snapshot))
360 } else {
361 text_reply(
362 format!(
363 "broker_instance: {}\naccepting_hello: {}\n",
364 snapshot.broker_instance, snapshot.accepting_hello
365 ),
366 0,
367 )
368 }
369 }
370 Ok(AdminVerb::Dump) => json_reply(render_dump_json(snapshot)),
371 Ok(AdminVerb::ListInstances) => json_reply(render_list_instances_json(snapshot)),
372 Ok(AdminVerb::Healthz) => text_reply(render_healthz(), 0),
373 Ok(AdminVerb::Readyz) => {
374 let exit_code = if snapshot.accepting_hello { 0 } else { 1 };
375 text_reply(render_readyz(snapshot), exit_code)
376 }
377 Ok(AdminVerb::BackendHealth) => {
378 let service_name = if request.service_name.is_empty() {
379 "unknown"
380 } else {
381 &request.service_name
382 };
383 json_reply(render_backend_health_json(snapshot, service_name))
384 }
385 Ok(AdminVerb::Config) => json_reply(render_config_json(snapshot)),
386 Ok(AdminVerb::Diagnose) => {
387 let output = if request.output_path.is_empty() {
388 "bundle.tar.gz"
389 } else {
390 &request.output_path
391 };
392 json_reply(render_diagnose_json(snapshot, output))
393 }
394 Ok(AdminVerb::Metrics) => AdminReply {
395 kind: AdminReplyKind::Openmetrics as i32,
396 body: render_metrics_text(snapshot),
397 exit_code: 0,
398 content_type: "application/openmetrics-text".into(),
399 },
400 Ok(AdminVerb::Unspecified) | Err(_) => text_reply("unsupported admin verb\n", 2),
401 }
402}
403
404pub fn handle_admin_frame(
406 frame: Frame,
407 snapshot: &AdminSnapshot,
408) -> Result<Frame, AdminFrameError> {
409 if frame.envelope_version != PROTOCOL_VERSION {
410 return Err(AdminFrameError::UnsupportedEnvelopeVersion(
411 frame.envelope_version,
412 ));
413 }
414 if FrameKind::try_from(frame.kind) != Ok(FrameKind::Request) {
415 return Err(AdminFrameError::UnexpectedKind(frame.kind));
416 }
417 if frame.payload_protocol != ADMIN_PAYLOAD_PROTOCOL {
418 return Err(AdminFrameError::UnexpectedPayloadProtocol(
419 frame.payload_protocol,
420 ));
421 }
422 if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
423 return Err(AdminFrameError::UnsupportedPayloadEncoding(
424 frame.payload_encoding,
425 ));
426 }
427
428 let request =
429 AdminRequest::decode(frame.payload.as_slice()).map_err(AdminFrameError::Decode)?;
430 let reply = render_admin_reply(snapshot, &request);
431 Ok(Frame {
432 envelope_version: PROTOCOL_VERSION,
433 kind: FrameKind::Response as i32,
434 payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
435 payload: reply.encode_to_vec(),
436 request_id: frame.request_id,
437 payload_encoding: PayloadEncoding::None as i32,
438 deadline_unix_ms: 0,
439 traceparent: frame.traceparent,
440 tracestate: frame.tracestate,
441 })
442}
443
444pub fn handle_admin_connection<S: Read + Write>(
451 stream: &mut S,
452 snapshot: &AdminSnapshot,
453) -> Result<AdminReply, AdminConnectionError> {
454 let request_bytes = read_frame(stream)?;
455 let request_frame =
456 Frame::decode(request_bytes.as_slice()).map_err(AdminConnectionError::DecodeFrame)?;
457 let response_frame = handle_admin_frame(request_frame, snapshot)?;
458 write_frame(stream, &response_frame.encode_to_vec())?;
459 AdminReply::decode(response_frame.payload.as_slice()).map_err(AdminConnectionError::DecodeReply)
460}
461
462pub fn serve_one_admin_socket(
468 socket_path: &str,
469 snapshot: &AdminSnapshot,
470) -> Result<AdminReply, AdminConnectionError> {
471 let listener = bind_local_socket(socket_path)?;
472 let cleanup = LocalSocketCleanup(socket_path);
473 let result = (|| {
474 let mut stream = listener.accept()?;
475 handle_admin_connection(&mut stream, snapshot)
476 })();
477 drop(listener);
478 drop(cleanup);
479 result
480}
481
482#[derive(Debug, thiserror::Error)]
484pub enum AdminFrameError {
485 #[error("unsupported admin frame envelope_version {0}")]
487 UnsupportedEnvelopeVersion(u32),
488 #[error("admin frame kind must be REQUEST, got {0}")]
490 UnexpectedKind(i32),
491 #[error("admin frame payload_protocol must be 0xAD01, got {0}")]
493 UnexpectedPayloadProtocol(u32),
494 #[error("admin frame payload must not be compressed, got {0}")]
496 UnsupportedPayloadEncoding(i32),
497 #[error(transparent)]
499 Decode(prost::DecodeError),
500}
501
/// Failures while serving one admin connection end to end.
#[derive(Debug, thiserror::Error)]
pub enum AdminConnectionError {
    /// Error from the protocol framing layer (`FramingError`).
    #[error(transparent)]
    Framing(#[from] FramingError),
    /// The bytes read from the stream did not decode as a `Frame`.
    #[error("failed to decode admin request Frame: {0}")]
    DecodeFrame(prost::DecodeError),
    /// The frame was rejected by `handle_admin_frame`.
    #[error(transparent)]
    AdminFrame(#[from] AdminFrameError),
    /// The response payload did not decode back into an `AdminReply`.
    #[error("failed to decode admin reply payload: {0}")]
    DecodeReply(prost::DecodeError),
    /// Local socket setup failed (`BrokerConnectionError`, e.g. from `bind_local_socket`).
    #[error(transparent)]
    LocalSocket(#[from] BrokerConnectionError),
    /// Raw I/O failure, e.g. while accepting a connection.
    #[error(transparent)]
    Io(#[from] io::Error),
}
524
525fn json_reply(body: String) -> AdminReply {
526 AdminReply {
527 kind: AdminReplyKind::Json as i32,
528 body,
529 exit_code: 0,
530 content_type: "application/json".into(),
531 }
532}
533
534fn text_reply(body: impl Into<String>, exit_code: u32) -> AdminReply {
535 AdminReply {
536 kind: AdminReplyKind::Text as i32,
537 body: body.into(),
538 exit_code,
539 content_type: "text/plain".into(),
540 }
541}
542
543fn metric_kind_name(kind: MetricKind) -> &'static str {
544 match kind {
545 MetricKind::Counter => "counter",
546 MetricKind::Gauge => "gauge",
547 MetricKind::Histogram => "histogram",
548 }
549}
550
551fn metric_value(name: &str, snapshot: &AdminSnapshot) -> String {
552 match name {
553 "running_process_broker_v1_connections_open" => snapshot.connections_open.to_string(),
554 "running_process_broker_v1_fd_usage_ratio" => "0".into(),
555 "running_process_broker_v1_uptime_seconds" => snapshot.uptime.as_secs().to_string(),
556 _ => "0".into(),
557 }
558}
559
560fn effective_config_json(snapshot: &AdminSnapshot) -> serde_json::Value {
561 json!({
562 "broker": {
563 "broker_instance": sourced_value(&snapshot.broker_instance, "runtime"),
564 "broker_pid": sourced_value(snapshot.broker_pid, "runtime"),
565 "accepting_hello": sourced_value(snapshot.accepting_hello, "runtime"),
566 },
567 "protocol": {
568 "admin_payload_protocol": sourced_value(format!("0x{ADMIN_PAYLOAD_PROTOCOL:04X}"), "protocol-v1"),
569 "envelope_version": sourced_value(PROTOCOL_VERSION, "protocol-v1"),
570 "framing_version": sourced_value(ENVELOPE_VERSION, "protocol-v1"),
571 },
572 "limits": {
573 "max_frame_bytes": sourced_value(MAX_FRAME_BYTES, "protocol-v1"),
574 "max_hello_bytes": sourced_value(MAX_HELLO_BYTES, "protocol-v1"),
575 "connections_open": sourced_value(snapshot.connections_open, "runtime"),
576 },
577 "paths": {
578 "service_definition_dir": sourced_value(
579 service_definition_dir().display().to_string(),
580 service_definition_dir_source(),
581 ),
582 },
583 "spawn_budget": {
584 "default_attempts_per_window": sourced_value(DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, "default"),
585 "default_window_ms": sourced_value(duration_ms(DEFAULT_SPAWN_BUDGET_WINDOW), "default"),
586 "active_budget_rows": sourced_value(snapshot.spawn_budgets.len(), "runtime"),
587 },
588 "diagnostics": {
589 "bundle_format": sourced_value(DIAGNOSTIC_BUNDLE_FORMAT, "schema-v1"),
590 "bundle_mode": sourced_value(DIAGNOSTIC_BUNDLE_MODE, "schema-v1"),
591 "redactions": sourced_value(diagnostic_redaction_names(), "schema-v1"),
592 },
593 })
594}
595
596fn service_definition_dir_source() -> &'static str {
597 if std::env::var_os(SERVICE_DEF_DIR_ENV).is_some() {
598 "env:RUNNING_PROCESS_SERVICE_DEF_DIR"
599 } else {
600 "platform-default"
601 }
602}
603
604fn diagnostic_bundle_entries_json(snapshot: &AdminSnapshot) -> Vec<serde_json::Value> {
605 vec![
606 diagnostic_bundle_entry("admin/status.json", "json", "status", true, false, None),
607 diagnostic_bundle_entry("admin/dump.json", "json", "dump", true, true, None),
608 diagnostic_bundle_entry(
609 "config/effective.json",
610 "json",
611 "effective-config",
612 true,
613 false,
614 None,
615 ),
616 diagnostic_bundle_entry(
617 "metrics/openmetrics.txt",
618 "openmetrics",
619 "metrics",
620 true,
621 false,
622 None,
623 ),
624 diagnostic_bundle_entry(
625 "events/lifecycle.jsonl",
626 "jsonl",
627 "lifecycle-events",
628 false,
629 true,
630 None,
631 ),
632 diagnostic_bundle_entry(
633 "manifest/backend-manifests.json",
634 "json",
635 "backend-manifest-index",
636 false,
637 true,
638 None,
639 ),
640 diagnostic_bundle_entry(
641 "process/backends.json",
642 "json",
643 "backend-table",
644 true,
645 true,
646 Some(snapshot.backends.len()),
647 ),
648 diagnostic_bundle_entry(
649 "system/summary.json",
650 "json",
651 "host-summary",
652 false,
653 true,
654 None,
655 ),
656 ]
657}
658
659fn diagnostic_bundle_file_paths(snapshot: &AdminSnapshot) -> Vec<String> {
660 diagnostic_bundle_entries_json(snapshot)
661 .into_iter()
662 .filter_map(|entry| {
663 entry
664 .get("path")
665 .and_then(serde_json::Value::as_str)
666 .map(str::to_owned)
667 })
668 .collect()
669}
670
671fn diagnostic_bundle_entry(
672 path: &str,
673 kind: &str,
674 source: &str,
675 required: bool,
676 redacted: bool,
677 record_count: Option<usize>,
678) -> serde_json::Value {
679 let mut entry = json!({
680 "path": path,
681 "kind": kind,
682 "source": source,
683 "required": required,
684 "redacted": redacted,
685 });
686 if let Some(record_count) = record_count {
687 entry["record_count"] = json!(record_count);
688 }
689 entry
690}
691
692fn diagnostic_redaction_names() -> Vec<&'static str> {
693 DIAGNOSTIC_REDACTIONS.to_vec()
694}
695
696fn diagnostic_redaction_policy_json() -> Vec<serde_json::Value> {
697 vec![
698 json!({
699 "name": "home",
700 "replacement": "~",
701 }),
702 json!({
703 "name": "secret-env",
704 "matches": ["KEY", "TOKEN", "SECRET", "PASS"],
705 }),
706 json!({
707 "name": "acl-identities",
708 "replacement": "stable-hash",
709 }),
710 ]
711}
712
713fn sourced_value(value: impl Serialize, source: &'static str) -> serde_json::Value {
714 json!({
715 "value": value,
716 "source": source,
717 })
718}
719
/// Whole milliseconds in `duration`, saturating at `u64::MAX` rather than truncating.
fn duration_ms(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
723
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch. The conversion saturates at
/// `u64::MAX` instead of truncating with `as`, consistent with the file's `duration_ms`
/// helper.
fn unix_now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}