Skip to main content

running_process/broker/backend_lib/
wire.rs

//! Backend-side wire handling for broker handoff offers (#354, slice 6).
//!
//! Given a framed connection to the broker (the same v1 local-socket
//! framing used by every other broker connection), this module:
//!
//! 1. reads one [`HandoffOffer`] frame
//!    ([`read_handoff_offer`]),
//! 2. validates and consumes the presented one-time token through the
//!    existing [`accept_handed_off`] path, and
//! 3. replies with a [`HandoffAck`] frame echoing the token and
//!    correlation id ([`respond_to_handoff_offer`]).
//!
//! [`serve_handoff_offer`] composes all three for the common case. A
//! rejected token still produces a well-formed `HandoffAck` with
//! `accepted = false` and the rejection detail, so the broker can fall
//! back to the reconnect path immediately instead of waiting out its ACK
//! deadline.
18
19use std::io::{Read, Write};
20use std::time::Instant;
21
22use prost::Message;
23
24use crate::broker::backend_lib::accept_handed_off::{
25    accept_handed_off, HandedOffPayload, HandoffAcceptance,
26};
27use crate::broker::protocol::{
28    read_frame, write_frame, Frame, FrameKind, FramingError, HandoffAck, HandoffOffer,
29};
30use crate::broker::server::handoff::wire::{handoff_ack_frame, validate_handoff_frame};
31use crate::broker::server::{HandoffToken, HandoffTokenStore};
32
33/// Errors surfaced while reading or answering a handoff offer frame.
34#[derive(Debug, thiserror::Error)]
35pub enum BackendHandoffWireError {
36    /// v1 framing failed.
37    #[error(transparent)]
38    Framing(#[from] FramingError),
39    /// The offer frame could not be decoded.
40    #[error("failed to decode HandoffOffer Frame: {0}")]
41    DecodeFrame(prost::DecodeError),
42    /// The offer payload could not be decoded.
43    #[error("failed to decode HandoffOffer payload: {0}")]
44    DecodePayload(prost::DecodeError),
45    /// The frame did not match the handoff-offer contract.
46    #[error("unexpected HandoffOffer frame: {0}")]
47    UnexpectedFrame(&'static str),
48}
49
50/// Read and validate one broker→backend [`HandoffOffer`] frame.
51pub fn read_handoff_offer<S: Read>(
52    stream: &mut S,
53) -> Result<HandoffOffer, BackendHandoffWireError> {
54    let bytes = read_frame(stream)?;
55    let frame = Frame::decode(bytes.as_slice()).map_err(BackendHandoffWireError::DecodeFrame)?;
56    validate_handoff_frame(&frame, FrameKind::Request)
57        .map_err(BackendHandoffWireError::UnexpectedFrame)?;
58    let offer = HandoffOffer::decode(frame.payload.as_slice())
59        .map_err(BackendHandoffWireError::DecodePayload)?;
60    if frame.request_id != offer.correlation_id {
61        return Err(BackendHandoffWireError::UnexpectedFrame(
62            "frame request_id does not match HandoffOffer correlation_id",
63        ));
64    }
65    Ok(offer)
66}
67
68/// Validate/consume one received offer and write the matching [`HandoffAck`].
69///
70/// The presented token rides the offer; `expected_token` is the token the
71/// backend was told to expect (it arrived out of band, e.g. through the
72/// spawn environment). On acceptance the one-time token is consumed from
73/// `pending_tokens` exactly once and the ACK reports `accepted = true`; on
74/// rejection the ACK carries `accepted = false` plus the rejection detail.
75/// Either way the ACK echoes the offer's token bytes and correlation id.
76pub fn respond_to_handoff_offer<S: Write>(
77    stream: &mut S,
78    pending_tokens: &mut HandoffTokenStore,
79    expected_token: HandoffToken,
80    offer: HandoffOffer,
81    now: Instant,
82) -> Result<HandoffAcceptance<HandoffOffer>, BackendHandoffWireError> {
83    let presented_token = offer.token.clone();
84    let correlation_id = offer.correlation_id;
85    let payload = HandedOffPayload::new(expected_token, presented_token.clone(), offer);
86    let acceptance = accept_handed_off(pending_tokens, payload, now);
87
88    let ack = match &acceptance {
89        HandoffAcceptance::Accepted(_) => HandoffAck {
90            token: presented_token,
91            accepted: true,
92            error_detail: String::new(),
93            correlation_id,
94        },
95        HandoffAcceptance::Rejected(rejected) => HandoffAck {
96            token: presented_token,
97            accepted: false,
98            error_detail: rejected.reason.to_string(),
99            correlation_id,
100        },
101    };
102    write_handoff_ack(stream, &ack)?;
103    Ok(acceptance)
104}
105
106/// Write one backend→broker [`HandoffAck`] frame.
107pub fn write_handoff_ack<S: Write>(
108    stream: &mut S,
109    ack: &HandoffAck,
110) -> Result<(), BackendHandoffWireError> {
111    let frame = handoff_ack_frame(ack);
112    let mut bytes = Vec::with_capacity(64);
113    frame
114        .encode(&mut bytes)
115        .expect("prost encoding Frame into Vec cannot fail because Vec writes are infallible");
116    write_frame(stream, &bytes)?;
117    Ok(())
118}
119
120/// Read one offer, validate/consume the token, and reply with the ACK.
121///
122/// Convenience composition of [`read_handoff_offer`] and
123/// [`respond_to_handoff_offer`] for backends serving one handoff per
124/// control exchange.
125pub fn serve_handoff_offer<S: Read + Write>(
126    stream: &mut S,
127    pending_tokens: &mut HandoffTokenStore,
128    expected_token: HandoffToken,
129    now: Instant,
130) -> Result<HandoffAcceptance<HandoffOffer>, BackendHandoffWireError> {
131    let offer = read_handoff_offer(stream)?;
132    respond_to_handoff_offer(stream, pending_tokens, expected_token, offer, now)
133}