Skip to main content

running_process/broker/server/handoff/
wire.rs

1//! Production wire-frame handoff delivery over a broker↔backend control
2//! connection (#354, slice 6).
3//!
4//! Earlier slices abstracted delivery of the `(handle value, token)` pair
5//! behind [`HandoffDelivery`] because the v1
6//! envelope reserved no backend→broker ACK frame. This module closes that
7//! gap with two envelope messages riding the existing v1 frame layout on a
8//! framed broker↔backend connection:
9//!
10//! - [`HandoffOffer`] (broker → backend, `FRAME_KIND_REQUEST`): the
11//!   duplicated handle value (Windows; zero on Unix where the fd travels
12//!   via `SCM_RIGHTS`), the 16-byte one-time token, the service name, and
13//!   a correlation id.
14//! - [`HandoffAck`] (backend → broker, `FRAME_KIND_RESPONSE`): the token
15//!   echo, an accepted flag plus error detail, and the correlation id echo.
16//!
17//! Both ride `Frame.payload` under [`HANDOFF_PAYLOAD_PROTOCOL`], mirroring
18//! how Hello (`0x00`), admin verbs (`0xAD01`), and endpoint probes
19//! (`0xB232`) share the envelope.
20//!
21//! [`WireHandoffDelivery`] implements [`HandoffDelivery`] over any framed
22//! `Read + Write` stream (the same local-socket framing used
23//! by every other broker connection). Any malformed frame, token-echo
24//! mismatch, correlation-id mismatch, refused ACK, or overdue ACK is
25//! reported as a delivery error; the orchestration in
26//! [`super::orchestrate`] then revokes the token and falls back to the
27//! `backend_pipe` reconnect path. This module never panics on wire input.
28//!
29//! # Deadline enforcement
30//!
31//! [`WireHandoffDelivery::await_backend_ack`] performs a blocking framed
32//! read. Wall-clock interruption of a backend that never writes anything
33//! relies on the caller configuring a read timeout on the underlying
34//! stream (e.g. `set_nonblocking` + polling, or a socket read timeout);
35//! a closed/erroring stream surfaces immediately. Independently of the
36//! stream, an ACK observed after `deadline` is rejected here, and the
37//! [`HandoffAckRegistry`](super::HandoffAckRegistry) re-validates the
38//! observation instant against the deadline registered at issuance, so a
39//! slow stream can never complete an overdue handoff.
40
41use std::io::{Read, Write};
42use std::time::Instant;
43
44use prost::Message;
45
46use crate::broker::protocol::{
47    read_frame, write_frame, Frame, FrameKind, HandoffAck, HandoffOffer, PayloadEncoding,
48};
49use crate::broker::server::handoff::handoff_token::HandoffToken;
50use crate::broker::server::handoff::orchestrate::{HandoffDelivery, HandoffDeliveryError};
51use crate::broker::server::handoff::windows::WindowsHandleValue;
52
/// Payload protocol reserved for broker↔backend handoff offer/ACK frames.
///
/// Lives in the same envelope-multiplexing space as the control plane
/// (`0x00`), admin verbs (`0xAD01`), and backend-handle endpoint probes
/// (`0xB232`).
pub const HANDOFF_PAYLOAD_PROTOCOL: u32 = 0xD0FF;

