use std::sync::Arc;
use std::time::Duration;
use bytes::{Bytes, BytesMut};
use crabka_protocol::owned::add_raft_voter_request::{self, AddRaftVoterRequest, Listener};
use crabka_protocol::owned::add_raft_voter_response::AddRaftVoterResponse;
use crabka_protocol::{Decode, Encode};
use crate::codes;
/// Delay between consecutive auto-join attempts; [`run`] sleeps for this long
/// after every attempt, whatever its outcome.
const RETRY_BACKOFF: Duration = Duration::from_millis(500);
/// Inputs for [`run`], the task that adds this node to the controller quorum
/// as a voter.
pub(crate) struct AutoJoinParams {
    /// When `false`, [`run`] returns immediately without doing anything.
    pub auto_join: bool,
    /// This node's id. Sent as the request's `voter_id` (it must fit in `i32`)
    /// and looked up in the controller's voter set to detect completion.
    pub node_id: crabka_raft::NodeId,
    /// Sent as the request's `voter_directory_id`.
    pub directory_id: uuid::Uuid,
    /// Optional cluster id, sent in string form as the request's `cluster_id`.
    pub cluster_id: Option<uuid::Uuid>,
    /// Servers that receive `AddRaftVoter` requests, tried in round-robin order.
    /// If empty, [`run`] logs a warning and returns.
    pub bootstrap_servers: Vec<std::net::SocketAddr>,
    /// Protocol used when dialing each bootstrap server.
    pub listener_protocol: crabka_security::ListenerProtocol,
    /// Provides this node's controller bound address (advertised as the
    /// `CONTROLLER` listener) and the current metadata image (voter set).
    pub controller: Arc<dyn crate::metadata_source::MetadataSource>,
    /// Client used to open a connection for each attempt.
    pub inter_broker_client: Arc<crate::network::client::InterBrokerClient>,
}
/// Background task that adds this node to the controller quorum as a voter.
///
/// Does nothing unless `params.auto_join` is set. Otherwise it sends
/// `AddRaftVoter` requests to `params.bootstrap_servers` in round-robin order.
/// Each request advertises the controller's bound address as this node's
/// `CONTROLLER` listener, and the task sleeps [`RETRY_BACKOFF`] between attempts.
///
/// Every outcome is only logged: dial/RPC failure, a non-leader target, any
/// error code, and even success. Completion is detected solely by this node
/// appearing in the voter set of the controller's current metadata image.
///
/// Returns early, after logging, when no bootstrap servers are configured or
/// when `node_id` does not fit in the protocol's `i32` voter id.
pub(crate) async fn run(params: AutoJoinParams) {
    if !params.auto_join {
        return;
    }
    let self_id = params.node_id;
    let bootstrap_servers = params.bootstrap_servers;
    if bootstrap_servers.is_empty() {
        tracing::warn!(
            node_id = self_id,
            "auto_join enabled but bootstrap_servers is empty; cannot discover a leader"
        );
        return;
    }
    let bound = params.controller.controller_bound_addr();
    let Ok(voter_id) = i32::try_from(self_id) else {
        tracing::error!(node_id = self_id, "node_id exceeds i32; cannot auto-join");
        return;
    };
    // The bound address is advertised verbatim as this voter's listener host.
    // A wildcard bind (0.0.0.0 / ::) is not an address peers on other hosts can
    // dial, so surface it now instead of letting the quorum fail to reach us later.
    if bound.ip().is_unspecified() {
        tracing::warn!(
            node_id = self_id,
            bound = %bound,
            "auto-join: controller is bound to an unspecified address; advertising it \
             as the CONTROLLER listener host will likely leave this voter unreachable \
             from other hosts"
        );
    }
    // Every field of the request is loop-invariant, so build it once and resend
    // the same value on each attempt.
    let req = AddRaftVoterRequest {
        cluster_id: params.cluster_id.map(|u| u.to_string()),
        timeout_ms: 30_000,
        voter_id,
        voter_directory_id: crabka_protocol::primitives::uuid::Uuid(
            *params.directory_id.as_bytes(),
        ),
        listeners: vec![Listener {
            name: "CONTROLLER".to_string(),
            host: bound.ip().to_string(),
            port: bound.port(),
            ..Default::default()
        }],
        ack_when_committed: true,
        ..Default::default()
    };
    let protocol = params.listener_protocol;
    let client = params.inter_broker_client;
    let controller = params.controller;
    // `cycle` over the (non-empty, checked above) server list is an endless
    // round-robin, so the loop exits only through the `return` once this node
    // is a voter.
    for target in bootstrap_servers.iter().copied().cycle() {
        if controller.current_image().voters().contains(self_id) {
            tracing::info!(node_id = self_id, "auto-join complete; node is a voter");
            return;
        }
        match send_add_raft_voter(&client, protocol, target, &req).await {
            Ok(resp) => log_join_outcome(self_id, target, &resp),
            Err(e) => {
                tracing::debug!(
                    node_id = self_id,
                    server = %target,
                    error = %e,
                    "auto-join: dial/RPC failed; trying next bootstrap server"
                );
            }
        }
        tokio::time::sleep(RETRY_BACKOFF).await;
    }
}
/// Logs the outcome of one `AddRaftVoter` attempt sent to `target`.
///
/// Severity by error code:
/// - `NONE` is logged at `info`.
/// - `NOT_LEADER_OR_FOLLOWER`, `REQUEST_TIMED_OUT` and `INVALID_REQUEST` are
///   logged at `debug`, since they are treated as transient.
/// - Any other code is logged at `warn`.
///
/// The outcome is purely informational; nothing is returned to the caller.
fn log_join_outcome(
    self_id: crabka_raft::NodeId,
    target: std::net::SocketAddr,
    resp: &AddRaftVoterResponse,
) {
    let detail = &resp.error_message;
    match resp.error_code {
        codes::NONE => tracing::info!(
            node_id = self_id,
            leader = %target,
            "auto-join accepted by leader"
        ),
        codes::NOT_LEADER_OR_FOLLOWER => tracing::debug!(
            node_id = self_id,
            server = %target,
            msg = ?detail,
            "auto-join target is not the leader; trying next bootstrap server"
        ),
        codes::REQUEST_TIMED_OUT => tracing::debug!(
            node_id = self_id,
            server = %target,
            "auto-join: reconfiguration in progress on leader; retrying"
        ),
        codes::INVALID_REQUEST => tracing::debug!(
            node_id = self_id,
            server = %target,
            msg = ?detail,
            "auto-join: not yet caught up; retrying"
        ),
        unexpected => tracing::warn!(
            node_id = self_id,
            server = %target,
            error_code = unexpected,
            msg = ?detail,
            "auto-join: unexpected error_code; retrying"
        ),
    }
}
/// Sends one `AddRaftVoter` request to `target` over a fresh connection and
/// decodes the reply.
///
/// The request is encoded at `add_raft_voter_request::MAX_VERSION`, and the
/// response is decoded at that same version. The connection is closed after
/// the request completes, whether or not the request succeeded.
///
/// # Errors
///
/// Returns a human-readable `String` naming the stage that failed: encode,
/// dial, `raw_request`, or decode.
async fn send_add_raft_voter(
    client: &crate::network::client::InterBrokerClient,
    protocol: crabka_security::ListenerProtocol,
    target: std::net::SocketAddr,
    req: &AddRaftVoterRequest,
) -> Result<AddRaftVoterResponse, String> {
    let version = add_raft_voter_request::MAX_VERSION;

    let mut encoded = BytesMut::with_capacity(req.encoded_len(version));
    if let Err(e) = req.encode(&mut encoded, version) {
        return Err(format!("AddRaftVoter encode: {e}"));
    }

    let options = crabka_client_core::ConnectionOptions {
        client_id: "crabka-auto-join".to_string(),
        ..crabka_client_core::ConnectionOptions::default()
    };
    let host = target.ip().to_string();
    // NOTE(review): the fourth argument is the literal "localhost" for every
    // target. It is presumably a TLS server name; confirm this matches how
    // inter-broker certificates are verified.
    let connection = client
        .connect_as_connection(&host, target.port(), protocol, "localhost", options)
        .await
        .map_err(|e| format!("dial {target}: {e}"))?;

    let reply = connection
        .raw_request(add_raft_voter_request::API_KEY, version, Bytes::from(encoded))
        .await
        .map_err(|e| format!("AddRaftVoter raw_request: {e}"));
    // Close before propagating a request error so a failed RPC still closes
    // the connection.
    connection.close();
    let reply = reply?;

    let mut reader: &[u8] = &reply;
    AddRaftVoterResponse::decode(&mut reader, version)
        .map_err(|e| format!("AddRaftVoter decode: {e}"))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `run` must be a no-op when `auto_join` is disabled, even with a
    /// (deliberately unreachable) bootstrap server configured.
    #[tokio::test]
    async fn run_returns_immediately_when_auto_join_disabled() {
        let tempdir = tempfile::tempdir().expect("tempdir");
        let config = crate::BrokerConfig::for_tests(tempdir.path().to_path_buf());
        let handle = crate::Broker::start(config).await.expect("broker start");
        let broker = handle.broker_arc_for_test();
        let params = AutoJoinParams {
            auto_join: false,
            node_id: 1,
            directory_id: uuid::Uuid::from_u128(1),
            cluster_id: None,
            bootstrap_servers: vec!["127.0.0.1:1".parse().unwrap()],
            listener_protocol: crabka_security::ListenerProtocol::Plaintext,
            controller: broker.controller_for_test(),
            inter_broker_client: broker.inter_broker_client_for_test(),
        };
        tokio::time::timeout(Duration::from_secs(2), run(params))
            .await
            .expect("run() returned immediately for auto_join=false");
        handle.shutdown().await;
    }

    /// With `auto_join` enabled but no bootstrap servers, `run` has nobody to
    /// ask, so it must give up instead of looping forever.
    #[tokio::test]
    async fn run_returns_immediately_when_bootstrap_servers_empty() {
        let tempdir = tempfile::tempdir().expect("tempdir");
        let config = crate::BrokerConfig::for_tests(tempdir.path().to_path_buf());
        let handle = crate::Broker::start(config).await.expect("broker start");
        let broker = handle.broker_arc_for_test();
        let params = AutoJoinParams {
            auto_join: true,
            node_id: 1,
            directory_id: uuid::Uuid::from_u128(1),
            cluster_id: None,
            bootstrap_servers: Vec::new(),
            listener_protocol: crabka_security::ListenerProtocol::Plaintext,
            controller: broker.controller_for_test(),
            inter_broker_client: broker.inter_broker_client_for_test(),
        };
        tokio::time::timeout(Duration::from_secs(2), run(params))
            .await
            .expect("run() should return immediately when bootstrap_servers is empty");
        handle.shutdown().await;
    }
}