use crate::json::{Map, Value as JsonValue};
/// Plain-data input for [`cluster_status_json`]: all variable data in the
/// rendered status document is read from this struct.
#[derive(Debug, Clone)]
pub(crate) struct ClusterStatusInputs {
/// Emitted as the `snapshot_at_unix_ms` number (presumably Unix-epoch
/// milliseconds, going by the name).
pub(crate) snapshot_at_unix_ms: u64,
/// Emitted verbatim as the `version` string.
pub(crate) version: String,
/// Emitted verbatim as the `phase` string.
pub(crate) phase: String,
/// Emitted as `uptime_secs` after rounding to three decimal places.
pub(crate) uptime_secs: f64,
/// Emitted as the `started_at_unix_ms` number.
pub(crate) started_at_unix_ms: u64,
/// Emitted as `ready_at_unix_ms` when `Some`; the key is left out of the
/// document entirely when `None`.
pub(crate) ready_at_unix_ms: Option<u64>,
/// Emitted as the `read_only` boolean.
pub(crate) read_only: bool,
/// Rendered as `deployment.shape`.
pub(crate) deployment_shape: DeploymentShapeView,
/// Rendered as `deployment.process_role`. `replication.role` is rendered
/// from [`ReplicationSnapshot::role`] instead, so the two are separate inputs.
pub(crate) process_role: ProcessRoleView,
/// Source of the `transports` section.
pub(crate) transport: TransportSnapshot,
/// Source of the `connections` section.
pub(crate) connections: ConnectionSnapshot,
/// Source of the `storage` section.
pub(crate) storage: StorageSnapshot,
/// Source of the `wal` section; its `current_lsn` is also the reference
/// point for each replica's `lag_records`.
pub(crate) wal: WalSnapshot,
/// Source of the `system` section.
pub(crate) system: SystemSnapshot,
/// Source of the `replication` section.
pub(crate) replication: ReplicationSnapshot,
}
/// Deployment shape as shown under `deployment.shape` in the status document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DeploymentShapeView {
/// Rendered as the string `"embedded"`.
Embedded,
/// Rendered as the string `"file"`.
File,
/// Rendered as the string `"server"`.
Server,
/// Rendered as the string `"serverless"`.
Serverless,
/// Rendered as an unavailable envelope with reason `deployment_shape_unknown`.
Unknown,
}
/// Process role as shown under `deployment.process_role` and `replication.role`.
///
/// In the replication section the role also decides how `apply_health` and
/// `degraded` are reported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ProcessRoleView {
/// Rendered as `"standalone"`; replication apply health is `"not_applicable"`.
Standalone,
/// Rendered as `"primary"`; replication apply health is `"not_applicable"`.
Primary,
/// Rendered as `"replica"`; replication apply health comes from the observed label.
Replica,
/// Rendered as an unavailable envelope with reason `process_role_unknown`.
Unknown,
}
/// One listener entry in the `transports.active` or `transports.failed` array.
#[derive(Debug, Clone, Default)]
pub(crate) struct TransportListenerView {
/// Emitted verbatim as `transport` (the tests use values such as `"http"` and `"grpc"`).
pub(crate) transport: String,
/// Emitted verbatim as `bind_addr`.
pub(crate) bind_addr: String,
/// Emitted as the `explicit` boolean.
/// NOTE(review): presumably "explicitly configured by the operator" — confirm with the producer.
pub(crate) explicit: bool,
/// Emitted as `reason` only when `Some`; the key is omitted otherwise.
pub(crate) reason: Option<String>,
}
/// Listener lists behind the `transports` section of the status document.
#[derive(Debug, Clone, Default)]
pub(crate) struct TransportSnapshot {
/// Rendered, in order, as the `transports.active` array.
pub(crate) active: Vec<TransportListenerView>,
/// Rendered, in order, as the `transports.failed` array.
pub(crate) failed: Vec<TransportListenerView>,
}
/// Counters behind the `connections` section of the status document.
#[derive(Debug, Clone, Default)]
pub(crate) struct ConnectionSnapshot {
/// Emitted as the `active` number.
pub(crate) active: u64,
/// Emitted as the `idle` number.
pub(crate) idle: u64,
/// Emitted as the `total_checkouts` number.
pub(crate) total_checkouts: u64,
/// Emitted as the `max` number when `Some`; `None` becomes an unavailable
/// envelope with reason `max_connections_unconfigured`.
pub(crate) max: Option<u64>,
}
/// Values behind the `storage` section of the status document.
#[derive(Debug, Clone)]
pub(crate) struct StorageSnapshot {
/// Emitted as the `db_size_bytes` number when `Some`; `None` becomes an
/// unavailable envelope with reason `db_size_not_file_backed`.
pub(crate) db_size_bytes: Option<u64>,
/// Emitted as the `remote_backend` string when `Some`, JSON `null` when `None`.
pub(crate) remote_backend: Option<String>,
/// Emitted verbatim as `encryption_at_rest.state` (the test fixture uses `"off"`).
pub(crate) encryption_state: String,
/// Emitted as `encryption_at_rest.error` only when `Some`.
pub(crate) encryption_error: Option<String>,
/// Emitted as the `paged_mode` boolean.
pub(crate) paged_mode: bool,
}
/// Log positions behind the `wal` section of the status document.
///
/// The rendered `archive_lag_records` is `current_lsn - last_archived_lsn`,
/// saturating at zero.
#[derive(Debug, Clone, Default)]
pub(crate) struct WalSnapshot {
/// Emitted as `current_lsn`; also the reference point for replica `lag_records`.
pub(crate) current_lsn: u64,
/// Emitted as `last_archived_lsn`.
pub(crate) last_archived_lsn: u64,
}
/// Host and process facts behind the `system` section of the status document.
#[derive(Debug, Clone, Default)]
pub(crate) struct SystemSnapshot {
/// Emitted as the `pid` number.
pub(crate) pid: u32,
/// Emitted as the `cpu_cores` number.
pub(crate) cpu_cores: usize,
/// Emitted verbatim as `os` (the tests use `"linux"` and `"macos"`).
pub(crate) os: String,
/// Emitted verbatim as `arch`.
pub(crate) arch: String,
/// Emitted verbatim as `hostname`.
pub(crate) hostname: String,
/// Emitted as the `total_memory_bytes` number when `Some`; `None` becomes an
/// unavailable envelope with reason `memory_probe_not_supported`.
pub(crate) total_memory_bytes: Option<u64>,
/// Emitted as the `available_memory_bytes` number when `Some`; `None` becomes
/// an unavailable envelope with reason `memory_probe_not_supported`.
pub(crate) available_memory_bytes: Option<u64>,
}
/// Values behind the `replication` section of the status document.
#[derive(Debug, Clone)]
pub(crate) struct ReplicationSnapshot {
/// Rendered as `replication.role`; also selects how `apply_health` and
/// `degraded` are reported.
pub(crate) role: ProcessRoleView,
/// Emitted verbatim as `commit_policy` (the tests use `"local"` and `"ack_n=2"`).
pub(crate) commit_policy: String,
/// Rendered, in order, as the `replicas` array; its length is `replica_count`.
pub(crate) replicas: Vec<ReplicaView>,
/// Only consulted when `role` is `Replica`. `Some(label)` is emitted as
/// `apply_health`, and `degraded` is `true` exactly for the labels
/// `"stalled_gap"`, `"divergence"` and `"apply_error"`. `None` turns both
/// keys into unavailable envelopes with reason `apply_health_not_observed`.
pub(crate) apply_health: Option<String>,
/// `(kind, count)` pairs rendered as the `apply_errors` object, one key per kind.
/// NOTE(review): a repeated kind is inserted under the same key twice; which
/// count survives depends on `Map::insert` — confirm if duplicates can occur.
pub(crate) apply_errors: Vec<(String, u64)>,
}
/// One entry of the `replication.replicas` array.
#[derive(Debug, Clone)]
pub(crate) struct ReplicaView {
/// Emitted verbatim as `id`.
pub(crate) id: String,
/// Emitted as `last_acked_lsn`; also used to derive `lag_records` as the
/// WAL `current_lsn` minus this value, saturating at zero.
pub(crate) last_acked_lsn: u64,
/// Emitted as `last_sent_lsn`.
pub(crate) last_sent_lsn: u64,
/// Emitted as `last_durable_lsn`.
pub(crate) last_durable_lsn: u64,
/// Emitted as the `last_seen_at_unix_ms` number (cast from `u128` to `f64`).
pub(crate) last_seen_at_unix_ms: u128,
/// Emitted as the `region` string when `Some`, JSON `null` when `None`.
pub(crate) region: Option<String>,
}
/// Builds the two-key envelope used wherever a value cannot be reported:
/// `{"available": false, "reason": <reason>}`.
pub(crate) fn unavailable_json(reason: &str) -> JsonValue {
    let fields = [
        ("available", JsonValue::Bool(false)),
        ("reason", JsonValue::String(reason.to_owned())),
    ];

    let mut envelope = Map::new();
    for (key, value) in fields {
        envelope.insert(key.to_owned(), value);
    }
    JsonValue::Object(envelope)
}
/// Renders the complete cluster status document from `inputs`.
///
/// Top-level scalars are copied straight from `inputs`; each nested section is
/// delegated to its own renderer. `ready_at_unix_ms` is only present when the
/// input carries a value. `throughput`, `latency` and `last_error` are always
/// emitted as unavailable envelopes because nothing here samples them.
pub(crate) fn cluster_status_json(inputs: &ClusterStatusInputs) -> JsonValue {
    let number = |value: u64| JsonValue::Number(value as f64);
    let text = |value: &str| JsonValue::String(value.to_owned());
    // Keep millisecond resolution only: scale up, round, scale back down.
    let uptime_rounded = (inputs.uptime_secs * 1000.0).round() / 1000.0;

    let mut root = Map::new();
    let mut put = |key: &str, value: JsonValue| {
        root.insert(key.to_owned(), value);
    };

    put("snapshot_at_unix_ms", number(inputs.snapshot_at_unix_ms));
    put("version", text(inputs.version.as_str()));
    put("phase", text(inputs.phase.as_str()));
    put("uptime_secs", JsonValue::Number(uptime_rounded));
    put("started_at_unix_ms", number(inputs.started_at_unix_ms));
    if let Some(ready_at) = inputs.ready_at_unix_ms {
        put("ready_at_unix_ms", number(ready_at));
    }
    put("read_only", JsonValue::Bool(inputs.read_only));

    put("deployment", deployment_json(inputs));
    put("transports", transport_json(&inputs.transport));
    put("connections", connections_json(&inputs.connections));
    put("storage", storage_json(&inputs.storage));
    put("wal", wal_json(&inputs.wal));
    put("system", system_json(&inputs.system));

    // Not measured by this module; reported as explicit "unavailable" markers.
    put("throughput", unavailable_json("throughput_not_sampled"));
    put("latency", unavailable_json("latency_not_sampled"));
    put("last_error", unavailable_json("last_error_not_tracked"));

    // Replica lag is computed relative to this node's current WAL position.
    put(
        "replication",
        replication_json(&inputs.replication, inputs.wal.current_lsn),
    );

    JsonValue::Object(root)
}
/// Renders the `deployment` section: `shape`, `process_role` and `container`.
///
/// Known enum variants become lowercase strings; `Unknown` variants become
/// unavailable envelopes. `container` is always an unavailable envelope.
fn deployment_json(inputs: &ClusterStatusInputs) -> JsonValue {
    let shape_label = match inputs.deployment_shape {
        DeploymentShapeView::Embedded => Some("embedded"),
        DeploymentShapeView::File => Some("file"),
        DeploymentShapeView::Server => Some("server"),
        DeploymentShapeView::Serverless => Some("serverless"),
        DeploymentShapeView::Unknown => None,
    };
    let role_label = match inputs.process_role {
        ProcessRoleView::Standalone => Some("standalone"),
        ProcessRoleView::Primary => Some("primary"),
        ProcessRoleView::Replica => Some("replica"),
        ProcessRoleView::Unknown => None,
    };

    let shape = shape_label.map_or_else(
        || unavailable_json("deployment_shape_unknown"),
        |label| JsonValue::String(label.to_owned()),
    );
    let process_role = role_label.map_or_else(
        || unavailable_json("process_role_unknown"),
        |label| JsonValue::String(label.to_owned()),
    );

    let mut section = Map::new();
    section.insert(String::from("shape"), shape);
    section.insert(String::from("process_role"), process_role);
    section.insert(
        String::from("container"),
        unavailable_json("container_metadata_not_probed"),
    );
    JsonValue::Object(section)
}
fn transport_listener_json(listener: &TransportListenerView) -> JsonValue {
let mut object = Map::new();
object.insert(
"transport".to_string(),
JsonValue::String(listener.transport.clone()),
);
object.insert(
"bind_addr".to_string(),
JsonValue::String(listener.bind_addr.clone()),
);
object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
if let Some(reason) = &listener.reason {
object.insert("reason".to_string(), JsonValue::String(reason.clone()));
}
JsonValue::Object(object)
}
/// Renders the `transports` section: an `active` array followed by a `failed`
/// array, each holding one object per listener in input order.
fn transport_json(transport: &TransportSnapshot) -> JsonValue {
    let mut section = Map::new();
    for (key, listeners) in [("active", &transport.active), ("failed", &transport.failed)] {
        section.insert(
            key.to_owned(),
            JsonValue::Array(listeners.iter().map(transport_listener_json).collect()),
        );
    }
    JsonValue::Object(section)
}
/// Renders the `connections` section. The three counters are always numbers;
/// `max` is a number when configured and an unavailable envelope otherwise.
fn connections_json(conn: &ConnectionSnapshot) -> JsonValue {
    let counters = [
        ("active", conn.active),
        ("idle", conn.idle),
        ("total_checkouts", conn.total_checkouts),
    ];

    let mut pool = Map::new();
    for (key, count) in counters {
        pool.insert(key.to_owned(), JsonValue::Number(count as f64));
    }

    let limit = conn.max.map_or_else(
        || unavailable_json("max_connections_unconfigured"),
        |max| JsonValue::Number(max as f64),
    );
    pool.insert(String::from("max"), limit);
    JsonValue::Object(pool)
}
fn storage_json(storage: &StorageSnapshot) -> JsonValue {
let mut object = Map::new();
object.insert(
"db_size_bytes".to_string(),
match storage.db_size_bytes {
Some(n) => JsonValue::Number(n as f64),
None => unavailable_json("db_size_not_file_backed"),
},
);
object.insert(
"remote_backend".to_string(),
match &storage.remote_backend {
Some(name) => JsonValue::String(name.clone()),
None => JsonValue::Null,
},
);
object.insert(
"paged_mode".to_string(),
JsonValue::Bool(storage.paged_mode),
);
let mut enc = Map::new();
enc.insert(
"state".to_string(),
JsonValue::String(storage.encryption_state.clone()),
);
if let Some(err) = &storage.encryption_error {
enc.insert("error".to_string(), JsonValue::String(err.clone()));
}
object.insert("encryption_at_rest".to_string(), JsonValue::Object(enc));
JsonValue::Object(object)
}
/// Renders the `wal` section: both LSNs, the derived `archive_lag_records`,
/// and `bytes` as an unavailable envelope.
fn wal_json(wal: &WalSnapshot) -> JsonValue {
    let (current, archived) = (wal.current_lsn, wal.last_archived_lsn);
    // Saturate so an archive position ahead of the current LSN reports 0, not a wrapped value.
    let backlog = current.saturating_sub(archived);

    let mut section = Map::new();
    for (key, lsn) in [
        ("current_lsn", current),
        ("last_archived_lsn", archived),
        ("archive_lag_records", backlog),
    ] {
        section.insert(key.to_owned(), JsonValue::Number(lsn as f64));
    }
    section.insert(
        String::from("bytes"),
        unavailable_json("wal_bytes_not_tracked"),
    );
    JsonValue::Object(section)
}
/// Renders the `system` section: process and host identity, memory totals,
/// and unavailable envelopes for `cpu_usage` and `ram_usage`.
///
/// Either memory figure falls back to an unavailable envelope with reason
/// `memory_probe_not_supported` when the input has no value for it.
fn system_json(system: &SystemSnapshot) -> JsonValue {
    let memory_or_unavailable = |bytes: Option<u64>| match bytes {
        Some(amount) => JsonValue::Number(amount as f64),
        None => unavailable_json("memory_probe_not_supported"),
    };

    let mut host = Map::new();
    host.insert(String::from("pid"), JsonValue::Number(f64::from(system.pid)));
    host.insert(
        String::from("cpu_cores"),
        JsonValue::Number(system.cpu_cores as f64),
    );

    for (key, value) in [
        ("os", &system.os),
        ("arch", &system.arch),
        ("hostname", &system.hostname),
    ] {
        host.insert(key.to_owned(), JsonValue::String(value.clone()));
    }

    for (key, bytes) in [
        ("total_memory_bytes", system.total_memory_bytes),
        ("available_memory_bytes", system.available_memory_bytes),
    ] {
        host.insert(key.to_owned(), memory_or_unavailable(bytes));
    }

    for (key, reason) in [
        ("cpu_usage", "cpu_usage_not_sampled"),
        ("ram_usage", "ram_usage_not_sampled"),
    ] {
        host.insert(key.to_owned(), unavailable_json(reason));
    }

    JsonValue::Object(host)
}
/// Renders the `replication` section.
///
/// `current_lsn` is the reference position for each replica's `lag_records`
/// (`current_lsn - last_acked_lsn`, saturating at zero).
///
/// `apply_health` / `degraded` depend on the role:
/// - replica with an observed label: the label itself, and `degraded` is
///   `true` for `stalled_gap`, `divergence` or `apply_error`;
/// - replica without a label: both are `apply_health_not_observed` envelopes;
/// - standalone or primary: `"not_applicable"` and `false`;
/// - unknown role: both are `process_role_unknown` envelopes.
fn replication_json(repl: &ReplicationSnapshot, current_lsn: u64) -> JsonValue {
    let number = |value: u64| JsonValue::Number(value as f64);

    let role_label = match repl.role {
        ProcessRoleView::Standalone => Some("standalone"),
        ProcessRoleView::Primary => Some("primary"),
        ProcessRoleView::Replica => Some("replica"),
        ProcessRoleView::Unknown => None,
    };
    let role_json = role_label.map_or_else(
        || unavailable_json("process_role_unknown"),
        |label| JsonValue::String(label.to_owned()),
    );

    let replicas = repl
        .replicas
        .iter()
        .map(|replica| {
            let acked = replica.last_acked_lsn;
            let mut entry = Map::new();
            entry.insert(String::from("id"), JsonValue::String(replica.id.clone()));
            for (key, lsn) in [
                ("last_acked_lsn", acked),
                ("last_sent_lsn", replica.last_sent_lsn),
                ("last_durable_lsn", replica.last_durable_lsn),
            ] {
                entry.insert(key.to_owned(), number(lsn));
            }
            entry.insert(
                String::from("last_seen_at_unix_ms"),
                JsonValue::Number(replica.last_seen_at_unix_ms as f64),
            );
            entry.insert(
                String::from("lag_records"),
                number(current_lsn.saturating_sub(acked)),
            );
            let region = replica
                .region
                .as_ref()
                .map_or(JsonValue::Null, |name| JsonValue::String(name.clone()));
            entry.insert(String::from("region"), region);
            JsonValue::Object(entry)
        })
        .collect();

    let (apply_health_json, degraded_json) = match (repl.role, repl.apply_health.as_deref()) {
        (ProcessRoleView::Replica, Some(label)) => (
            JsonValue::String(label.to_owned()),
            JsonValue::Bool(matches!(
                label,
                "stalled_gap" | "divergence" | "apply_error"
            )),
        ),
        (ProcessRoleView::Replica, None) => (
            unavailable_json("apply_health_not_observed"),
            unavailable_json("apply_health_not_observed"),
        ),
        (ProcessRoleView::Standalone | ProcessRoleView::Primary, _) => (
            JsonValue::String(String::from("not_applicable")),
            JsonValue::Bool(false),
        ),
        (ProcessRoleView::Unknown, _) => (
            unavailable_json("process_role_unknown"),
            unavailable_json("process_role_unknown"),
        ),
    };

    let mut error_counts = Map::new();
    for (kind, count) in repl.apply_errors.iter() {
        error_counts.insert(kind.clone(), number(*count));
    }

    let mut section = Map::new();
    section.insert(String::from("role"), role_json);
    section.insert(
        String::from("commit_policy"),
        JsonValue::String(repl.commit_policy.clone()),
    );
    section.insert(
        String::from("replica_count"),
        JsonValue::Number(repl.replicas.len() as f64),
    );
    section.insert(String::from("replicas"), JsonValue::Array(replicas));
    section.insert(String::from("apply_health"), apply_health_json);
    section.insert(String::from("degraded"), degraded_json);
    section.insert(String::from("apply_errors"), JsonValue::Object(error_counts));
    JsonValue::Object(section)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a healthy standalone server with one HTTP listener, a fully
    /// archived WAL and no replicas. Individual tests mutate a copy of it.
    fn base_inputs() -> ClusterStatusInputs {
        let gib: u64 = 1024 * 1024 * 1024;
        let http_listener = TransportListenerView {
            transport: String::from("http"),
            bind_addr: String::from("127.0.0.1:8080"),
            explicit: true,
            reason: None,
        };

        ClusterStatusInputs {
            snapshot_at_unix_ms: 1_700_000_000_000,
            version: String::from("0.0.0-test"),
            phase: String::from("ready"),
            uptime_secs: 12.5,
            started_at_unix_ms: 1_699_999_987_500,
            ready_at_unix_ms: Some(1_699_999_990_000),
            read_only: false,
            deployment_shape: DeploymentShapeView::Server,
            process_role: ProcessRoleView::Standalone,
            transport: TransportSnapshot {
                active: vec![http_listener],
                failed: vec![],
            },
            connections: ConnectionSnapshot {
                active: 1,
                idle: 4,
                total_checkouts: 17,
                max: Some(50),
            },
            storage: StorageSnapshot {
                db_size_bytes: Some(4096),
                remote_backend: None,
                encryption_state: String::from("off"),
                encryption_error: None,
                paged_mode: false,
            },
            wal: WalSnapshot {
                current_lsn: 100,
                last_archived_lsn: 100,
            },
            system: SystemSnapshot {
                pid: 4242,
                cpu_cores: 8,
                os: String::from("linux"),
                arch: String::from("x86_64"),
                hostname: String::from("test-host"),
                total_memory_bytes: Some(16 * gib),
                available_memory_bytes: Some(8 * gib),
            },
            replication: ReplicationSnapshot {
                role: ProcessRoleView::Standalone,
                commit_policy: String::from("local"),
                replicas: vec![],
                apply_health: None,
                apply_errors: vec![],
            },
        }
    }

    /// Unwraps a JSON value that must be an object.
    fn as_map(value: &JsonValue) -> &Map<String, JsonValue> {
        value.as_object().unwrap()
    }

    /// Unwraps a JSON value that must be a string.
    fn text(value: &JsonValue) -> &str {
        value.as_str().unwrap()
    }

    /// Unwraps a JSON value that must be a number.
    fn num(value: &JsonValue) -> f64 {
        value.as_f64().unwrap()
    }

    /// Looks up a key that must exist in `map`.
    fn field<'a>(map: &'a Map<String, JsonValue>, key: &str) -> &'a JsonValue {
        map.get(key).unwrap()
    }

    /// Looks up a key that must exist and hold an object.
    fn section<'a>(map: &'a Map<String, JsonValue>, key: &str) -> &'a Map<String, JsonValue> {
        as_map(field(map, key))
    }

    /// Checks that `value` is an unavailable envelope and returns its reason.
    fn unavailable_reason(value: &JsonValue) -> &str {
        let envelope = as_map(value);
        assert_eq!(
            envelope.get("available").and_then(JsonValue::as_bool),
            Some(false)
        );
        text(field(envelope, "reason"))
    }

    #[test]
    fn cluster_status_standalone_classifies_server_and_marks_unmeasurable_fields() {
        let json = cluster_status_json(&base_inputs());
        let root = as_map(&json);

        let expected_keys = [
            "snapshot_at_unix_ms",
            "version",
            "phase",
            "uptime_secs",
            "started_at_unix_ms",
            "ready_at_unix_ms",
            "read_only",
            "deployment",
            "transports",
            "connections",
            "storage",
            "wal",
            "system",
            "throughput",
            "latency",
            "last_error",
            "replication",
        ];
        for key in expected_keys {
            assert!(root.contains_key(key), "missing key {key}");
        }
        assert_eq!(text(field(root, "phase")), "ready");
        assert_eq!(text(field(root, "version")), "0.0.0-test");

        let deployment = section(root, "deployment");
        assert_eq!(text(field(deployment, "shape")), "server");
        assert_eq!(text(field(deployment, "process_role")), "standalone");
        assert_eq!(
            unavailable_reason(field(deployment, "container")),
            "container_metadata_not_probed"
        );

        let transports = section(root, "transports");
        let active = transports
            .get("active")
            .and_then(JsonValue::as_array)
            .unwrap();
        assert_eq!(active.len(), 1);
        let listener = as_map(&active[0]);
        assert_eq!(text(field(listener, "transport")), "http");
        assert_eq!(text(field(listener, "bind_addr")), "127.0.0.1:8080");

        let connections = section(root, "connections");
        assert_eq!(num(field(connections, "active")), 1.0);
        assert_eq!(num(field(connections, "max")), 50.0);

        let storage = section(root, "storage");
        assert_eq!(num(field(storage, "db_size_bytes")), 4096.0);
        assert!(matches!(field(storage, "remote_backend"), JsonValue::Null));
        assert_eq!(
            text(field(section(storage, "encryption_at_rest"), "state")),
            "off"
        );

        let wal = section(root, "wal");
        assert_eq!(num(field(wal, "current_lsn")), 100.0);
        assert_eq!(num(field(wal, "archive_lag_records")), 0.0);
        assert_eq!(
            unavailable_reason(field(wal, "bytes")),
            "wal_bytes_not_tracked"
        );

        let system = section(root, "system");
        assert_eq!(num(field(system, "cpu_cores")), 8.0);
        assert!(matches!(
            field(system, "total_memory_bytes"),
            JsonValue::Number(_)
        ));
        for (key, reason) in [
            ("cpu_usage", "cpu_usage_not_sampled"),
            ("ram_usage", "ram_usage_not_sampled"),
        ] {
            assert_eq!(unavailable_reason(field(system, key)), reason);
        }

        for (key, reason) in [
            ("throughput", "throughput_not_sampled"),
            ("latency", "latency_not_sampled"),
            ("last_error", "last_error_not_tracked"),
        ] {
            assert_eq!(unavailable_reason(field(root, key)), reason);
        }

        let replication = section(root, "replication");
        assert_eq!(text(field(replication, "role")), "standalone");
        assert_eq!(text(field(replication, "commit_policy")), "local");
        assert_eq!(num(field(replication, "replica_count")), 0.0);
        let replicas = replication
            .get("replicas")
            .and_then(JsonValue::as_array)
            .unwrap();
        assert!(replicas.is_empty());
        assert_eq!(text(field(replication, "apply_health")), "not_applicable");
        assert_eq!(
            replication.get("degraded").and_then(JsonValue::as_bool),
            Some(false)
        );
    }

    #[test]
    fn cluster_status_degraded_replica_reports_degraded_true_and_unavailable_memory() {
        let mut inputs = base_inputs();
        inputs.process_role = ProcessRoleView::Replica;
        inputs.system.os = String::from("macos");
        inputs.system.total_memory_bytes = None;
        inputs.system.available_memory_bytes = None;
        inputs.transport.failed.push(TransportListenerView {
            transport: String::from("grpc"),
            bind_addr: String::from("0.0.0.0:50051"),
            explicit: true,
            reason: Some(String::from("port_in_use")),
        });
        inputs.wal.current_lsn = 200;
        inputs.wal.last_archived_lsn = 150;
        inputs.replication = ReplicationSnapshot {
            role: ProcessRoleView::Replica,
            commit_policy: String::from("ack_n=2"),
            replicas: vec![ReplicaView {
                id: String::from("replica-a"),
                last_acked_lsn: 180,
                last_sent_lsn: 195,
                last_durable_lsn: 175,
                last_seen_at_unix_ms: 1_700_000_000_000,
                region: Some(String::from("us-east-1")),
            }],
            apply_health: Some(String::from("stalled_gap")),
            apply_errors: vec![(String::from("gap"), 3), (String::from("apply"), 0)],
        };

        let json = cluster_status_json(&inputs);
        let root = as_map(&json);

        assert_eq!(
            text(field(section(root, "deployment"), "process_role")),
            "replica"
        );

        let failed = section(root, "transports")
            .get("failed")
            .and_then(JsonValue::as_array)
            .unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(text(field(as_map(&failed[0]), "reason")), "port_in_use");

        assert_eq!(
            num(field(section(root, "wal"), "archive_lag_records")),
            50.0
        );

        let system = section(root, "system");
        for key in ["total_memory_bytes", "available_memory_bytes"] {
            assert_eq!(
                unavailable_reason(field(system, key)),
                "memory_probe_not_supported"
            );
        }

        let replication = section(root, "replication");
        assert_eq!(text(field(replication, "role")), "replica");
        assert_eq!(text(field(replication, "apply_health")), "stalled_gap");
        assert_eq!(
            replication.get("degraded").and_then(JsonValue::as_bool),
            Some(true)
        );

        let replicas = replication
            .get("replicas")
            .and_then(JsonValue::as_array)
            .unwrap();
        assert_eq!(replicas.len(), 1);
        let first = as_map(&replicas[0]);
        assert_eq!(text(field(first, "id")), "replica-a");
        assert_eq!(num(field(first, "lag_records")), 20.0);
        assert_eq!(text(field(first, "region")), "us-east-1");

        let errors = section(replication, "apply_errors");
        assert_eq!(num(field(errors, "gap")), 3.0);
        assert_eq!(num(field(errors, "apply")), 0.0);
    }

    #[test]
    fn cluster_status_unknown_role_marks_role_and_apply_health_unavailable() {
        let mut inputs = base_inputs();
        inputs.process_role = ProcessRoleView::Unknown;
        inputs.replication.role = ProcessRoleView::Unknown;
        inputs.replication.apply_health = None;

        let json = cluster_status_json(&inputs);
        let replication = section(as_map(&json), "replication");
        for key in ["role", "apply_health", "degraded"] {
            assert_eq!(
                unavailable_reason(field(replication, key)),
                "process_role_unknown"
            );
        }
    }

    #[test]
    fn unavailable_envelope_is_stable() {
        let value = unavailable_json("foo");
        let envelope = as_map(&value);
        assert_eq!(
            envelope.get("available").and_then(JsonValue::as_bool),
            Some(false)
        );
        assert_eq!(text(field(envelope, "reason")), "foo");
        assert_eq!(envelope.len(), 2);
    }
}