/// Envelope version stamped on every handoff frame built in this module and
/// required by [`validate_handoff_frame`] on every frame it checks.
const PROTOCOL_VERSION: u32 = 1;
61
62/// Build the v1 frame carrying one broker→backend [`HandoffOffer`].
63pub fn handoff_offer_frame(offer: &HandoffOffer) -> Frame {
64    let mut payload = Vec::with_capacity(64);
65    offer.encode(&mut payload).expect(
66        "prost encoding HandoffOffer into Vec cannot fail because Vec writes are infallible",
67    );
68    Frame {
69        envelope_version: PROTOCOL_VERSION,
70        kind: FrameKind::Request as i32,
71        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
72        payload,
73        request_id: offer.correlation_id,
74        payload_encoding: PayloadEncoding::None as i32,
75        deadline_unix_ms: 0,
76        traceparent: String::new(),
77        tracestate: String::new(),
78    }
79}
80
81/// Build the v1 frame carrying one backend→broker [`HandoffAck`].
82pub fn handoff_ack_frame(ack: &HandoffAck) -> Frame {
83    let mut payload = Vec::with_capacity(64);
84    ack.encode(&mut payload)
85        .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
86    Frame {
87        envelope_version: PROTOCOL_VERSION,
88        kind: FrameKind::Response as i32,
89        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
90        payload,
91        request_id: ack.correlation_id,
92        payload_encoding: PayloadEncoding::None as i32,
93        deadline_unix_ms: 0,
94        traceparent: String::new(),
95        tracestate: String::new(),
96    }
97}
98
99/// Build the v1 frame relaying one completed handoff to the CLIENT (#354,
100/// slice 7).
101///
102/// After the backend ACKs a [`HandoffOffer`], the broker relays the
103/// backend's [`HandoffAck`] verbatim to the waiting client on the client's
104/// original broker connection — the same socket that carried Hello and is
105/// now backend-served. The relay rides the envelope as a broker→client push
106/// (`FRAME_KIND_EVENT`) under [`HANDOFF_PAYLOAD_PROTOCOL`], mirroring how
107/// the offer/ACK pair rides the broker↔backend control connection. The
108/// client matches the relay by the one-time token echo (the only handoff
109/// secret it knows); the correlation id is broker↔backend bookkeeping that
110/// the client does not validate.
111pub fn handoff_ready_frame(ack: &HandoffAck) -> Frame {
112    let mut payload = Vec::with_capacity(64);
113    ack.encode(&mut payload)
114        .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
115    Frame {
116        envelope_version: PROTOCOL_VERSION,
117        kind: FrameKind::Event as i32,
118        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
119        payload,
120        request_id: ack.correlation_id,
121        payload_encoding: PayloadEncoding::None as i32,
122        deadline_unix_ms: 0,
123        traceparent: String::new(),
124        tracestate: String::new(),
125    }
126}
127
128/// Validate the envelope fields shared by both handoff frame directions.
129///
130/// Returns the expected-vs-actual mismatch as a static description so both
131/// the broker and backend sides report wire violations uniformly.
132pub fn validate_handoff_frame(frame: &Frame, expected_kind: FrameKind) -> Result<(), &'static str> {
133    if frame.envelope_version != PROTOCOL_VERSION {
134        return Err("envelope_version is not v1");
135    }
136    if FrameKind::try_from(frame.kind) != Ok(expected_kind) {
137        return Err(match expected_kind {
138            FrameKind::Request => "kind is not REQUEST",
139            FrameKind::Event => "kind is not EVENT",
140            _ => "kind is not RESPONSE",
141        });
142    }
143    if frame.payload_protocol != HANDOFF_PAYLOAD_PROTOCOL {
144        return Err("payload_protocol is not handoff");
145    }
146    if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
147        return Err("payload is compressed");
148    }
149    Ok(())
150}
151
/// [`HandoffDelivery`] implementation that sends [`HandoffOffer`] frames to
/// the backend over a framed control connection and waits for the matching
/// [`HandoffAck`].
///
/// `deliver` writes one offer frame; `await_backend_ack` reads one response
/// frame and requires the token echo, the correlation id, and the accepted
/// flag to all match. Every violation is a delivery error — the
/// orchestration falls back to reconnect and revokes the token.
#[derive(Debug)]
pub struct WireHandoffDelivery<S> {
    /// Framed broker↔backend connection the offer is written to and the ACK
    /// is read from.
    stream: S,
    /// Service name copied into every [`HandoffOffer`] this delivery sends.
    service_name: String,
    /// Correlation id stamped on the offer and required on both the ACK
    /// frame's `request_id` and the ACK payload's `correlation_id`.
    correlation_id: u64,
}
166
167impl<S> WireHandoffDelivery<S> {
168    /// Wrap a framed broker↔backend connection for one handoff.
169    ///
170    /// `correlation_id` ties the offer to its ACK; reuse the request or
171    /// connection id of the client Hello that triggered the handoff.
172    pub fn new(stream: S, service_name: impl Into<String>, correlation_id: u64) -> Self {
173        Self {
174            stream,
175            service_name: service_name.into(),
176            correlation_id,
177        }
178    }
179
180    /// Return the correlation id stamped on the offer and required on the ACK.
181    pub fn correlation_id(&self) -> u64 {
182        self.correlation_id
183    }
184
185    /// Unwrap the underlying connection (e.g. to keep using it after a
186    /// completed handoff).
187    pub fn into_stream(self) -> S {
188        self.stream
189    }
190}
191
192impl<S: Read + Write> HandoffDelivery for WireHandoffDelivery<S> {
193    fn deliver(
194        &mut self,
195        handle: WindowsHandleValue,
196        token: &HandoffToken,
197    ) -> Result<(), HandoffDeliveryError> {
198        let offer = HandoffOffer {
199            handle_value: handle.get() as u64,
200            token: token.as_bytes().to_vec(),
201            service_name: self.service_name.clone(),
202            correlation_id: self.correlation_id,
203        };
204        let frame = handoff_offer_frame(&offer);
205        let mut bytes = Vec::with_capacity(64);
206        frame
207            .encode(&mut bytes)
208            .expect("prost encoding Frame into Vec cannot fail because Vec writes are infallible");
209        write_frame(&mut self.stream, &bytes).map_err(|error| {
210            HandoffDeliveryError::DeliveryFailed {
211                detail: format!("failed to write HandoffOffer frame: {error}"),
212            }
213        })?;
214        Ok(())
215    }
216
217    fn await_backend_ack(
218        &mut self,
219        token: &HandoffToken,
220        deadline: Instant,
221    ) -> Result<Instant, HandoffDeliveryError> {
222        let bytes = read_frame(&mut self.stream).map_err(|error| {
223            ack_not_observed(format!("failed to read HandoffAck frame: {error}"))
224        })?;
225        let observed_at = Instant::now();
226        let frame = Frame::decode(bytes.as_slice()).map_err(|error| {
227            ack_not_observed(format!("failed to decode HandoffAck Frame: {error}"))
228        })?;
229        validate_handoff_frame(&frame, FrameKind::Response)
230            .map_err(|detail| ack_not_observed(format!("unexpected HandoffAck frame: {detail}")))?;
231        if frame.request_id != self.correlation_id {
232            return Err(ack_not_observed(format!(
233                "HandoffAck frame request_id {} does not match correlation id {}",
234                frame.request_id, self.correlation_id
235            )));
236        }
237        let ack = HandoffAck::decode(frame.payload.as_slice()).map_err(|error| {
238            ack_not_observed(format!("failed to decode HandoffAck payload: {error}"))
239        })?;
240        if ack.correlation_id != self.correlation_id {
241            return Err(ack_not_observed(format!(
242                "HandoffAck correlation id {} does not match offer correlation id {}",
243                ack.correlation_id, self.correlation_id
244            )));
245        }
246        if ack.token != token.as_bytes() {
247            return Err(ack_not_observed(
248                "HandoffAck token echo does not match the offered token".to_string(),
249            ));
250        }
251        if !ack.accepted {
252            return Err(ack_not_observed(format!(
253                "backend refused the handoff: {}",
254                if ack.error_detail.is_empty() {
255                    "no detail provided"
256                } else {
257                    ack.error_detail.as_str()
258                }
259            )));
260        }
261        if observed_at > deadline {
262            return Err(ack_not_observed(
263                "backend HandoffAck arrived after the ACK deadline".to_string(),
264            ));
265        }
266        Ok(observed_at)
267    }
268}
269
/// Wrap `detail` in [`HandoffDeliveryError::AckNotObserved`].
///
/// Shared by every failure path of `await_backend_ack` so they all produce
/// the same error variant.
fn ack_not_observed(detail: String) -> HandoffDeliveryError {
    HandoffDeliveryError::AckNotObserved { detail }
}