beyond-handoff 0.1.2

Zero-downtime atomic binary handoff for long-running daemons
Documentation
//! Wire-protocol defensive-check coverage.
//!
//! For every message the incumbent could receive *out of order* or with the
//! wrong handoff id, verify the session closes (the supervisor would observe
//! the disconnect and recover via its own error path) and the incumbent's
//! serve loop stays alive on its socket — ready for the next session.
//!
//! These tests exist alongside the crash-matrix to cover *protocol
//! violations* (peer bug / replay / cross-session frame) as distinct from
//! *crashes* (peer died). They're CI-cheap: each scenario takes <100ms.

mod common;

use std::thread;
use std::time::Duration;

use handoff::frame::{read_message, write_message};
use handoff::protocol::{HandoffId, Message, PROTO_MAX, Side};
use handoff::{DataDirLock, Incumbent};

use common::{MockDrainable, connect_with_retry};

/// Bind a cold-start `Incumbent` inside a fresh temp directory, run its serve
/// loop on a background thread, and complete the Hello → HelloAck handshake
/// from the supervisor side.
///
/// Returns, in order:
/// - the `TempDir` that holds the control socket and the data dir; keep it
///   alive while the session is in use, since dropping it removes the directory;
/// - the connected supervisor-side stream, positioned just after HelloAck, so
///   the caller can send the first protocol message;
/// - the `HandoffId` that was sent in HelloAck;
/// - the `JoinHandle` of the thread running `Incumbent::serve`.
fn make_active_session() -> (
    tempfile::TempDir,
    std::os::unix::net::UnixStream,
    HandoffId,
    thread::JoinHandle<handoff::Result<()>>,
) {
    let dir = tempfile::tempdir().unwrap();
    let control_socket = dir.path().join("control.sock");
    let data_root = dir.path().join("data");

    let server = {
        let lock = DataDirLock::acquire(&data_root).unwrap();
        let incumbent = Incumbent::bind_cold_start(&control_socket, lock).unwrap();
        let drainable = MockDrainable::default();
        thread::spawn(move || incumbent.serve(drainable))
    };

    let mut supervisor = connect_with_retry(&control_socket);

    // The incumbent speaks first: its Hello must arrive before we acknowledge.
    let (_version, greeting) = read_message(&mut supervisor).unwrap();
    assert!(matches!(greeting, Message::Hello { .. }));

    let id = HandoffId::new();
    let ack = Message::HelloAck {
        proto_version_chosen: PROTO_MAX,
        handoff_id: id,
    };
    write_message(&mut supervisor, PROTO_MAX, &ack).unwrap();

    (dir, supervisor, id, server)
}

/// A `Commit` sent after `PrepareHandoff` but before any `SealRequest` is out
/// of order. The incumbent must drop the session rather than accept it.
#[test]
fn commit_before_seal_closes_session() {
    let (_temp, mut stream, handoff_id, _thread) = make_active_session();

    // Open the handoff (the incumbent answers with Drained), then stop short
    // of SealRequest so the Commit below arrives too early.
    let prepare = Message::PrepareHandoff {
        handoff_id,
        successor_pid: 9999,
        deadline_ms: 5000,
        drain_grace_ms: 1000,
    };
    write_message(&mut stream, PROTO_MAX, &prepare).unwrap();
    let (_v, reply) = read_message(&mut stream).unwrap();
    assert!(matches!(reply, Message::Drained { .. }));

    // No SealRequest / SealComplete exchange has happened yet.
    let premature = Message::Commit { handoff_id };
    write_message(&mut stream, PROTO_MAX, &premature).unwrap();

    // Any read error, EOF included, counts as the incumbent ending the session.
    let outcome = read_message(&mut stream);
    assert!(outcome.is_err());
}

/// A `SealRequest` whose handoff_id differs from the one the handoff was
/// prepared with must end the session. This guards against a frame from
/// another session being replayed into this one.
#[test]
fn seal_request_with_wrong_id_closes_session() {
    let (_temp, mut stream, handoff_id, _thread) = make_active_session();

    let prepare = Message::PrepareHandoff {
        handoff_id,
        successor_pid: 9999,
        deadline_ms: 5000,
        drain_grace_ms: 1000,
    };
    write_message(&mut stream, PROTO_MAX, &prepare).unwrap();
    let (_v, reply) = read_message(&mut stream).unwrap();
    assert!(matches!(reply, Message::Drained { .. }));

    // A second `HandoffId::new()` stands in for an id from some other session.
    let foreign_seal = Message::SealRequest {
        handoff_id: HandoffId::new(),
    };
    write_message(&mut stream, PROTO_MAX, &foreign_seal).unwrap();

    let outcome = read_message(&mut stream);
    assert!(outcome.is_err());
}

/// A `Commit` whose handoff_id differs from the id used for
/// Prepare/Seal must close the session. This holds even though every frame
/// before it was legal.
#[test]
fn commit_with_wrong_id_closes_session() {
    let (_temp, mut stream, handoff_id, _thread) = make_active_session();

    write_message(
        &mut stream,
        PROTO_MAX,
        &Message::PrepareHandoff {
            handoff_id,
            successor_pid: 9999,
            deadline_ms: 5000,
            drain_grace_ms: 1000,
        },
    )
    .unwrap();
    // Check every step of the legal prefix, as the sibling tests do, so a
    // failure is reported at the step that actually went wrong.
    let (_v, drained) = read_message(&mut stream).unwrap();
    assert!(matches!(drained, Message::Drained { .. }));

    write_message(&mut stream, PROTO_MAX, &Message::SealRequest { handoff_id }).unwrap();
    let (_v, sealed) = read_message(&mut stream).unwrap();
    assert!(matches!(sealed, Message::SealComplete { .. }));

    // A second `HandoffId::new()` stands in for an id from some other session.
    let other_id = HandoffId::new();
    write_message(
        &mut stream,
        PROTO_MAX,
        &Message::Commit {
            handoff_id: other_id,
        },
    )
    .unwrap();

    // Any read error, EOF included, counts as the incumbent ending the session.
    assert!(read_message(&mut stream).is_err());
}

