crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! KIP-853 controller auto-join.
//!
//! A broker started in [`crate::BootstrapMode::Join`] with
//! `auto_join = true` is NOT yet a member of the controller raft group: its
//! raft log is empty and it waits in openraft's `Learner` state. This module
//! drives the joiner side of the dance — it discovers the leader via the
//! configured `bootstrap_servers` and sends the **Kafka `AddRaftVoter` wire
//! RPC** (`api_key` 80) carrying its own voter identity. The leader-side
//! handler (`crate::handlers::add_raft_voter`) runs `add_learner` (dialing
//! this joiner's controller listener to replicate the log), waits for the
//! observer to catch up, promotes it via `change_membership`, and submits the
//! authoritative `V1Voters` record. Once the joiner sees itself in the
//! committed voter set it stops.
//!
//! The joiner advertises its **real bound** controller endpoint (not the
//! configured `controller_listen_addr`, which may carry port 0 for an
//! OS-assigned port) so the leader's `add_learner` can dial it back.
//!
//! This is purely a client-side driver: it does NOT touch the reconfiguration
//! coordinator or openraft membership directly. All the lockstep safety lives
//! on the leader.

use std::sync::Arc;
use std::time::Duration;

use bytes::{Bytes, BytesMut};

use crabka_protocol::owned::add_raft_voter_request::{self, AddRaftVoterRequest, Listener};
use crabka_protocol::owned::add_raft_voter_response::AddRaftVoterResponse;
use crabka_protocol::{Decode, Encode};

use crate::codes;

/// Backoff between join attempts so a failed dial / not-yet-caught-up reply
/// doesn't hot-spin the loop. `run` sleeps for this long after every attempt,
/// whatever its outcome (accepted, rejected, or transport error).
const RETRY_BACKOFF: Duration = Duration::from_millis(500);

/// Everything the auto-join driver needs, pulled out of `BrokerConfig` +
/// `Broker` so the loop can be spawned *before* the full `Broker` Arc exists.
/// A `Join` broker's `Broker::start` blocks waiting for a leader, and that
/// leader only appears once this loop has driven the leader-side `add_learner`
/// and promotion — so the two must run concurrently.
pub(crate) struct AutoJoinParams {
    /// Master switch: when `false`, `run` returns immediately without
    /// touching the network.
    pub auto_join: bool,
    /// This broker's raft node id. Sent as the request's `voter_id` (after an
    /// `i32` conversion) and looked up in the committed voter set to decide
    /// when the join is complete.
    pub node_id: crabka_raft::NodeId,
    /// This broker's directory id, sent as `voter_directory_id`.
    pub directory_id: uuid::Uuid,
    /// Cluster id to put in the request, stringified; `None` leaves the
    /// request's `cluster_id` unset.
    pub cluster_id: Option<uuid::Uuid>,
    /// Servers to send `AddRaftVoter` to, tried in round-robin order. If the
    /// list is empty `run` logs a warning and gives up.
    pub bootstrap_servers: Vec<std::net::SocketAddr>,
    /// Protocol of the bootstrap server's data-plane listener (the
    /// inter-broker listener protocol) — `AddRaftVoter` is served there.
    pub listener_protocol: crabka_security::ListenerProtocol,
    /// Source of the bound controller address that gets advertised and of the
    /// current metadata image whose voter set is polled.
    pub controller: Arc<dyn crate::metadata_source::MetadataSource>,
    /// Client used to open the connection for each join attempt.
    pub inter_broker_client: Arc<crate::network::client::InterBrokerClient>,
}

