use std::io::{self, Read, Write};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use interprocess::local_socket::traits::Listener;
use prost::Message;
use serde::Serialize;
use serde_json::json;
use crate::broker::protocol::{
read_frame, write_frame, AdminReply, AdminReplyKind, AdminRequest, AdminVerb, Frame, FrameKind,
FramingError, PayloadEncoding, ENVELOPE_VERSION, MAX_FRAME_BYTES, MAX_HELLO_BYTES,
PROTOCOL_VERSION,
};
use super::backend_registry::BackendRegistry;
use super::connection::{bind_local_socket, BrokerConnectionError, LocalSocketCleanup};
use super::service_def_loader::{service_definition_dir, SERVICE_DEF_DIR_ENV};
use super::spawn_coordinator::{
SpawnBudgetSnapshot, DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, DEFAULT_SPAWN_BUDGET_WINDOW,
};
use crate::broker::server::metrics::{MetricKind, BROKER_METRICS};
/// Value written to the `schema_version` field of every JSON document rendered by this module.
pub const ADMIN_SCHEMA_VERSION: u32 = 1;
// Re-exported payload-protocol id; `handle_admin_frame` requires it on requests and stamps it on replies.
pub use crate::broker::protocol::registry::ADMIN_PAYLOAD_PROTOCOL;
// Bundle format string reported by the `diagnose` reply and the effective-config view.
const DIAGNOSTIC_BUNDLE_FORMAT: &str = "tar.gz";
// Bundle mode string reported by the `diagnose` reply and the effective-config view.
const DIAGNOSTIC_BUNDLE_MODE: &str = "metadata-only";
// Redaction names listed in the `diagnose` reply and the effective-config view.
// The same three names appear in `diagnostic_redaction_policy_json`.
const DIAGNOSTIC_REDACTIONS: &[&str] = &["home", "secret-env", "acl-identities"];
/// Point-in-time view of broker state that every admin renderer in this module reads from.
#[derive(Clone, Debug)]
pub struct AdminSnapshot {
/// Instance identifier; rendered as `broker_instance` in the JSON replies.
pub broker_instance: String,
/// Process id reported as `broker_pid` / `pid`; `from_registry` fills it with `std::process::id()`.
pub broker_pid: u32,
/// Timestamp (Unix epoch, milliseconds) stamped into each JSON reply as `generated_at_unix_ms`.
pub generated_at_unix_ms: u64,
/// Broker uptime; rendered as `uptime_seconds` in the status reply and as the uptime metric.
pub uptime: Duration,
/// Drives the `readyz` body and exit code and the `list-instances` state string.
pub accepting_hello: bool,
/// Open-connection count; rendered in the status reply, the config view, and the metrics text.
pub connections_open: u64,
/// One row per backend-registry entry (see `from_registry_at`).
pub backends: Vec<AdminBackend>,
/// Spawn-budget rows rendered by the `dump` reply.
pub spawn_budgets: Vec<AdminSpawnBudget>,
/// Rendered as `fd_pressure.demoted`; constructors set it to `false`, and
/// `with_fd_pressure_demoted` overrides it.
pub fd_pressure_demoted: bool,
/// Inode usage report rendered under `inode_pressure` in the status reply.
pub inode_pressure: AdminInodePressure,
}
/// Inode usage report attached to an `AdminSnapshot`.
///
/// The derived `Default` (unsupported, no error, zero counts) is what `probe` returns
/// when the usage query yields `Ok(None)`.
#[derive(Clone, Debug, Default)]
pub struct AdminInodePressure {
/// `true` only when `probe` obtained usage numbers.
pub supported: bool,
/// Stringified probe error, if the usage query failed.
pub error: Option<String>,
/// Total inode count as reported by the usage query (0 when unsupported).
pub total: u64,
/// Free inode count as reported by the usage query (0 when unsupported).
pub free: u64,
}
impl AdminInodePressure {
    /// Queries `daemon_data_dir_inode_usage` and converts the outcome into a report.
    ///
    /// * `Ok(Some(usage))` becomes a supported report carrying the total/free counts.
    /// * `Ok(None)` becomes the default (unsupported, no error) report.
    /// * `Err(err)` becomes an unsupported report whose `error` holds `err.to_string()`.
    pub fn probe() -> Self {
        let usage = match crate::broker::fs_health::daemon_data_dir_inode_usage() {
            Ok(Some(usage)) => usage,
            Ok(None) => return Self::default(),
            Err(err) => {
                // Everything except the error text matches the default report.
                return Self {
                    error: Some(err.to_string()),
                    ..Self::default()
                };
            }
        };
        Self {
            supported: true,
            error: None,
            total: usage.total,
            free: usage.free,
        }
    }

