running-process 4.1.0

Subprocess and PTY runtime for the running-process project
Documentation
//! Production wire-frame handoff delivery over a broker↔backend control
//! connection (#354, slice 6).
//!
//! Earlier slices abstracted delivery of the `(handle value, token)` pair
//! behind [`HandoffDelivery`] because the v1
//! envelope reserved no backend→broker ACK frame. This module closes that
//! gap with two envelope messages riding the existing v1 frame layout on a
//! framed broker↔backend connection:
//!
//! - [`HandoffOffer`] (broker → backend, `FRAME_KIND_REQUEST`): the
//!   duplicated handle value (Windows; zero on Unix where the fd travels
//!   via `SCM_RIGHTS`), the 16-byte one-time token, the service name, and
//!   a correlation id.
//! - [`HandoffAck`] (backend → broker, `FRAME_KIND_RESPONSE`): the token
//!   echo, an accepted flag plus error detail, and the correlation id echo.
//!
//! Both ride `Frame.payload` under [`HANDOFF_PAYLOAD_PROTOCOL`], mirroring
//! how Hello (`0x00`), admin verbs (`0xAD01`), and endpoint probes
//! (`0xB232`) share the envelope.
//!
//! [`WireHandoffDelivery`] implements [`HandoffDelivery`] over any framed
//! `Read + Write` stream (the same local-socket framing used
//! by every other broker connection). Any malformed frame, token-echo
//! mismatch, correlation-id mismatch, refused ACK, or overdue ACK is
//! reported as a delivery error; the orchestration in
//! [`super::orchestrate`] then revokes the token and falls back to the
//! `backend_pipe` reconnect path. This module never panics on wire input.
//!
//! # Deadline enforcement
//!
//! [`WireHandoffDelivery::await_backend_ack`] performs a blocking framed
//! read. Wall-clock interruption of a backend that never writes anything
//! relies on the caller configuring a read timeout on the underlying
//! stream (e.g. `set_nonblocking` + polling, or a socket read timeout);
//! a closed/erroring stream surfaces immediately. Independently of the
//! stream, an ACK observed after `deadline` is rejected here, and the
//! [`HandoffAckRegistry`](super::HandoffAckRegistry) re-validates the
//! observation instant against the deadline registered at issuance, so a
//! slow stream can never complete an overdue handoff.

use std::io::{Read, Write};
use std::time::Instant;

use prost::Message;

use crate::broker::protocol::{
    read_frame, write_frame, Frame, FrameKind, HandoffAck, HandoffOffer, PayloadEncoding,
};
use crate::broker::server::handoff::handoff_token::HandoffToken;
use crate::broker::server::handoff::orchestrate::{HandoffDelivery, HandoffDeliveryError};
use crate::broker::server::handoff::windows::WindowsHandleValue;

/// Payload protocol reserved for broker↔backend handoff offer/ACK frames.
///
/// Lives in the same envelope-multiplexing space as the control plane
/// (`0x00`), admin verbs (`0xAD01`), and backend-handle endpoint probes
/// (`0xB232`).
pub const HANDOFF_PAYLOAD_PROTOCOL: u32 = 0xD0FF;

const PROTOCOL_VERSION: u32 = 1;

/// Build the v1 frame carrying one broker→backend [`HandoffOffer`].
pub fn handoff_offer_frame(offer: &HandoffOffer) -> Frame {
    let mut payload = Vec::with_capacity(64);
    offer.encode(&mut payload).expect(
        "prost encoding HandoffOffer into Vec cannot fail because Vec writes are infallible",
    );
    Frame {
        envelope_version: PROTOCOL_VERSION,
        kind: FrameKind::Request as i32,
        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
        payload,
        request_id: offer.correlation_id,
        payload_encoding: PayloadEncoding::None as i32,
        deadline_unix_ms: 0,
        traceparent: String::new(),
        tracestate: String::new(),
    }
}

/// Build the v1 frame carrying one backend→broker [`HandoffAck`].
pub fn handoff_ack_frame(ack: &HandoffAck) -> Frame {
    let mut payload = Vec::with_capacity(64);
    ack.encode(&mut payload)
        .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
    Frame {
        envelope_version: PROTOCOL_VERSION,
        kind: FrameKind::Response as i32,
        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
        payload,
        request_id: ack.correlation_id,
        payload_encoding: PayloadEncoding::None as i32,
        deadline_unix_ms: 0,
        traceparent: String::new(),
        tracestate: String::new(),
    }
}

/// Build the v1 frame relaying one completed handoff to the CLIENT (#354,
/// slice 7).
///
/// After the backend ACKs a [`HandoffOffer`], the broker relays the
/// backend's [`HandoffAck`] verbatim to the waiting client on the client's
/// original broker connection — the same socket that carried Hello and is
/// now backend-served. The relay rides the envelope as a broker→client push
/// (`FRAME_KIND_EVENT`) under [`HANDOFF_PAYLOAD_PROTOCOL`], mirroring how
/// the offer/ACK pair rides the broker↔backend control connection. The
/// client matches the relay by the one-time token echo (the only handoff
/// secret it knows); the correlation id is broker↔backend bookkeeping that
/// the client does not validate.
pub fn handoff_ready_frame(ack: &HandoffAck) -> Frame {
    let mut payload = Vec::with_capacity(64);
    ack.encode(&mut payload)
        .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
    Frame {
        envelope_version: PROTOCOL_VERSION,
        kind: FrameKind::Event as i32,
        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
        payload,
        request_id: ack.correlation_id,
        payload_encoding: PayloadEncoding::None as i32,
        deadline_unix_ms: 0,
        traceparent: String::new(),
        tracestate: String::new(),
    }
}

