use std::sync::Arc;
use axum::extract::State;
use axum::routing::get;
use axum::{Json, Router};
use openraft::Raft;
use serde::{Deserialize, Serialize};
use super::types::{YantrikNode, YantrikNodeId, YantrikRaftTypeConfig};
/// Serializable snapshot of one node's Raft metrics.
///
/// Built by [`RaftStatus::from_metrics`] and served as JSON by the
/// `/v1/cluster/raft` route. Field order is the serialized key order, so
/// keep it stable.
///
/// `Eq` is derived alongside `PartialEq` because every field is totally
/// comparable; this matches [`RaftMember`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RaftStatus {
    /// Raw id of the node these metrics were read from.
    pub node_id: u64,
    /// Lowercased `Debug` rendering of the node's server state (e.g. `"leader"`).
    pub state: String,
    /// Current term as reported by the node's metrics.
    pub current_term: u64,
    /// Raw id of the leader this node currently knows of, if any.
    pub current_leader: Option<u64>,
    /// Last log index as reported by the metrics, if any.
    pub last_log_index: Option<u64>,
    /// Index of the last applied log id, if any entry has been applied.
    pub last_applied_index: Option<u64>,
    /// Leader term recorded in the last applied log id, if any.
    pub last_applied_term: Option<u64>,
    /// Index of the log id stored in the metrics' `snapshot` field, if any.
    pub snapshot_index: Option<u64>,
    /// Index of the log id stored in the metrics' `purged` field, if any.
    pub purged_index: Option<u64>,
    /// Copied verbatim from the metrics; `None` when openraft reports no value.
    pub millis_since_quorum_ack: Option<u64>,
    /// Every node in the membership config, sorted by `node_id`.
    pub members: Vec<RaftMember>,
    /// `true` when the metrics' `running_state` is `Ok`.
    pub healthy: bool,
}
/// One entry of the cluster membership as exposed in [`RaftStatus::members`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RaftMember {
    /// Raw id of the member node.
    pub node_id: u64,
    /// Address string stored on the member's node record.
    pub addr: String,
    /// `true` when the node id is among the membership's voter ids;
    /// `false` for every other node in the membership config.
    pub is_voter: bool,
}
impl RaftStatus {
    /// Builds a status from the latest metrics published by `raft`.
    ///
    /// The metrics value is cloned out of the watch channel so the channel's
    /// read guard is released before any conversion work happens.
    pub fn from_raft(raft: &Arc<Raft<YantrikRaftTypeConfig>>) -> Self {
        let metrics_rx = raft.metrics();
        let latest = metrics_rx.borrow().clone();
        Self::from_metrics(&latest)
    }

    /// Converts an openraft metrics value into the flat, serializable status.
    ///
    /// Members are taken from the membership config's node list, flagged as
    /// voters when their id appears in the voter id set, and sorted by raw
    /// node id.
    pub fn from_metrics(m: &openraft::RaftMetrics<YantrikNodeId, YantrikNode>) -> Self {
        let membership = &m.membership_config;
        let voter_ids: std::collections::BTreeSet<YantrikNodeId> =
            membership.membership().voter_ids().collect();

        let mut members: Vec<RaftMember> = Vec::new();
        for (id, node) in membership.nodes() {
            members.push(RaftMember {
                node_id: id.raw(),
                addr: node.addr.clone(),
                is_voter: voter_ids.contains(id),
            });
        }
        members.sort_by_key(|member| member.node_id);

        // `Option<&_>` is `Copy`, so the same borrowed log id can feed both
        // the index and the term field.
        let applied = m.last_applied.as_ref();
        let state_label = format!("{:?}", m.state);

        Self {
            node_id: m.id.raw(),
            state: state_label.to_lowercase(),
            current_term: m.current_term,
            current_leader: m.current_leader.map(|leader| leader.raw()),
            last_log_index: m.last_log_index,
            last_applied_index: applied.map(|log_id| log_id.index),
            last_applied_term: applied.map(|log_id| log_id.leader_id.term),
            snapshot_index: m.snapshot.as_ref().map(|log_id| log_id.index),
            purged_index: m.purged.as_ref().map(|log_id| log_id.index),
            millis_since_quorum_ack: m.millis_since_quorum_ack,
            members,
            healthy: m.running_state.is_ok(),
        }
    }