/// Drive the auto-join loop until this broker is a committed voter.
///
/// Returns immediately (without touching the network) when `auto_join` is
/// disabled. It also gives up, after logging, when `bootstrap_servers` is
/// empty or when `node_id` does not fit the `i32` `voter_id` wire field.
/// Otherwise it sends `AddRaftVoter` to the bootstrap servers in round-robin
/// order, sleeping `RETRY_BACKOFF` after every attempt, and returns once the
/// local metadata image lists this node among the voters — that check is the
/// only way out of the loop.
///
/// Intended to be spawned as a detached background task during `Broker::start`.
pub(crate) async fn run(params: AutoJoinParams) {
    if !params.auto_join {
        return;
    }

    let self_id = params.node_id;
    let bootstrap_servers = params.bootstrap_servers;
    if bootstrap_servers.is_empty() {
        tracing::warn!(
            node_id = self_id,
            "auto_join enabled but bootstrap_servers is empty; cannot discover a leader"
        );
        return;
    }

    // Self's voter identity, advertising the REAL bound controller endpoint
    // (resolved port, not the possibly-zero configured port) so the leader's
    // add_learner can dial us back.
    let bound = params.controller.controller_bound_addr();
    let Ok(voter_id) = i32::try_from(self_id) else {
        tracing::error!(node_id = self_id, "node_id exceeds i32; cannot auto-join");
        return;
    };

    // A wildcard bind (0.0.0.0 / ::) has a real port but no host a remote
    // peer can dial. The address is still advertised as-is; the warning
    // makes an otherwise silent, endlessly retrying join diagnosable.
    // NOTE(review): assumes `controller_bound_addr` does not already
    // substitute an advertised host — confirm against its implementation.
    if bound.ip().is_unspecified() {
        tracing::warn!(
            node_id = self_id,
            addr = %bound,
            "auto-join: bound controller address is unspecified; leader may be unable to dial it back"
        );
    }

    let directory_id = crabka_protocol::primitives::uuid::Uuid(*params.directory_id.as_bytes());
    let listener = Listener {
        name: "CONTROLLER".to_string(),
        host: bound.ip().to_string(),
        port: bound.port(),
        ..Default::default()
    };

    // Every attempt sends the same payload, so build the request once
    // instead of re-allocating the cluster id string and listener per retry.
    let req = AddRaftVoterRequest {
        cluster_id: params.cluster_id.map(|u| u.to_string()),
        timeout_ms: 30_000,
        voter_id,
        voter_directory_id: directory_id,
        listeners: vec![listener],
        ack_when_committed: true,
        ..Default::default()
    };

    let protocol = params.listener_protocol;
    let client = params.inter_broker_client;
    let controller = params.controller;

    let mut next_server = 0usize;
    loop {
        // Terminate as soon as the committed voter set includes us.
        if controller.current_image().voters().contains(self_id) {
            tracing::info!(node_id = self_id, "auto-join complete; node is a voter");
            return;
        }

        // Round-robin over the bootstrap servers; the list is non-empty
        // (checked above), so the modulo index is always in bounds.
        let target = bootstrap_servers[next_server % bootstrap_servers.len()];
        next_server = next_server.wrapping_add(1);

        match send_add_raft_voter(&client, protocol, target, &req).await {
            Ok(resp) => log_join_outcome(self_id, target, &resp),
            Err(e) => {
                tracing::debug!(
                    node_id = self_id,
                    server = %target,
                    error = %e,
                    "auto-join: dial/RPC failed; trying next bootstrap server"
                );
            }
        }

        tokio::time::sleep(RETRY_BACKOFF).await;
    }
}

/// Report the reply to an `AddRaftVoter` attempt at a log level matching its
/// error code.
///
/// This function only logs. It returns nothing and has no way to stop the
/// join loop; the caller decides when to exit by checking the committed
/// voter set, so every outcome here is effectively "retry".
fn log_join_outcome(
    self_id: crabka_raft::NodeId,
    target: std::net::SocketAddr,
    resp: &AddRaftVoterResponse,
) {
    let code = resp.error_code;
    let detail = &resp.error_message;

    match code {
        // The contacted server is not the leader. Any leader hint in the
        // message is not a routable address, so the caller simply moves on
        // to the next configured bootstrap server.
        codes::NOT_LEADER_OR_FOLLOWER => {
            tracing::debug!(
                node_id = self_id,
                server = %target,
                msg = ?detail,
                "auto-join target is not the leader; trying next bootstrap server"
            );
        }
        // Observer not yet caught up within the lag bound. Replication
        // continues in the background (openraft drives it), so a later
        // attempt can succeed.
        codes::INVALID_REQUEST => {
            tracing::debug!(
                node_id = self_id,
                server = %target,
                msg = ?detail,
                "auto-join: not yet caught up; retrying"
            );
        }
        codes::REQUEST_TIMED_OUT => {
            tracing::debug!(
                node_id = self_id,
                server = %target,
                "auto-join: reconfiguration in progress on leader; retrying"
            );
        }
        // Accepted. The committed V1Voters record may not have reached the
        // local image yet (this node is still catching up), which is why
        // acceptance alone does not end the loop.
        codes::NONE => {
            tracing::info!(
                node_id = self_id,
                leader = %target,
                "auto-join accepted by leader"
            );
        }
        unexpected => {
            tracing::warn!(
                node_id = self_id,
                server = %target,
                error_code = unexpected,
                msg = ?detail,
                "auto-join: unexpected error_code; retrying"
            );
        }
    }
}