/// Validate the envelope fields shared by both handoff frame directions.
///
/// Returns the expected-vs-actual mismatch as a static description so both
/// the broker and backend sides report wire violations uniformly.
pub fn validate_handoff_frame(frame: &Frame, expected_kind: FrameKind) -> Result<(), &'static str> {
    if frame.envelope_version != PROTOCOL_VERSION {
        return Err("envelope_version is not v1");
    }
    if FrameKind::try_from(frame.kind) != Ok(expected_kind) {
        return Err(match expected_kind {
            FrameKind::Request => "kind is not REQUEST",
            FrameKind::Event => "kind is not EVENT",
            _ => "kind is not RESPONSE",
        });
    }
    if frame.payload_protocol != HANDOFF_PAYLOAD_PROTOCOL {
        return Err("payload_protocol is not handoff");
    }
    if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
        return Err("payload is compressed");
    }
    Ok(())
}

/// [`HandoffDelivery`] implementation that sends [`HandoffOffer`] frames to
/// the backend over a framed control connection and waits for the matching
/// [`HandoffAck`].
///
/// `deliver` writes one offer frame; `await_backend_ack` reads one response
/// frame and requires the token echo, the correlation id, and the accepted
/// flag to all match. Every violation is a delivery error — the
/// orchestration falls back to reconnect and revokes the token.
#[derive(Debug)]
pub struct WireHandoffDelivery<S> {
    stream: S,
    service_name: String,
    correlation_id: u64,
}

impl<S> WireHandoffDelivery<S> {
    /// Wrap a framed broker↔backend connection for one handoff.
    ///
    /// `correlation_id` ties the offer to its ACK; reuse the request or
    /// connection id of the client Hello that triggered the handoff.
    pub fn new(stream: S, service_name: impl Into<String>, correlation_id: u64) -> Self {
        Self {
            stream,
            service_name: service_name.into(),
            correlation_id,
        }
    }

    /// Return the correlation id stamped on the offer and required on the ACK.
    pub fn correlation_id(&self) -> u64 {
        self.correlation_id
    }

    /// Unwrap the underlying connection (e.g. to keep using it after a
    /// completed handoff).
    pub fn into_stream(self) -> S {
        self.stream
    }
}

impl<S: Read + Write> HandoffDelivery for WireHandoffDelivery<S> {
    fn deliver(
        &mut self,
        handle: WindowsHandleValue,
        token: &HandoffToken,
    ) -> Result<(), HandoffDeliveryError> {
        let offer = HandoffOffer {
            handle_value: handle.get() as u64,
            token: token.as_bytes().to_vec(),
            service_name: self.service_name.clone(),
            correlation_id: self.correlation_id,
        };
        let frame = handoff_offer_frame(&offer);
        let mut bytes = Vec::with_capacity(64);
        frame
            .encode(&mut bytes)
            .expect("prost encoding Frame into Vec cannot fail because Vec writes are infallible");
        write_frame(&mut self.stream, &bytes).map_err(|error| {
            HandoffDeliveryError::DeliveryFailed {
                detail: format!("failed to write HandoffOffer frame: {error}"),
            }
        })?;
        Ok(())
    }

    fn await_backend_ack(
        &mut self,
        token: &HandoffToken,
        deadline: Instant,
    ) -> Result<Instant, HandoffDeliveryError> {
        let bytes = read_frame(&mut self.stream).map_err(|error| {
            ack_not_observed(format!("failed to read HandoffAck frame: {error}"))
        })?;
        let observed_at = Instant::now();
        let frame = Frame::decode(bytes.as_slice()).map_err(|error| {
            ack_not_observed(format!("failed to decode HandoffAck Frame: {error}"))
        })?;
        validate_handoff_frame(&frame, FrameKind::Response)
            .map_err(|detail| ack_not_observed(format!("unexpected HandoffAck frame: {detail}")))?;
        if frame.request_id != self.correlation_id {
            return Err(ack_not_observed(format!(
                "HandoffAck frame request_id {} does not match correlation id {}",
                frame.request_id, self.correlation_id
            )));
        }
        let ack = HandoffAck::decode(frame.payload.as_slice()).map_err(|error| {
            ack_not_observed(format!("failed to decode HandoffAck payload: {error}"))
        })?;
        if ack.correlation_id != self.correlation_id {
            return Err(ack_not_observed(format!(
                "HandoffAck correlation id {} does not match offer correlation id {}",
                ack.correlation_id, self.correlation_id
            )));
        }
        if ack.token != token.as_bytes() {
            return Err(ack_not_observed(
                "HandoffAck token echo does not match the offered token".to_string(),
            ));
        }
        if !ack.accepted {
            return Err(ack_not_observed(format!(
                "backend refused the handoff: {}",
                if ack.error_detail.is_empty() {
                    "no detail provided"
                } else {
                    ack.error_detail.as_str()
                }
            )));
        }
        if observed_at > deadline {
            return Err(ack_not_observed(
                "backend HandoffAck arrived after the ACK deadline".to_string(),
            ));
        }
        Ok(observed_at)
    }
}

fn ack_not_observed(detail: String) -> HandoffDeliveryError {
    HandoffDeliveryError::AckNotObserved { detail }
}