use std::sync::Arc;
use ahash::HashMap;
use axum::extract::{self, Query};
use super::{AppError, ForestState};
use crate::chain_sync::NodeSyncStatus;
use crate::networks::calculate_expected_epoch;
use crate::rpc::f3::F3IsRunning;
/// Query-string key that switches the health endpoints to verbose output.
/// Only the presence of the key is checked; its value is ignored.
const VERBOSE_PARAM: &str = "verbose";
/// Upper bound, in seconds, passed to `tokio::time::timeout` for the RPC
/// connection probe and the F3 status probe.
const MAX_REQ_DURATION_SECS: u64 = 2;
/// Liveness endpoint handler.
///
/// Runs, in order, the "sync status is not an error", "peers connected" and
/// "RPC server running" probes. Returns `Ok` with the accumulated report when
/// all of them pass, otherwise an [`AppError`] wrapping the same report.
///
/// When the `verbose` query parameter is present the report lists one line
/// per probe; otherwise it is the terse `OK` / `ERROR` text.
pub(crate) async fn livez(
    extract::State(state): extract::State<Arc<ForestState>>,
    Query(params): Query<HashMap<String, String>>,
) -> Result<String, AppError> {
    let verbose = params.contains_key(VERBOSE_PARAM);
    let mut acc = MessageAccumulator::new_with_enabled(verbose);

    // Every probe is evaluated (no short-circuiting) so that a verbose report
    // always contains one line per probe, in this order.
    let outcomes = [
        check_sync_status_not_error(&state, &mut acc),
        check_peers_connected(&state, &mut acc),
        check_rpc_server_running(&state, &mut acc).await,
    ];

    if outcomes.contains(&false) {
        return Err(AppError(anyhow::anyhow!(acc.result_err())));
    }
    Ok(acc.result_ok())
}
/// Readiness endpoint handler.
///
/// Runs, in order, the "sync complete", "epoch up to date", "RPC server
/// running" and "F3 running" probes. Returns `Ok` with the accumulated report
/// when all of them pass, otherwise an [`AppError`] wrapping the same report.
///
/// When the `verbose` query parameter is present the report lists one line
/// per probe; otherwise it is the terse `OK` / `ERROR` text.
pub(crate) async fn readyz(
    extract::State(state): extract::State<Arc<ForestState>>,
    Query(params): Query<HashMap<String, String>>,
) -> Result<String, AppError> {
    let verbose = params.contains_key(VERBOSE_PARAM);
    let mut acc = MessageAccumulator::new_with_enabled(verbose);

    // Each probe runs unconditionally before the results are combined, so a
    // failing early probe never hides the status of the later ones.
    let synced = check_sync_status_synced(&state, &mut acc);
    let epoch_current = check_epoch_up_to_date(&state, &mut acc);
    let rpc_up = check_rpc_server_running(&state, &mut acc).await;
    let f3_up = check_f3_running(&state, &mut acc).await;

    if synced && epoch_current && rpc_up && f3_up {
        Ok(acc.result_ok())
    } else {
        Err(AppError(anyhow::anyhow!(acc.result_err())))
    }
}
/// Health endpoint handler.
///
/// Runs, in order, the "epoch up to date", "RPC server running", "sync status
/// is not an error", "peers connected" and "F3 running" probes. Returns `Ok`
/// with the accumulated report when all of them pass, otherwise an
/// [`AppError`] wrapping the same report.
///
/// When the `verbose` query parameter is present the report lists one line
/// per probe; otherwise it is the terse `OK` / `ERROR` text.
pub(crate) async fn healthz(
    extract::State(state): extract::State<Arc<ForestState>>,
    Query(params): Query<HashMap<String, String>>,
) -> Result<String, AppError> {
    let verbose = params.contains_key(VERBOSE_PARAM);
    let mut acc = MessageAccumulator::new_with_enabled(verbose);

    // All probes are evaluated, in this order, regardless of earlier failures.
    let outcomes = [
        check_epoch_up_to_date(&state, &mut acc),
        check_rpc_server_running(&state, &mut acc).await,
        check_sync_status_not_error(&state, &mut acc),
        check_peers_connected(&state, &mut acc),
        check_f3_running(&state, &mut acc).await,
    ];

    match outcomes.iter().all(|&passed| passed) {
        true => Ok(acc.result_ok()),
        false => Err(AppError(anyhow::anyhow!(acc.result_err()))),
    }
}
/// Probe: passes only when the node's sync status is `NodeSyncStatus::Synced`.
///
/// Records "sync complete" or "sync incomplete" in `acc` and returns whether
/// the probe passed.
fn check_sync_status_synced(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
    let synced = state.sync_status.read().status == NodeSyncStatus::Synced;
    if synced {
        acc.push_ok("sync complete");
    } else {
        acc.push_err("sync incomplete");
    }
    synced
}
fn check_sync_status_not_error(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
if state.sync_status.read().status != NodeSyncStatus::Error {
acc.push_ok("sync ok");
true
} else {
acc.push_err("sync error");
false
}
}
/// Probe: passes when the current head epoch is at most `MAX_EPOCH_DIFF`
/// epochs behind the epoch expected from the current wall-clock time, the
/// genesis timestamp and the configured block delay.
///
/// Records "epoch up to date" or "epoch outdated" in `acc` and returns whether
/// the probe passed.
fn check_epoch_up_to_date(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
    const MAX_EPOCH_DIFF: i64 = 5;

    let unix_now = chrono::Utc::now().timestamp() as u64;
    let expected_epoch = calculate_expected_epoch(
        unix_now,
        state.genesis_timestamp,
        state.chain_config.block_delay_secs,
    );
    // Oldest head epoch that still counts as "caught up".
    let oldest_acceptable = expected_epoch - MAX_EPOCH_DIFF;

    let caught_up = state.sync_status.read().current_head_epoch >= oldest_acceptable;
    if caught_up {
        acc.push_ok("epoch up to date");
    } else {
        acc.push_err("epoch outdated");
    }
    caught_up
}
/// Probe: passes when the RPC server is disabled in the client configuration,
/// or when a TCP connection to the configured RPC address succeeds within
/// `MAX_REQ_DURATION_SECS` seconds.
///
/// Records "rpc server disabled", "rpc server running" or
/// "rpc server not running" in `acc` and returns whether the probe passed.
async fn check_rpc_server_running(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
    let client = &state.config.client;
    if !client.enable_rpc {
        // Nothing to probe: a deliberately disabled server counts as healthy.
        acc.push_ok("rpc server disabled");
        return true;
    }

    let limit = std::time::Duration::from_secs(MAX_REQ_DURATION_SECS);
    let attempt = tokio::net::TcpStream::connect(client.rpc_address);
    // Outer `Ok` = finished before the timeout, inner `Ok` = connection made.
    let reachable = matches!(tokio::time::timeout(limit, attempt).await, Ok(Ok(_)));

    if reachable {
        acc.push_ok("rpc server running");
    } else {
        acc.push_err("rpc server not running");
    }
    reachable
}
/// Probe: passes when the peer manager reports at least one peer.
///
/// Records "peers connected" or "no peers connected" in `acc` and returns
/// whether the probe passed.
fn check_peers_connected(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
    let has_peers = state.peer_manager.peer_count() > 0;
    match has_peers {
        true => acc.push_ok("peers connected"),
        false => acc.push_err("no peers connected"),
    }
    has_peers
}
/// Probe for the F3 component. It passes in any of these cases, tried in
/// order:
///
/// 1. the F3 sidecar FFI is not enabled for this chain config ("f3 disabled");
/// 2. the F3 status query completes within `MAX_REQ_DURATION_SECS` seconds and
///    reports that F3 is running ("f3 running");
/// 3. the F3 bootstrap epoch is greater than the network head epoch
///    ("f3 pending activation").
///
/// Otherwise "f3 not running" is recorded and the probe fails.
async fn check_f3_running(state: &ForestState, acc: &mut MessageAccumulator) -> bool {
    if !crate::f3::is_sidecar_ffi_enabled(&state.chain_config) {
        acc.push_ok("f3 disabled");
        return true;
    }

    let limit = std::time::Duration::from_secs(MAX_REQ_DURATION_SECS);
    // A timeout or an unsuccessful query result both count as "not running".
    let f3_alive = tokio::time::timeout(limit, F3IsRunning::is_f3_running())
        .await
        .is_ok_and(|running| running.unwrap_or_default());
    if f3_alive {
        acc.push_ok("f3 running");
        return true;
    }

    let awaiting_bootstrap = crate::f3::get_f3_sidecar_params(&state.chain_config).bootstrap_epoch
        > state.sync_status.read().network_head_epoch;
    if awaiting_bootstrap {
        acc.push_ok("f3 pending activation");
        return true;
    }

    acc.push_err("f3 not running");
    false
}
/// Collects one status line per health probe.
///
/// When `enabled` is `false` nothing is stored and the `result_*` methods
/// return a fixed terse string instead of the collected lines.
struct MessageAccumulator {
    /// Recorded status lines, in insertion order.
    messages: Vec<String>,
    /// Whether lines are recorded and reported at all.
    enabled: bool,
}

