use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use crate::control::startup::ReadyGate;
use crate::control::state::SharedState;
pub struct ClusterReadyGates {
pub raft_gate: ReadyGate,
pub schema_gate: ReadyGate,
pub sanity_gate: ReadyGate,
pub data_groups_gate: ReadyGate,
pub transport_gate: ReadyGate,
pub warm_peers_gate: ReadyGate,
pub health_loop_gate: ReadyGate,
pub gateway_enable_gate: ReadyGate,
}
pub async fn await_cluster_ready(
shared: &Arc<SharedState>,
raft_ready_rx: Option<tokio::sync::watch::Receiver<bool>>,
gates: ClusterReadyGates,
) -> anyhow::Result<()> {
let ClusterReadyGates {
raft_gate,
schema_gate,
sanity_gate,
data_groups_gate,
transport_gate,
warm_peers_gate,
health_loop_gate,
gateway_enable_gate,
} = gates;
if let Some(mut ready_rx) = raft_ready_rx {
const RAFT_READY_TIMEOUT: Duration = Duration::from_secs(30);
match tokio::time::timeout(RAFT_READY_TIMEOUT, ready_rx.wait_for(|v| *v)).await {
Ok(Ok(_)) => {
info!("metadata raft group ready — opening client listeners");
}
Ok(Err(_)) => {
raft_gate.fail("raft readiness watch dropped before signalling ready");
return Err(anyhow::anyhow!(
"raft readiness watch dropped before signalling ready"
));
}
Err(_) => {
raft_gate.fail(format!(
"raft readiness timeout after {RAFT_READY_TIMEOUT:?}"
));
return Err(anyhow::anyhow!(
"raft readiness timeout after {RAFT_READY_TIMEOUT:?} — \
metadata group failed to apply first entry"
));
}
}
}
raft_gate.fire();
schema_gate.fire();
let verify_report = crate::control::cluster::verify_and_repair(shared).await?;
if verify_report.is_acceptable() {
info!(report = %verify_report, "catalog sanity check passed");
} else {
sanity_gate.fail(format!("catalog sanity check failed: {verify_report}"));
return Err(anyhow::anyhow!(
"catalog sanity check failed: {verify_report}"
));
}
sanity_gate.fire();
data_groups_gate.fire();
transport_gate.fire();
if let (Some(transport), Some(topology)) = (
shared.cluster_transport.as_ref(),
shared.cluster_topology.as_ref(),
) {
let topo_snapshot = {
let guard = topology.read().unwrap_or_else(|p| p.into_inner());
guard.clone()
};
let warm_report = crate::control::cluster::warm_known_peers(
transport,
&topo_snapshot,
shared.node_id,
Duration::from_secs(2),
)
.await;
if warm_report.attempted > 0 {
info!(report = %warm_report, "peer cache warm-up complete");
if !warm_report.is_complete() {
for (id, err) in &warm_report.failed {
tracing::warn!(node_id = id, error = %err, "peer warm failed");
}
}
}
}
warm_peers_gate.fire();
health_loop_gate.fire();
gateway_enable_gate.fire();
Ok(())
}