/// Frames the incumbent must reject once the Hello/HelloAck handshake has
/// completed:
/// - `Hello`, `Drained` and `SealComplete` are frames this file only ever
///   reads *from* the incumbent.
/// - A second `HelloAck` repeats a handshake that is already done.
/// - `SealFailed`, `SealProgress`, `Begin` and `Ready` are likewise not
///   requests the incumbent serves. NOTE(review): this is inferred from their
///   protocol roles, which this file does not show; confirm.
///
/// For each frame, the incumbent must close the session, and its serve loop
/// must keep running. The test checks the second property by reconnecting to
/// the same socket and expecting a fresh `Hello`.
#[test]
fn wrong_direction_frames_close_session_but_keep_serving() {
    let illegal: Vec<Message> = vec![
        Message::Hello {
            role: Side::Successor,
            pid: 1,
            build_id: Vec::new(),
            proto_min: PROTO_MAX,
            proto_max: PROTO_MAX,
            capabilities: Default::default(),
        },
        Message::HelloAck {
            proto_version_chosen: PROTO_MAX,
            handoff_id: HandoffId::new(),
        },
        Message::Drained {
            open_conns_remaining: 0,
            accept_closed: true,
        },
        Message::SealComplete {
            handoff_id: HandoffId::new(),
            last_revision_per_shard: vec![1],
            data_dir_fingerprint: [0u8; 32],
        },
        Message::SealFailed {
            handoff_id: HandoffId::new(),
            error: "x".into(),
            partial_state: String::new(),
        },
        Message::SealProgress {
            shards_sealed: 1,
            shards_total: 2,
            last_revision: 0,
        },
        Message::Begin {
            handoff_id: HandoffId::new(),
        },
        Message::Ready {
            handoff_id: HandoffId::new(),
            listening_on: vec!["tcp".into()],
            healthz_ok: true,
            advertised_revision_per_shard: vec![1],
        },
    ];

    for msg in illegal {
        // Label failures by variant; `Discriminant` is always `Debug`, so this
        // does not require `Message: Debug`.
        let label = std::mem::discriminant(&msg);
        let (temp, mut stream, _id, _server) = make_active_session();
        write_message(&mut stream, PROTO_MAX, &msg).unwrap();
        assert!(
            read_message(&mut stream).is_err(),
            "session must close after illegal {label:?}, but read succeeded"
        );
        drop(stream);

        // The serve loop must outlive the rejected session. A new connection
        // on the same socket must be accepted and greeted with Hello.
        // `make_active_session` binds the socket at `<tempdir>/control.sock`.
        let mut next = connect_with_retry(&temp.path().join("control.sock"));
        match read_message(&mut next) {
            Ok((_, Message::Hello { .. })) => {}
            Ok(_) => panic!("new session after illegal {label:?} did not start with Hello"),
            Err(e) => {
                panic!("serve loop stopped accepting sessions after illegal {label:?}: {e:?}")
            }
        }
    }
}

/// `Heartbeat` is accepted at any point in the session, and the incumbent
/// answers each one with a `Heartbeat`. Heartbeats must not disturb protocol
/// state: the handoff still completes normally afterwards.
#[test]
fn heartbeat_is_idempotent_in_any_state() {
    let (_temp, mut stream, handoff_id, _thread) = make_active_session();

    // Heartbeats right after the handshake, before any handoff step.
    for _ in 0..3 {
        write_message(&mut stream, PROTO_MAX, &Message::Heartbeat { ts_ms: 0 }).unwrap();
        let (_v, echoed) = read_message(&mut stream).unwrap();
        assert!(matches!(echoed, Message::Heartbeat { .. }));
    }

    // Heartbeats between PrepareHandoff and SealRequest.
    write_message(
        &mut stream,
        PROTO_MAX,
        &Message::PrepareHandoff {
            handoff_id,
            successor_pid: 9999,
            deadline_ms: 5000,
            drain_grace_ms: 1000,
        },
    )
    .unwrap();
    // Check the Prepare reply, as the other tests do, so a misbehaving
    // Prepare is reported here rather than later inside the heartbeat loop.
    let (_v, drained) = read_message(&mut stream).unwrap();
    assert!(matches!(drained, Message::Drained { .. }));

    for _ in 0..3 {
        write_message(&mut stream, PROTO_MAX, &Message::Heartbeat { ts_ms: 1 }).unwrap();
        let (_v, echoed) = read_message(&mut stream).unwrap();
        assert!(matches!(echoed, Message::Heartbeat { .. }));
    }

    // The protocol should still complete normally after the heartbeats.
    write_message(&mut stream, PROTO_MAX, &Message::SealRequest { handoff_id }).unwrap();
    let (_v, sealed) = read_message(&mut stream).unwrap();
    assert!(matches!(sealed, Message::SealComplete { .. }));

    write_message(&mut stream, PROTO_MAX, &Message::Commit { handoff_id }).unwrap();
    // Give the server thread a moment to process Commit before the tempdir
    // and stream are dropped. The thread presumably exits after Commit; it
    // is not joined or checked here.
    thread::sleep(Duration::from_millis(50));
}