impl MessageAccumulator {
    /// Creates an empty accumulator; `enabled` selects verbose reporting.
    fn new_with_enabled(enabled: bool) -> Self {
        let messages = Vec::new();
        Self { messages, enabled }
    }

    /// Records a passing probe as `[+] <message>` (no-op when disabled).
    fn push_ok<S: AsRef<str>>(&mut self, message: S) {
        self.record("[+]", message.as_ref());
    }

    /// Records a failing probe as `[!] <message>` (no-op when disabled).
    fn push_err<S: AsRef<str>>(&mut self, message: S) {
        self.record("[!]", message.as_ref());
    }

    /// Report for the success case: all recorded lines joined by newlines
    /// when enabled, otherwise `"OK"`.
    fn result_ok(&self) -> String {
        self.render("OK")
    }

    /// Report for the failure case: all recorded lines joined by newlines
    /// when enabled, otherwise `"ERROR"`.
    fn result_err(&self) -> String {
        self.render("ERROR")
    }

    /// Stores `<marker> <text>` if recording is enabled.
    fn record(&mut self, marker: &str, text: &str) {
        if !self.enabled {
            return;
        }
        self.messages.push(format!("{marker} {text}"));
    }

    /// Joins the recorded lines when enabled, otherwise returns `terse`.
    fn render(&self, terse: &str) -> String {
        if self.enabled {
            self.messages.join("\n")
        } else {
            terse.to_owned()
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Builds an accumulator and feeds it `ok1`, `err1`, `ok2`, `err2`, in
    /// that order.
    fn populated(enabled: bool) -> MessageAccumulator {
        let mut acc = MessageAccumulator::new_with_enabled(enabled);
        for idx in 1..=2 {
            acc.push_ok(format!("ok{idx}"));
            acc.push_err(format!("err{idx}"));
        }
        acc
    }

    // Enabled: both reports contain every line, tagged and newline-separated.
    #[test]
    fn test_message_accumulator() {
        let expected = ["[+] ok1", "[!] err1", "[+] ok2", "[!] err2"].join("\n");
        let acc = populated(true);
        assert_eq!(acc.result_ok(), expected);
        assert_eq!(acc.result_err(), expected);
    }

    // Disabled: pushed messages are dropped and the terse strings are returned.
    #[test]
    fn test_message_accumulator_disabled() {
        let acc = populated(false);
        assert_eq!(acc.result_ok(), "OK");
        assert_eq!(acc.result_err(), "ERROR");
    }
}