/// Open a fresh connection to `target` using `protocol` and perform one
/// `AddRaftVoter` round trip, returning the decoded response.
///
/// The request is encoded at `add_raft_voter_request::MAX_VERSION` and the
/// response is decoded at the same version. The connection is closed once the
/// request has been attempted, whether or not it succeeded.
///
/// # Errors
///
/// Returns a human-readable `String` naming the failed stage: encode, dial,
/// raw request, or decode.
async fn send_add_raft_voter(
    client: &crate::network::client::InterBrokerClient,
    protocol: crabka_security::ListenerProtocol,
    target: std::net::SocketAddr,
    req: &AddRaftVoterRequest,
) -> Result<AddRaftVoterResponse, String> {
    let version = add_raft_voter_request::MAX_VERSION;

    // Serialize before dialing so an encode failure never costs a connection.
    let mut payload = BytesMut::with_capacity(req.encoded_len(version));
    if let Err(e) = req.encode(&mut payload, version) {
        return Err(format!("AddRaftVoter encode: {e}"));
    }

    let host = target.ip().to_string();
    let options = crabka_client_core::ConnectionOptions {
        client_id: "crabka-auto-join".to_string(),
        ..crabka_client_core::ConnectionOptions::default()
    };
    // NOTE(review): the literal "localhost" is passed regardless of `target`;
    // presumably a TLS server name — confirm it is right for remote peers.
    let dialed = client
        .connect_as_connection(&host, target.port(), protocol, "localhost", options)
        .await;
    let conn = match dialed {
        Ok(conn) => conn,
        Err(e) => return Err(format!("dial {target}: {e}")),
    };

    // Hold the outcome until after `close()` so the connection is closed on
    // the error path as well.
    let outcome = conn
        .raw_request(add_raft_voter_request::API_KEY, version, Bytes::from(payload))
        .await
        .map_err(|e| format!("AddRaftVoter raw_request: {e}"));
    conn.close();
    let raw = outcome?;

    let mut cursor: &[u8] = &raw;
    match AddRaftVoterResponse::decode(&mut cursor, version) {
        Ok(resp) => Ok(resp),
        Err(e) => Err(format!("AddRaftVoter decode: {e}")),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// With `auto_join = false`, `run` must finish right away without dialing
    /// anything. The params carry a real controller and inter-broker client
    /// taken from a started test broker, plus a bootstrap address nothing is
    /// expected to listen on. If `run` regressed and ignored the flag, it
    /// would keep retrying that address and the two-second timeout would
    /// fail the test.
    #[tokio::test]
    async fn run_returns_immediately_when_auto_join_disabled() {
        let data_dir = tempfile::tempdir().expect("tempdir");
        let broker_handle = crate::Broker::start(crate::BrokerConfig::for_tests(
            data_dir.path().to_path_buf(),
        ))
        .await
        .expect("broker start");
        let broker = broker_handle.broker_arc_for_test();

        // Only reachable by the loop if `run` ignores auto_join=false.
        let bogus_server: std::net::SocketAddr = "127.0.0.1:1".parse().unwrap();

        let params = AutoJoinParams {
            auto_join: false,
            node_id: 1,
            directory_id: uuid::Uuid::from_u128(1),
            cluster_id: None,
            bootstrap_servers: vec![bogus_server],
            listener_protocol: crabka_security::ListenerProtocol::Plaintext,
            controller: broker.controller_for_test(),
            inter_broker_client: broker.inter_broker_client_for_test(),
        };

        let finished = tokio::time::timeout(Duration::from_secs(2), run(params)).await;
        finished.expect("run() returned immediately for auto_join=false");

        broker_handle.shutdown().await;
    }
}