    /// Returns `true` when the known leader is this node.
    ///
    /// Only `current_leader` and `node_id` are compared; the `state` string
    /// is not consulted.
    pub fn is_leader(&self) -> bool {
        matches!(self.current_leader, Some(leader) if leader == self.node_id)
    }
}
/// Publishes `status` to the crate's openraft gauges.
///
/// Voter and learner counts are derived from `status.members` (a member is
/// counted as a learner exactly when `is_voter` is `false`); every other
/// value is forwarded unchanged to `crate::metrics::record_openraft_gauges`.
pub fn record_openraft_status(status: &RaftStatus) {
    // Single pass over the member list, tallying both groups at once.
    let (voters, learners) =
        status
            .members
            .iter()
            .fold((0u64, 0u64), |(voters, learners), member| {
                if member.is_voter {
                    (voters + 1, learners)
                } else {
                    (voters, learners + 1)
                }
            });
    let is_leader = status.is_leader();

    crate::metrics::record_openraft_gauges(
        status.current_term,
        is_leader,
        status.last_log_index,
        status.last_applied_index,
        status.snapshot_index,
        status.purged_index,
        status.millis_since_quorum_ack,
        status.healthy,
        voters,
        learners,
    );
}
/// Builds a router exposing the Raft status as JSON.
///
/// Registers a single `GET /v1/cluster/raft` route whose handler reads its
/// status from the given `raft` handle, which is installed as router state.
pub fn raft_status_router(raft: Arc<Raft<YantrikRaftTypeConfig>>) -> Router {
    let routes = Router::new().route("/v1/cluster/raft", get(handle_raft_status));
    routes.with_state(raft)
}
/// Spawns a background task that mirrors Raft metrics into the crate's gauges.
///
/// The task publishes the current metrics once immediately, then again after
/// every change notification on the metrics watch channel. It stops when
/// `cancel` fires or when waiting for a change returns an error.
///
/// Returns the `JoinHandle` of the spawned task. Only the metrics receiver
/// and the cancellation token are moved into the task; the `raft` handle
/// itself is not retained.
pub fn spawn_raft_metrics_recorder(
    raft: Arc<Raft<YantrikRaftTypeConfig>>,
    cancel: tokio_util::sync::CancellationToken,
) -> tokio::task::JoinHandle<()> {
    let mut metrics_rx = raft.metrics();
    tokio::spawn(async move {
        loop {
            // Clone out of the watch guard right away so it is never held
            // across the await below.
            let snapshot = metrics_rx.borrow().clone();
            record_openraft_status(&RaftStatus::from_metrics(&snapshot));

            // Wait for either shutdown or the next metrics update; a fresh
            // update loops back to the publish step above.
            tokio::select! {
                _ = cancel.cancelled() => break,
                outcome = metrics_rx.changed() => {
                    if outcome.is_err() {
                        break;
                    }
                }
            }
        }
    })
}
/// Handler for `GET /v1/cluster/raft`: returns the node's current
/// [`RaftStatus`] as a JSON body.
async fn handle_raft_status(
    State(raft): State<Arc<Raft<YantrikRaftTypeConfig>>>,
) -> Json<RaftStatus> {
    let status = RaftStatus::from_raft(&raft);
    Json(status)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::LocalSqliteCommitter;
    use crate::raft::log_storage::SqliteRaftLogStorage;
    use crate::raft::network::StubRaftNetworkFactory;
    use crate::raft::state_machine::YantrikStateMachine;
    use openraft::Config;
    use std::collections::BTreeMap;
    use std::sync::{Mutex, MutexGuard};
    use std::time::Duration;

    /// Serializes the tests that write to the global metrics registry and
    /// then assert on its rendered output.
    ///
    /// Those tests record conflicting gauge values (leader vs. follower,
    /// one voter vs. none). Under the default parallel test runner they could
    /// otherwise interleave between "record" and "render" and fail
    /// intermittently.
    static GAUGE_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`GAUGE_LOCK`], ignoring poisoning so that one failing gauge
    /// test does not cascade into the others.
    fn lock_gauges() -> MutexGuard<'static, ()> {
        GAUGE_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Builds a healthy, member-less `RaftStatus` with every optional index
    /// unset, varying only the fields the tests care about.
    fn bare_status(
        node_id: u64,
        state: &str,
        current_term: u64,
        current_leader: Option<u64>,
    ) -> RaftStatus {
        RaftStatus {
            node_id,
            state: state.into(),
            current_term,
            current_leader,
            last_log_index: None,
            last_applied_index: None,
            last_applied_term: None,
            snapshot_index: None,
            purged_index: None,
            millis_since_quorum_ack: None,
            members: vec![],
            healthy: true,
        }
    }

    /// Starts an in-memory single-node cluster (node id 1) and waits up to
    /// 30 x 50 ms for it to report itself as leader before returning.
    async fn build_single_node_raft() -> Arc<Raft<YantrikRaftTypeConfig>> {
        let local = Arc::new(LocalSqliteCommitter::open_in_memory().unwrap());
        let log_store = SqliteRaftLogStorage::open_in_memory();
        let state_machine = YantrikStateMachine::new(local);
        let network = StubRaftNetworkFactory;
        let config = Arc::new(
            Config {
                cluster_name: "yantrikdb-status-test".into(),
                heartbeat_interval: 100,
                election_timeout_min: 200,
                election_timeout_max: 400,
                ..Default::default()
            }
            .validate()
            .unwrap(),
        );
        let me = YantrikNodeId::new(1);
        let raft = Arc::new(
            Raft::<YantrikRaftTypeConfig>::new(me, config, network, log_store, state_machine)
                .await
                .unwrap(),
        );
        let mut nodes = BTreeMap::new();
        nodes.insert(me, YantrikNode::new("http://127.0.0.1:0"));
        raft.initialize(nodes).await.unwrap();
        // Poll for leadership; callers assert on the resulting state themselves.
        for _ in 0..30 {
            if raft.current_leader().await == Some(me) {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        raft
    }

    #[tokio::test]
    async fn status_reflects_single_node_leader_state() {
        let raft = build_single_node_raft().await;
        let status = RaftStatus::from_raft(&raft);
        assert_eq!(status.node_id, 1);
        assert_eq!(status.state, "leader");
        assert!(status.is_leader());
        assert_eq!(status.current_leader, Some(1));
        assert!(status.current_term >= 1);
        assert!(status.healthy);
        assert_eq!(status.members.len(), 1);
        assert_eq!(status.members[0].node_id, 1);
        assert!(status.members[0].is_voter);
    }

    #[tokio::test]
    async fn status_serializes_to_stable_json_shape() {
        let raft = build_single_node_raft().await;
        let status = RaftStatus::from_raft(&raft);
        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"node_id\""));
        assert!(json.contains("\"state\""));
        assert!(json.contains("\"current_term\""));
        assert!(json.contains("\"current_leader\""));
        assert!(json.contains("\"members\""));
        assert!(json.contains("\"healthy\""));
        let back: RaftStatus = serde_json::from_str(&json).unwrap();
        assert_eq!(status, back);
    }

    #[tokio::test]
    async fn raft_status_router_serves_json() {
        use axum::body::to_bytes;
        use axum::http::Request;
        use tower::ServiceExt;
        let raft = build_single_node_raft().await;
        let app = raft_status_router(raft);
        let req = Request::builder()
            .uri("/v1/cluster/raft")
            .body(axum::body::Body::empty())
            .unwrap();
        let res = app.oneshot(req).await.unwrap();
        assert_eq!(res.status(), 200);
        let body_bytes = to_bytes(res.into_body(), 64 * 1024).await.unwrap();
        let parsed: RaftStatus = serde_json::from_slice(&body_bytes).unwrap();
        assert_eq!(parsed.node_id, 1);
        assert_eq!(parsed.state, "leader");
    }

    #[tokio::test]
    async fn record_openraft_status_updates_prometheus_gauges() {
        let raft = build_single_node_raft().await;
        let status = RaftStatus::from_raft(&raft);
        // Taken after the last `.await`, so the guard is never held across a
        // suspension point; it covers record + render + asserts.
        let _gauges = lock_gauges();
        record_openraft_status(&status);
        let rendered = crate::metrics::global().render_prometheus();
        assert!(rendered.contains("yantrikdb_openraft_is_leader 1"));
        assert!(rendered.contains("yantrikdb_openraft_voters 1"));
        assert!(rendered.contains("yantrikdb_openraft_running_state_healthy 1"));
        assert!(
            rendered.contains("yantrikdb_openraft_current_term"),
            "expected current_term gauge in rendered metrics"
        );
        assert!(rendered.contains("yantrikdb_openraft_last_applied_index"));
    }

    #[test]
    fn record_openraft_status_handles_none_indices_with_minus_one() {
        let s = bare_status(5, "follower", 7, Some(1));
        let _gauges = lock_gauges();
        record_openraft_status(&s);
        let rendered = crate::metrics::global().render_prometheus();
        assert!(rendered.contains("yantrikdb_openraft_last_log_index -1"));
        assert!(rendered.contains("yantrikdb_openraft_quorum_ack_lag_ms -1"));
        assert!(rendered.contains("yantrikdb_openraft_is_leader 0"));
    }

    #[test]
    fn is_leader_reads_current_leader_against_self() {
        let leader = bare_status(1, "leader", 1, Some(1));
        assert!(leader.is_leader());

        let follower = bare_status(2, "follower", 1, Some(1));
        assert!(!follower.is_leader());

        let no_leader = bare_status(1, "candidate", 2, None);
        assert!(!no_leader.is_leader());
    }
}