    /// Renders the report as the `inode_pressure` JSON object.
    ///
    /// Unsupported reports carry a `detail` string: the recorded error, or a fixed
    /// explanation when no error was recorded.
    fn to_json(&self) -> serde_json::Value {
        if !self.supported {
            let detail: String = match &self.error {
                Some(message) => message.clone(),
                None => "inode accounting not applicable on this filesystem".into(),
            };
            return json!({
                "supported": false,
                "detail": detail,
            });
        }

        // Rebuild the usage value so the derived numbers come from the shared helpers.
        let usage = crate::broker::fs_health::InodeUsage {
            total: self.total,
            free: self.free,
        };
        json!({
            "supported": true,
            "inodes_total": self.total,
            "inodes_free": self.free,
            "inodes_used": usage.used(),
            "used_ratio": usage.used_ratio(),
        })
    }
}
impl AdminSnapshot {
    /// Builds a snapshot for a local process that is not serving.
    ///
    /// The instance is named `"local"`, uptime and connection count are zero, hello is
    /// not accepted, and the backend and spawn-budget tables are empty. The pid, the
    /// timestamp, and the inode report are taken from the current process.
    pub fn local_not_serving() -> Self {
        let broker_pid = std::process::id();
        let generated_at_unix_ms = unix_now_ms();
        Self {
            broker_instance: String::from("local"),
            broker_pid,
            generated_at_unix_ms,
            uptime: Duration::ZERO,
            accepting_hello: false,
            connections_open: 0,
            backends: Vec::new(),
            spawn_budgets: Vec::new(),
            fd_pressure_demoted: false,
            inode_pressure: AdminInodePressure::probe(),
        }
    }

    /// Builds a snapshot from the registry, stamping the current pid and wall-clock time.
    ///
    /// Delegates to [`Self::from_registry_at`] for everything else.
    pub fn from_registry(
        broker_instance: impl Into<String>,
        uptime: Duration,
        accepting_hello: bool,
        connections_open: u64,
        registry: &BackendRegistry,
        spawn_budgets: &[SpawnBudgetSnapshot],
    ) -> Self {
        let pid = std::process::id();
        let now_ms = unix_now_ms();
        Self::from_registry_at(
            broker_instance,
            pid,
            now_ms,
            uptime,
            accepting_hello,
            connections_open,
            registry,
            spawn_budgets,
        )
    }

    /// Builds a snapshot from the registry with a caller-supplied pid and timestamp.
    ///
    /// Each registry entry becomes one [`AdminBackend`] row and each spawn-budget
    /// snapshot becomes one [`AdminSpawnBudget`] row. `fd_pressure_demoted` starts
    /// `false`, and the inode report is probed at call time.
    #[allow(clippy::too_many_arguments)]
    pub fn from_registry_at(
        broker_instance: impl Into<String>,
        broker_pid: u32,
        generated_at_unix_ms: u64,
        uptime: Duration,
        accepting_hello: bool,
        connections_open: u64,
        registry: &BackendRegistry,
        spawn_budgets: &[SpawnBudgetSnapshot],
    ) -> Self {
        let broker_instance = broker_instance.into();

        let mut backends = Vec::new();
        for (_key, handle) in registry.iter() {
            backends.push(AdminBackend {
                service_name: handle.service_name.clone(),
                service_version: handle.service_version.clone(),
                pid: handle.daemon_process.pid,
                backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
                // The process start time is the only activity timestamp read here.
                last_active_unix_ms: handle.daemon_process.started_at_unix_ms,
                state: String::from(if handle.is_alive() {
                    "running"
                } else {
                    "stale"
                }),
                last_hello_unix_ms: 0,
                last_error: None,
            });
        }

        let mut budget_rows = Vec::with_capacity(spawn_budgets.len());
        budget_rows.extend(spawn_budgets.iter().map(AdminSpawnBudget::from_snapshot));

        Self {
            broker_instance,
            broker_pid,
            generated_at_unix_ms,
            uptime,
            accepting_hello,
            connections_open,
            backends,
            spawn_budgets: budget_rows,
            fd_pressure_demoted: false,
            inode_pressure: AdminInodePressure::probe(),
        }
    }

