#[macro_use]
extern crate log;
mod cli;
mod cluster_status;
mod constants;
mod consumer_groups;
mod http;
mod internals;
mod kafka_types;
mod konsumer_offsets_data;
mod lag_register;
mod logging;
mod partition_offsets;
mod prometheus_metrics;
use std::{error::Error, sync::Arc};
use tokio_util::sync::CancellationToken;
use crate::cli::Cli;
use crate::internals::Awaitable;
/// Application entry point.
///
/// Brings the components up one after another, waiting on `await_ready` for each register
/// before the next component that receives it is initialised, then blocks until every joined
/// task and the HTTP future have completed.
///
/// # Errors
///
/// Any error returned by an `await_ready` call is propagated to the caller via `?`.
/// On the normal path this function does not return: it terminates the process with
/// `exit_code::SUCCESS` once all joined futures have finished.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let cli = parse_cli_and_init_logging();
    let client_config = cli.build_client_config();
    let shutdown = build_shutdown_token();

    // Cluster status: must report ready before anything that receives it is started.
    let (cluster_status, cluster_status_task) =
        cluster_status::init(client_config.clone(), cli.cluster_id, shutdown.clone());
    cluster_status.await_ready(shutdown.clone()).await?;
    let cluster_status = Arc::new(cluster_status);

    // Partition offsets: receives a handle to the (now ready) cluster status register.
    let (partition_offsets, partition_offsets_task) = partition_offsets::init(
        client_config.clone(),
        cli.offsets_history,
        0.1,
        Arc::clone(&cluster_status),
        shutdown.clone(),
    );
    partition_offsets.await_ready(shutdown.clone()).await?;
    let partition_offsets = Arc::new(partition_offsets);

    // The two receivers handed to the lag register below.
    let (konsumer_offsets_rx, konsumer_offsets_task) =
        konsumer_offsets_data::init(client_config.clone(), shutdown.clone());
    let (consumer_groups_rx, consumer_groups_task) =
        consumer_groups::init(client_config.clone(), shutdown.clone());

    // Lag register: built from both receivers plus the partition offsets register.
    let lag_register = lag_register::init(
        consumer_groups_rx,
        konsumer_offsets_rx,
        Arc::clone(&partition_offsets),
    );
    lag_register.await_ready(shutdown.clone()).await?;
    let lag_register = Arc::new(lag_register);

    // Metrics registry is initialised with the cluster id reported by the cluster status register.
    let cluster_id = cluster_status.get_cluster_id().await;
    let prometheus_registry = Arc::new(prometheus_metrics::init(cluster_id));

    let http_server = http::init(
        cli.listen_on,
        Arc::clone(&cluster_status),
        Arc::clone(&partition_offsets),
        Arc::clone(&lag_register),
        shutdown.clone(),
        prometheus_registry,
    );

    // Wait for every task and the HTTP future to finish; their outputs are intentionally discarded.
    let _ = tokio::join!(
        cluster_status_task,
        partition_offsets_task,
        konsumer_offsets_task,
        consumer_groups_task,
        http_server
    );

    info!("Shutdown!");
    std::process::exit(exit_code::SUCCESS);
}
/// Parses and validates the command line, then initialises logging from it.
///
/// Logging is set up with the verbosity level reported by the parsed [`Cli`]; once it is
/// active, the parsed value is emitted at `trace` level and handed back to the caller.
fn parse_cli_and_init_logging() -> Cli {
    let parsed = Cli::parse_and_validate();

    // Logging must be initialised before the trace line below can be emitted.
    let verbosity = parsed.verbosity_level();
    logging::init(verbosity);

    trace!("Created:\n{:#?}", parsed);
    parsed
}
fn build_shutdown_token() -> CancellationToken {
let shutdown_token = CancellationToken::new();
let shutdown_token_clone = shutdown_token.clone();
if let Err(e) = ctrlc::set_handler(move || {
info!("Beginning shutdown...");
shutdown_token_clone.cancel();
}) {
error!("Failed to register signal handler: {e}");
}
shutdown_token
}