#![allow(dead_code)]
use std::sync::Arc;
use std::time::Duration;
use crabka_client_core::ConnectionOptions;
use crabka_protocol::owned::broker_heartbeat_request::BrokerHeartbeatRequest;
use crabka_security::ListenerProtocol;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
/// Everything the heartbeat loop in [`run`] needs: broker identity, timing,
/// the controller view, inter-broker connection settings, shutdown
/// signalling, and log-directory health.
pub(crate) struct Config {
    /// This broker's id; sent in every heartbeat and embedded in the
    /// connection's client id.
    pub broker_id: i32,
    /// Period of the timer that paces heartbeat attempts.
    pub interval: Duration,
    /// Supplies the current controller leader id and the metadata image used
    /// to look up that leader's address.
    pub controller: Arc<dyn crate::metadata_source::MetadataSource>,
    /// Once cancelled, the loop returns the next time it waits for a tick.
    pub shutdown: CancellationToken,
    /// Client used to open the connection to the controller leader.
    pub inter_broker_client: Arc<crate::network::client::InterBrokerClient>,
    /// Listener protocol handed to the client when connecting.
    pub inter_broker_listener_protocol: ListenerProtocol,
    /// Name of the leader endpoint to prefer; when no endpoint carries this
    /// name, the broker record's own host and port are used instead.
    pub inter_broker_listener_name: String,
    /// Current "this broker wants to shut down" flag, read before each
    /// heartbeat and forwarded in the request.
    pub want_shutdown: tokio::sync::watch::Receiver<bool>,
    /// Set to `true` when the heartbeat response asks the broker to shut
    /// down, or when every log directory is offline.
    pub should_shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Registry queried for which log directories are offline.
    pub log_dir_status: crate::log_dir_status::LogDirRegistry,
    /// Lookup from a log directory path to its id.
    pub log_dir_ids: crate::log_dir_id::LogDirIds,
    /// Every log directory considered by the "all directories offline" check.
    pub all_log_dirs: Vec<std::path::PathBuf>,
    /// Cancelled when the broker shuts itself down because every log
    /// directory is offline.
    pub supervisor_shutdown: CancellationToken,
}
/// Collects the protocol UUIDs of the log directories that `status` reports
/// as offline.
///
/// A directory for which `ids` has no id is skipped. The reason attached to
/// each offline entry is not used.
fn offline_dir_uuids(
    status: &crate::log_dir_status::LogDirRegistry,
    ids: &crate::log_dir_id::LogDirIds,
) -> Vec<crabka_protocol::primitives::uuid::Uuid> {
    let mut uuids = Vec::new();
    for (dir, _reason) in status.offline() {
        if let Some(id) = ids.id_for(&dir) {
            // Re-wrap the raw bytes in the wire-protocol UUID type.
            uuids.push(crabka_protocol::primitives::uuid::Uuid(*id.as_bytes()));
        }
    }
    uuids
}
/// Returns `true` only when `all_log_dirs` is non-empty and `status` reports
/// every one of its entries as offline.
fn all_dirs_offline(
    all_log_dirs: &[std::path::PathBuf],
    status: &crate::log_dir_status::LogDirRegistry,
) -> bool {
    // An empty list never counts as "everything offline".
    if all_log_dirs.is_empty() {
        return false;
    }
    all_log_dirs.iter().all(|dir| status.is_offline(dir))
}
fn all_log_dirs_offline(cfg: &Config) -> bool {
all_dirs_offline(&cfg.all_log_dirs, &cfg.log_dir_status)
}
fn trigger_all_dirs_offline_shutdown(cfg: &mut Config, reason: &str) {
tracing::error!(
reason,
"all log dirs offline — initiating broker self-shutdown (KIP-112)"
);
let _ = cfg.should_shutdown.send(true);
cfg.supervisor_shutdown.cancel();
}
pub(crate) async fn run(mut cfg: Config) {
let mut tick = tokio::time::interval(cfg.interval);
loop {
tokio::select! {
_ = tick.tick() => {},
() = cfg.shutdown.cancelled() => return,
}
if all_log_dirs_offline(&cfg) {
trigger_all_dirs_offline_shutdown(&mut cfg, "detected before controller resolution");
return;
}
let leader_id = *cfg.controller.watch_leader().borrow();
let Some(leader_id) = leader_id else {
debug!("heartbeat: no controller leader yet");
continue;
};
let image = cfg.controller.current_image();
let Some(broker_rec) = image.broker(leader_id) else {
debug!("heartbeat: controller leader not in metadata image yet");
continue;
};
let (host, port) = broker_rec
.endpoints
.iter()
.find(|e| e.name == cfg.inter_broker_listener_name)
.map_or_else(
|| (broker_rec.host.clone(), broker_rec.port),
|e| (e.host.clone(), e.port),
);
let opts = ConnectionOptions {
client_id: format!("crabka-broker-{}-heartbeat", cfg.broker_id),
..ConnectionOptions::default()
};
let client_res = cfg
.inter_broker_client
.connect_as_connection(
&host,
port,
cfg.inter_broker_listener_protocol,
"localhost",
opts,
)
.await;
let Ok(client) = client_res else {
debug!("heartbeat: connect failed");
continue;
};
let want_shut_down = *cfg.want_shutdown.borrow_and_update();
let offline_log_dirs = offline_dir_uuids(&cfg.log_dir_status, &cfg.log_dir_ids);
let resp = client
.send(BrokerHeartbeatRequest {
broker_id: cfg.broker_id,
broker_epoch: 0,
current_metadata_offset: 0,
want_fence: false,
want_shut_down,
offline_log_dirs,
..Default::default()
})
.await;
match resp {
Ok(r) => {
if r.should_shut_down {
let _ = cfg.should_shutdown.send(true);
}
}
Err(e) => warn!(error = %e, "heartbeat send failed"),
}
if all_log_dirs_offline(&cfg) {
trigger_all_dirs_offline_shutdown(&mut cfg, "detected after heartbeat send");
return;
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use tempfile::tempdir;

    /// Only directories marked offline are reported, each under the id that
    /// `LogDirIds` assigned to its path.
    #[test]
    fn offline_dir_uuids_maps_offline_paths() {
        let first = tempdir().unwrap();
        let second = tempdir().unwrap();
        let dirs = vec![first.path().to_path_buf(), second.path().to_path_buf()];
        let ids = crate::log_dir_id::LogDirIds::resolve(&dirs);
        let registry = crate::log_dir_status::LogDirRegistry::probe(&dirs);

        // Freshly probed temp dirs are expected to be online.
        assert!(offline_dir_uuids(&registry, &ids).is_empty());

        registry.mark_offline(first.path(), "test");
        let offline = offline_dir_uuids(&registry, &ids);
        assert!(offline.len() == 1);
        let expected = ids.id_for(first.path()).unwrap();
        assert!(offline[0].0 == *expected.as_bytes());
    }

    /// The predicate is false for an empty list and for a partially offline
    /// set, and true only once every directory is offline.
    #[test]
    fn all_dirs_offline_true_only_when_every_dir_offline() {
        let first = tempdir().unwrap();
        let second = tempdir().unwrap();
        let dirs = vec![first.path().to_path_buf(), second.path().to_path_buf()];
        let registry = crate::log_dir_status::LogDirRegistry::probe(&dirs);

        // An empty directory list is never "all offline".
        assert!(!all_dirs_offline(&[], &registry));
        assert!(!all_dirs_offline(&dirs, &registry));

        // One of the two offline is not enough.
        registry.mark_offline(first.path(), "disk error");
        assert!(!all_dirs_offline(&dirs, &registry));

        registry.mark_offline(second.path(), "disk error");
        assert!(all_dirs_offline(&dirs, &registry));
    }
}