    /// Returns the snapshot with `fd_pressure_demoted` replaced by `demoted`.
    pub fn with_fd_pressure_demoted(self, demoted: bool) -> Self {
        Self {
            fd_pressure_demoted: demoted,
            ..self
        }
    }

    /// Returns the snapshot with its inode report replaced by `inode_pressure`.
    pub fn with_inode_pressure(self, inode_pressure: AdminInodePressure) -> Self {
        Self {
            inode_pressure,
            ..self
        }
    }
}
/// One backend row of an `AdminSnapshot`, built from a registry entry in `from_registry_at`.
#[derive(Clone, Debug)]
pub struct AdminBackend {
/// Service name cloned from the registry handle; `backend-health` filters on it.
pub service_name: String,
/// Service version cloned from the registry handle.
pub service_version: String,
/// Pid of the backend's daemon process.
pub pid: u32,
/// Path of the daemon process's IPC endpoint.
pub backend_pipe: String,
/// Rendered as `last_active_unix_ms`; `from_registry_at` fills it from the
/// daemon process's `started_at_unix_ms`.
pub last_active_unix_ms: u64,
/// `"running"` when the handle reports alive, otherwise `"stale"`.
pub state: String,
/// Rendered by `backend-health`; `from_registry_at` always sets it to 0.
pub last_hello_unix_ms: u64,
/// Rendered by `backend-health`; `from_registry_at` always sets it to `None`.
pub last_error: Option<String>,
}
/// One spawn-budget row of an `AdminSnapshot`, copied from a `SpawnBudgetSnapshot`.
#[derive(Clone, Debug)]
pub struct AdminSpawnBudget {
/// Id of the instance named in the snapshot key.
pub broker_instance: String,
/// Service name from the snapshot key.
pub service_name: String,
/// Service version from the snapshot key.
pub service_version: String,
/// Copied from the snapshot's `attempts_used`.
pub attempts_used: u32,
/// Copied from the snapshot's `remaining`.
pub remaining: u32,
/// Copied from the snapshot's `in_flight`.
pub in_flight: bool,
/// The snapshot's `retry_after` in milliseconds (saturated at `u64::MAX`), if present.
pub retry_after_ms: Option<u64>,
}
impl AdminSpawnBudget {
    /// Copies a spawn-coordinator snapshot into its admin-facing row.
    ///
    /// `retry_after` is converted to whole milliseconds; a value too large for `u64`
    /// saturates at `u64::MAX`.
    fn from_snapshot(snapshot: &SpawnBudgetSnapshot) -> Self {
        let key = &snapshot.key;
        let retry_after_ms = snapshot
            .retry_after
            .map(|delay| delay.as_millis())
            .map(|millis| u64::try_from(millis).unwrap_or(u64::MAX));
        Self {
            broker_instance: key.instance.id(),
            service_name: key.service_name.clone(),
            service_version: key.service_version.clone(),
            attempts_used: snapshot.attempts_used,
            remaining: snapshot.remaining,
            in_flight: snapshot.in_flight,
            retry_after_ms,
        }
    }
}
/// Renders the `status` reply body as a JSON string.
///
/// Includes broker identity, uptime in fractional seconds, readiness, connection
/// count, fd and inode pressure, and one object per backend row.
pub fn render_status_json(snapshot: &AdminSnapshot) -> String {
    let mut backend_rows = Vec::with_capacity(snapshot.backends.len());
    for backend in &snapshot.backends {
        backend_rows.push(json!({
            "service_name": backend.service_name,
            "service_version": backend.service_version,
            "pid": backend.pid,
            "backend_pipe": backend.backend_pipe,
            "last_active_unix_ms": backend.last_active_unix_ms,
            "state": backend.state,
        }));
    }

    let document = json!({
        "schema_version": ADMIN_SCHEMA_VERSION,
        "command": "status",
        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
        "broker_instance": snapshot.broker_instance,
        "broker_pid": snapshot.broker_pid,
        "uptime_seconds": snapshot.uptime.as_secs_f64(),
        "accepting_hello": snapshot.accepting_hello,
        "connections_open": snapshot.connections_open,
        "fd_pressure": {
            "demoted": snapshot.fd_pressure_demoted,
        },
        "inode_pressure": snapshot.inode_pressure.to_json(),
        "backends": backend_rows,
    });
    document.to_string()
}
pub fn render_dump_json(snapshot: &AdminSnapshot) -> String {
json!({
"schema_version": ADMIN_SCHEMA_VERSION,
"command": "dump",
"generated_at_unix_ms": snapshot.generated_at_unix_ms,
"broker_instance": snapshot.broker_instance,
"effective_config": effective_config_json(snapshot),
"backend_table": snapshot.backends.iter().map(|backend| {
json!({
"service_name": backend.service_name,
"service_version": backend.service_version,
"pid": backend.pid,
"backend_pipe": backend.backend_pipe,
"state": backend.state,
})
}).collect::<Vec<_>>(),
"spawn_budgets": snapshot.spawn_budgets.iter().map(|budget| {
json!({
"broker_instance": budget.broker_instance,
"service_name": budget.service_name,
"service_version": budget.service_version,
"attempts_used": budget.attempts_used,
"remaining": budget.remaining,
"in_flight": budget.in_flight,
"retry_after_ms": budget.retry_after_ms,
})
}).collect::<Vec<_>>(),
"recent_lifecycle_events": [],
})
.to_string()
}
/// Renders the `list-instances` reply body as a JSON string.
///
/// The list always holds exactly one entry, describing the snapshot's own broker.
/// Its `pipe` is an empty string and its `state` follows `accepting_hello`.
pub fn render_list_instances_json(snapshot: &AdminSnapshot) -> String {
    let state = if snapshot.accepting_hello {
        "running"
    } else {
        "not-serving"
    };
    let instance = json!({
        "broker_instance": snapshot.broker_instance,
        "pipe": "",
        "pid": snapshot.broker_pid,
        "state": state,
    });
    let document = json!({
        "schema_version": ADMIN_SCHEMA_VERSION,
        "command": "list-instances",
        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
        "instances": [instance],
    });
    document.to_string()
}
/// Renders the `backend-health` reply body for one service as a JSON string.
///
/// Only backend rows whose `service_name` equals `service_name` exactly are listed.
/// The list is empty when none match.
pub fn render_backend_health_json(snapshot: &AdminSnapshot, service_name: &str) -> String {
    let mut matching = Vec::new();
    for backend in &snapshot.backends {
        if backend.service_name != service_name {
            continue;
        }
        matching.push(json!({
            "service_version": backend.service_version,
            "pid": backend.pid,
            "state": backend.state,
            "last_hello_unix_ms": backend.last_hello_unix_ms,
            "last_error": backend.last_error,
        }));
    }

    let document = json!({
        "schema_version": ADMIN_SCHEMA_VERSION,
        "command": "backend-health",
        "generated_at_unix_ms": snapshot.generated_at_unix_ms,
        "service_name": service_name,
        "backends": matching,
    });
    document.to_string()
}
pub fn render_config_json(snapshot: &AdminSnapshot) -> String {
json!({
"schema_version": ADMIN_SCHEMA_VERSION,
"command": "config",
"generated_at_unix_ms": snapshot.generated_at_unix_ms,
"values": effective_config_json(snapshot),
})
.to_string()
}
pub fn render_diagnose_json(snapshot: &AdminSnapshot, output: &str) -> String {
let entries = diagnostic_bundle_entries_json(snapshot);
json!({
"schema_version": ADMIN_SCHEMA_VERSION,
"command": "diagnose",
"generated_at_unix_ms": snapshot.generated_at_unix_ms,
"output": output,
"bundle": {
"format": DIAGNOSTIC_BUNDLE_FORMAT,
"mode": DIAGNOSTIC_BUNDLE_MODE,
"created": false,
"entries": entries,
},
"files": diagnostic_bundle_file_paths(snapshot),
"redactions": diagnostic_redaction_names(),
"redaction_policy": diagnostic_redaction_policy_json(),
})
.to_string()
}
/// Renders the metrics reply body as text.
///
/// Every entry of `BROKER_METRICS` gets a `# TYPE <name> <kind>` line. Only metrics
/// without labels also get a `<name> <value>` sample line. The output ends with
/// `# EOF`.
pub fn render_metrics_text(snapshot: &AdminSnapshot) -> String {
    let mut text = String::new();
    for metric in BROKER_METRICS {
        let kind = metric_kind_name(metric.kind);
        text.extend(["# TYPE ", metric.name, " ", kind, "\n"]);

        // Labelled metrics are declared only; no sample is emitted for them here.
        if !metric.labels.is_empty() {
            continue;
        }
        let value = metric_value(metric.name, snapshot);
        text.extend([metric.name, " ", value.as_str(), "\n"]);
    }
    text.push_str("# EOF\n");
    text
}
/// Returns the fixed liveness body, `"ok"` followed by a newline.
pub fn render_healthz() -> &'static str {
    const HEALTHY_BODY: &str = "ok\n";
    HEALTHY_BODY
}
/// Returns the readiness body: `"ready\n"` when the snapshot accepts hello,
/// otherwise `"not ready\n"`.
pub fn render_readyz(snapshot: &AdminSnapshot) -> &'static str {
    if !snapshot.accepting_hello {
        return "not ready\n";
    }
    "ready\n"
}
/// Dispatches an admin request to the matching renderer and wraps the result in a reply.
///
/// * `Status` honours `request.json`; the text form lists the instance and readiness.
/// * `Readyz` exits with 1 when the broker is not accepting hello.
/// * `BackendHealth` falls back to the service name `"unknown"` when none was given.
/// * `Diagnose` falls back to the output path `"bundle.tar.gz"` when none was given.
/// * An unspecified or unrecognised verb yields a text reply with exit code 2.
pub fn render_admin_reply(snapshot: &AdminSnapshot, request: &AdminRequest) -> AdminReply {
    // An undecodable verb gets the same reply as an explicitly unspecified one.
    let verb = match AdminVerb::try_from(request.verb) {
        Ok(verb) => verb,
        Err(_) => AdminVerb::Unspecified,
    };

    match verb {
        AdminVerb::Unspecified => text_reply("unsupported admin verb\n", 2),
        AdminVerb::Status if request.json => json_reply(render_status_json(snapshot)),
        AdminVerb::Status => {
            let summary = format!(
                "broker_instance: {}\naccepting_hello: {}\n",
                snapshot.broker_instance, snapshot.accepting_hello
            );
            text_reply(summary, 0)
        }
        AdminVerb::Dump => json_reply(render_dump_json(snapshot)),
        AdminVerb::ListInstances => json_reply(render_list_instances_json(snapshot)),
        AdminVerb::Healthz => text_reply(render_healthz(), 0),
        AdminVerb::Readyz => {
            let exit_code = u32::from(!snapshot.accepting_hello);
            text_reply(render_readyz(snapshot), exit_code)
        }
        AdminVerb::BackendHealth => {
            let service_name = match request.service_name.as_str() {
                "" => "unknown",
                name => name,
            };
            json_reply(render_backend_health_json(snapshot, service_name))
        }
        AdminVerb::Config => json_reply(render_config_json(snapshot)),
        AdminVerb::Diagnose => {
            let output = match request.output_path.as_str() {
                "" => "bundle.tar.gz",
                path => path,
            };
            json_reply(render_diagnose_json(snapshot, output))
        }
        AdminVerb::Metrics => AdminReply {
            kind: AdminReplyKind::Openmetrics as i32,
            body: render_metrics_text(snapshot),
            exit_code: 0,
            content_type: "application/openmetrics-text".into(),
        },
    }
}
/// Validates an admin request frame and produces the response frame.
///
/// Checks run in this order: envelope version, frame kind (must be a request),
/// payload protocol (must be the admin protocol), and payload encoding (must be
/// uncompressed). The payload is then decoded as an `AdminRequest` and rendered.
///
/// The response echoes the request's `request_id`, `traceparent`, and `tracestate`,
/// and carries a zero deadline.
///
/// # Errors
///
/// Returns the `AdminFrameError` variant for the first failed check, or
/// `AdminFrameError::Decode` when the payload is not a valid `AdminRequest`.
pub fn handle_admin_frame(
    frame: Frame,
    snapshot: &AdminSnapshot,
) -> Result<Frame, AdminFrameError> {
    let Frame {
        envelope_version,
        kind,
        payload_protocol,
        payload,
        request_id,
        payload_encoding,
        traceparent,
        tracestate,
        ..
    } = frame;

    if envelope_version != PROTOCOL_VERSION {
        return Err(AdminFrameError::UnsupportedEnvelopeVersion(
            envelope_version,
        ));
    }
    match FrameKind::try_from(kind) {
        Ok(FrameKind::Request) => {}
        _ => return Err(AdminFrameError::UnexpectedKind(kind)),
    }
    if payload_protocol != ADMIN_PAYLOAD_PROTOCOL {
        return Err(AdminFrameError::UnexpectedPayloadProtocol(payload_protocol));
    }
    match PayloadEncoding::try_from(payload_encoding) {
        Ok(PayloadEncoding::None) => {}
        _ => {
            return Err(AdminFrameError::UnsupportedPayloadEncoding(
                payload_encoding,
            ))
        }
    }

    let request = match AdminRequest::decode(payload.as_slice()) {
        Ok(request) => request,
        Err(err) => return Err(AdminFrameError::Decode(err)),
    };
    let reply = render_admin_reply(snapshot, &request);

    Ok(Frame {
        envelope_version: PROTOCOL_VERSION,
        kind: FrameKind::Response as i32,
        payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
        payload: reply.encode_to_vec(),
        request_id,
        payload_encoding: PayloadEncoding::None as i32,
        deadline_unix_ms: 0,
        traceparent,
        tracestate,
    })
}
/// Serves a single admin request/response exchange on `stream`.
///
/// Reads one frame, decodes and handles it, writes the encoded response frame back,
/// and returns the reply decoded from that response's payload.
///
/// # Errors
///
/// Propagates framing failures from reading or writing, frame and reply decode
/// failures, and any `AdminFrameError` raised while handling the request.
pub fn handle_admin_connection<S: Read + Write>(
    stream: &mut S,
    snapshot: &AdminSnapshot,
) -> Result<AdminReply, AdminConnectionError> {
    let raw_request = read_frame(stream)?;
    let request = match Frame::decode(raw_request.as_slice()) {
        Ok(frame) => frame,
        Err(err) => return Err(AdminConnectionError::DecodeFrame(err)),
    };

    let response = handle_admin_frame(request, snapshot)?;
    let wire_bytes = response.encode_to_vec();
    write_frame(stream, &wire_bytes)?;

    // Hand the caller the reply that was just sent, recovered from the response payload.
    AdminReply::decode(response.payload.as_slice()).map_err(AdminConnectionError::DecodeReply)
}
pub fn serve_one_admin_socket(
socket_path: &str,
snapshot: &AdminSnapshot,
) -> Result<AdminReply, AdminConnectionError> {
let listener = bind_local_socket(socket_path)?;
let cleanup = LocalSocketCleanup(socket_path);
let result = (|| {
let mut stream = listener.accept()?;
handle_admin_connection(&mut stream, snapshot)
})();
drop(listener);
drop(cleanup);
result
}
/// Reasons `handle_admin_frame` rejects a request frame before a reply is rendered.
#[derive(Debug, thiserror::Error)]
pub enum AdminFrameError {
/// The frame's `envelope_version` differs from `PROTOCOL_VERSION`; carries the received value.
#[error("unsupported admin frame envelope_version {0}")]
UnsupportedEnvelopeVersion(u32),
/// The frame's `kind` is not a request; carries the raw received value.
#[error("admin frame kind must be REQUEST, got {0}")]
UnexpectedKind(i32),
/// The frame's `payload_protocol` differs from `ADMIN_PAYLOAD_PROTOCOL`; carries the received value.
// NOTE(review): the message hard-codes 0xAD01 while the check compares against
// ADMIN_PAYLOAD_PROTOCOL, and the received value is printed in decimal. Confirm they stay in sync.
#[error("admin frame payload_protocol must be 0xAD01, got {0}")]
UnexpectedPayloadProtocol(u32),
/// The frame's `payload_encoding` is anything other than `PayloadEncoding::None`.
#[error("admin frame payload must not be compressed, got {0}")]
UnsupportedPayloadEncoding(i32),
/// The frame payload could not be decoded as an `AdminRequest`.
#[error(transparent)]
Decode(prost::DecodeError),
}
/// Failures surfaced by `handle_admin_connection` and `serve_one_admin_socket`.
#[derive(Debug, thiserror::Error)]
pub enum AdminConnectionError {
/// A framing error, converted via `?` from the `read_frame` / `write_frame` calls.
#[error(transparent)]
Framing(#[from] FramingError),
/// The bytes read from the stream could not be decoded as a `Frame`.
#[error("failed to decode admin request Frame: {0}")]
DecodeFrame(prost::DecodeError),
/// The request frame was rejected by `handle_admin_frame`.
#[error(transparent)]
AdminFrame(#[from] AdminFrameError),
/// The response payload could not be decoded back into an `AdminReply`.
#[error("failed to decode admin reply payload: {0}")]
DecodeReply(prost::DecodeError),
/// A local-socket error; presumably raised by `bind_local_socket` (confirm against its signature).
#[error(transparent)]
LocalSocket(#[from] BrokerConnectionError),
/// An I/O error; presumably raised by `listener.accept()` (confirm against the listener type).
#[error(transparent)]
Io(#[from] io::Error),
}
/// Wraps `body` in a successful (exit code 0) JSON reply with content type
/// `application/json`.
fn json_reply(body: String) -> AdminReply {
    let kind = AdminReplyKind::Json as i32;
    AdminReply {
        kind,
        body,
        exit_code: 0,
        content_type: "application/json".into(),
    }
}
/// Wraps `body` in a plain-text reply (`text/plain`) carrying the given exit code.
fn text_reply(body: impl Into<String>, exit_code: u32) -> AdminReply {
    let kind = AdminReplyKind::Text as i32;
    let body: String = body.into();
    AdminReply {
        kind,
        body,
        exit_code,
        content_type: "text/plain".into(),
    }
}
fn metric_kind_name(kind: MetricKind) -> &'static str {
match kind {
MetricKind::Counter => "counter",
MetricKind::Gauge => "gauge",
MetricKind::Histogram => "histogram",
}
}
/// Returns the sample value for the metric called `name`, formatted as text.
///
/// Only the open-connection count and the uptime (whole seconds) are read from the
/// snapshot. Every other metric, including the fd usage ratio, reports `"0"`.
fn metric_value(name: &str, snapshot: &AdminSnapshot) -> String {
    if name == "running_process_broker_v1_connections_open" {
        return snapshot.connections_open.to_string();
    }
    if name == "running_process_broker_v1_uptime_seconds" {
        return snapshot.uptime.as_secs().to_string();
    }
    // "running_process_broker_v1_fd_usage_ratio" has no tracked value here and
    // reports zero, like any name this function does not recognise.
    String::from("0")
}
/// Builds the effective-configuration JSON object shared by the `dump` and `config` replies.
///
/// Every leaf is a `{ "value", "source" }` pair produced by `sourced_value`. The
/// source tag says where the value came from: runtime state, protocol constants,
/// built-in defaults, or the schema.
fn effective_config_json(snapshot: &AdminSnapshot) -> serde_json::Value {
    let broker = json!({
        "broker_instance": sourced_value(&snapshot.broker_instance, "runtime"),
        "broker_pid": sourced_value(snapshot.broker_pid, "runtime"),
        "accepting_hello": sourced_value(snapshot.accepting_hello, "runtime"),
    });

    let protocol = json!({
        "admin_payload_protocol": sourced_value(format!("0x{ADMIN_PAYLOAD_PROTOCOL:04X}"), "protocol-v1"),
        "envelope_version": sourced_value(PROTOCOL_VERSION, "protocol-v1"),
        "framing_version": sourced_value(ENVELOPE_VERSION, "protocol-v1"),
    });

    let limits = json!({
        "max_frame_bytes": sourced_value(MAX_FRAME_BYTES, "protocol-v1"),
        "max_hello_bytes": sourced_value(MAX_HELLO_BYTES, "protocol-v1"),
        "connections_open": sourced_value(snapshot.connections_open, "runtime"),
    });

    let paths = json!({
        "service_definition_dir": sourced_value(
            service_definition_dir().display().to_string(),
            service_definition_dir_source(),
        ),
    });

    let spawn_budget = json!({
        "default_attempts_per_window": sourced_value(DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW, "default"),
        "default_window_ms": sourced_value(duration_ms(DEFAULT_SPAWN_BUDGET_WINDOW), "default"),
        "active_budget_rows": sourced_value(snapshot.spawn_budgets.len(), "runtime"),
    });

    let diagnostics = json!({
        "bundle_format": sourced_value(DIAGNOSTIC_BUNDLE_FORMAT, "schema-v1"),
        "bundle_mode": sourced_value(DIAGNOSTIC_BUNDLE_MODE, "schema-v1"),
        "redactions": sourced_value(diagnostic_redaction_names(), "schema-v1"),
    });

    json!({
        "broker": broker,
        "protocol": protocol,
        "limits": limits,
        "paths": paths,
        "spawn_budget": spawn_budget,
        "diagnostics": diagnostics,
    })
}
/// Reports where the service-definition directory setting comes from: the
/// environment override when `SERVICE_DEF_DIR_ENV` is set, otherwise the
/// platform default.
fn service_definition_dir_source() -> &'static str {
    match std::env::var_os(SERVICE_DEF_DIR_ENV) {
        Some(_) => "env:RUNNING_PROCESS_SERVICE_DEF_DIR",
        None => "platform-default",
    }
}
/// Lists the entries a diagnostic bundle is planned to contain, in fixed order.
///
/// Only the backend-table entry carries a record count, taken from the number of
/// backend rows in the snapshot.
fn diagnostic_bundle_entries_json(snapshot: &AdminSnapshot) -> Vec<serde_json::Value> {
    let backend_rows = Some(snapshot.backends.len());

    // Columns: (path, kind, source, required, redacted, record_count).
    let manifest: [(&str, &str, &str, bool, bool, Option<usize>); 8] = [
        ("admin/status.json", "json", "status", true, false, None),
        ("admin/dump.json", "json", "dump", true, true, None),
        ("config/effective.json", "json", "effective-config", true, false, None),
        ("metrics/openmetrics.txt", "openmetrics", "metrics", true, false, None),
        ("events/lifecycle.jsonl", "jsonl", "lifecycle-events", false, true, None),
        (
            "manifest/backend-manifests.json",
            "json",
            "backend-manifest-index",
            false,
            true,
            None,
        ),
        ("process/backends.json", "json", "backend-table", true, true, backend_rows),
        ("system/summary.json", "json", "host-summary", false, true, None),
    ];

    manifest
        .iter()
        .map(|&(path, kind, source, required, redacted, record_count)| {
            diagnostic_bundle_entry(path, kind, source, required, redacted, record_count)
        })
        .collect()
}
/// Returns the `path` string of every planned bundle entry, in entry order.
///
/// Entries without a string `path` are skipped.
fn diagnostic_bundle_file_paths(snapshot: &AdminSnapshot) -> Vec<String> {
    let mut paths = Vec::new();
    for entry in diagnostic_bundle_entries_json(snapshot) {
        if let Some(path) = entry.get("path").and_then(|value| value.as_str()) {
            paths.push(path.to_owned());
        }
    }
    paths
}
/// Builds one bundle-entry JSON object.
///
/// `record_count` is added to the object only when it is `Some`.
fn diagnostic_bundle_entry(
    path: &str,
    kind: &str,
    source: &str,
    required: bool,
    redacted: bool,
    record_count: Option<usize>,
) -> serde_json::Value {
    let mut entry = json!({
        "path": path,
        "kind": kind,
        "source": source,
        "required": required,
        "redacted": redacted,
    });
    // `entry` was just built as an object, so `as_object_mut` always yields its map.
    if let (Some(count), Some(fields)) = (record_count, entry.as_object_mut()) {
        fields.insert("record_count".to_owned(), json!(count));
    }
    entry
}
/// Returns the redaction names from `DIAGNOSTIC_REDACTIONS` as an owned vector.
fn diagnostic_redaction_names() -> Vec<&'static str> {
    DIAGNOSTIC_REDACTIONS.iter().copied().collect()
}
/// Describes each redaction rule reported by the `diagnose` reply.
///
/// `home` and `acl-identities` carry a `replacement`. `secret-env` carries the
/// substrings it `matches`.
fn diagnostic_redaction_policy_json() -> Vec<serde_json::Value> {
    let home = json!({
        "name": "home",
        "replacement": "~",
    });
    let secret_env = json!({
        "name": "secret-env",
        "matches": ["KEY", "TOKEN", "SECRET", "PASS"],
    });
    let acl_identities = json!({
        "name": "acl-identities",
        "replacement": "stable-hash",
    });
    vec![home, secret_env, acl_identities]
}
/// Pairs a configuration value with the tag naming where it came from, as a
/// `{ "value": ..., "source": ... }` JSON object.
fn sourced_value(value: impl Serialize, source: &'static str) -> serde_json::Value {
    let mut fields = serde_json::Map::new();
    fields.insert("value".to_owned(), json!(value));
    fields.insert("source".to_owned(), json!(source));
    serde_json::Value::Object(fields)
}
/// Converts a duration to whole milliseconds, saturating at `u64::MAX` when the
/// count does not fit in a `u64`.
fn duration_ms(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. A millisecond
/// count that does not fit in a `u64` saturates at `u64::MAX` instead of being
/// truncated, matching the conversion used by `duration_ms`.
fn unix_now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_millis` yields a u128; convert explicitly rather than with a lossy `as` cast.
        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}