Skip to main content

running_process/broker/server/handoff/
wire.rs

1//! Production wire-frame handoff delivery over a broker↔backend control
2//! connection (#354, slice 6).
3//!
4//! Earlier slices abstracted delivery of the `(handle value, token)` pair
5//! behind [`HandoffDelivery`] because the v1
6//! envelope reserved no backend→broker ACK frame. This module closes that
7//! gap with two envelope messages riding the existing v1 frame layout on a
8//! framed broker↔backend connection:
9//!
10//! - [`HandoffOffer`] (broker → backend, `FRAME_KIND_REQUEST`): the
11//!   duplicated handle value (Windows; zero on Unix where the fd travels
12//!   via `SCM_RIGHTS`), the 16-byte one-time token, the service name, and
13//!   a correlation id.
14//! - [`HandoffAck`] (backend → broker, `FRAME_KIND_RESPONSE`): the token
15//!   echo, an accepted flag plus error detail, and the correlation id echo.
16//!
17//! Both ride `Frame.payload` under [`HANDOFF_PAYLOAD_PROTOCOL`], mirroring
18//! how Hello (`0x00`), admin verbs (`0xAD01`), and endpoint probes
19//! (`0xB232`) share the envelope.
20//!
21//! [`WireHandoffDelivery`] implements [`HandoffDelivery`] over any framed
22//! `Read + Write` stream (the same local-socket framing used
23//! by every other broker connection). Any malformed frame, token-echo
24//! mismatch, correlation-id mismatch, refused ACK, or overdue ACK is
25//! reported as a delivery error; the orchestration in
26//! [`super::orchestrate`] then revokes the token and falls back to the
27//! `backend_pipe` reconnect path. This module never panics on wire input.
28//!
29//! # Deadline enforcement
30//!
31//! [`WireHandoffDelivery::await_backend_ack`] performs a blocking framed
32//! read. Wall-clock interruption of a backend that never writes anything
33//! relies on the caller configuring a read timeout on the underlying
34//! stream (e.g. `set_nonblocking` + polling, or a socket read timeout);
35//! a closed/erroring stream surfaces immediately. Independently of the
36//! stream, an ACK observed after `deadline` is rejected here, and the
37//! [`HandoffAckRegistry`](super::HandoffAckRegistry) re-validates the
38//! observation instant against the deadline registered at issuance, so a
39//! slow stream can never complete an overdue handoff.
40
41use std::io::{Read, Write};
42use std::time::Instant;
43
44use prost::Message;
45
46use crate::broker::protocol::{
47    read_frame, validate_frame_envelope, write_frame, Frame, FrameKind, FrameValidationError,
48    HandoffAck, HandoffOffer, PayloadEncoding, PROTOCOL_VERSION,
49};
50use crate::broker::server::handoff::handoff_token::HandoffToken;
51use crate::broker::server::handoff::orchestrate::{HandoffDelivery, HandoffDeliveryError};
52use crate::broker::server::handoff::windows::WindowsHandleValue;
53
54/// Payload protocol reserved for broker↔backend handoff offer/ACK frames.
55///
56/// Re-exported from the authoritative
57/// [`registry`](crate::broker::protocol::registry), which owns every v1
58/// payload-protocol ID (#375). Lives in the same envelope-multiplexing
59/// space as the control plane (`0x00`), admin verbs (`0xAD01`), and
60/// backend-handle endpoint probes (`0xB232`).
61pub use crate::broker::protocol::registry::HANDOFF_PAYLOAD_PROTOCOL;
62
63/// Build the v1 frame carrying one broker→backend [`HandoffOffer`].
64pub fn handoff_offer_frame(offer: &HandoffOffer) -> Frame {
65    let mut payload = Vec::with_capacity(64);
66    offer.encode(&mut payload).expect(
67        "prost encoding HandoffOffer into Vec cannot fail because Vec writes are infallible",
68    );
69    Frame {
70        envelope_version: PROTOCOL_VERSION,
71        kind: FrameKind::Request as i32,
72        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
73        payload,
74        request_id: offer.correlation_id,
75        payload_encoding: PayloadEncoding::None as i32,
76        deadline_unix_ms: 0,
77        traceparent: String::new(),
78        tracestate: String::new(),
79    }
80}
81
82/// Build the v1 frame carrying one backend→broker [`HandoffAck`].
83pub fn handoff_ack_frame(ack: &HandoffAck) -> Frame {
84    let mut payload = Vec::with_capacity(64);
85    ack.encode(&mut payload)
86        .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
87    Frame {
88        envelope_version: PROTOCOL_VERSION,
89        kind: FrameKind::Response as i32,
90        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
91        payload,
92        request_id: ack.correlation_id,
93        payload_encoding: PayloadEncoding::None as i32,
94        deadline_unix_ms: 0,
95        traceparent: String::new(),
96        tracestate: String::new(),
97    }
98}
99
100/// Build the v1 frame relaying one completed handoff to the CLIENT (#354,
101/// slice 7).
102///
103/// After the backend ACKs a [`HandoffOffer`], the broker relays the
104/// backend's [`HandoffAck`] verbatim to the waiting client on the client's
105/// original broker connection — the same socket that carried Hello and is
106/// now backend-served. The relay rides the envelope as a broker→client push
107/// (`FRAME_KIND_EVENT`) under [`HANDOFF_PAYLOAD_PROTOCOL`], mirroring how
108/// the offer/ACK pair rides the broker↔backend control connection. The
109/// client matches the relay by the one-time token echo (the only handoff
110/// secret it knows); the correlation id is broker↔backend bookkeeping that
111/// the client does not validate.
112pub fn handoff_ready_frame(ack: &HandoffAck) -> Frame {
113    let mut payload = Vec::with_capacity(64);
114    ack.encode(&mut payload)
115        .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
116    Frame {
117        envelope_version: PROTOCOL_VERSION,
118        kind: FrameKind::Event as i32,
119        payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
120        payload,
121        request_id: ack.correlation_id,
122        payload_encoding: PayloadEncoding::None as i32,
123        deadline_unix_ms: 0,
124        traceparent: String::new(),
125        tracestate: String::new(),
126    }
127}
128
129/// Validate the envelope fields shared by both handoff frame directions.
130///
131/// Returns the expected-vs-actual mismatch as a static description so both
132/// the broker and backend sides report wire violations uniformly.
133pub fn validate_handoff_frame(frame: &Frame, expected_kind: FrameKind) -> Result<(), &'static str> {
134    validate_frame_envelope(frame, expected_kind, HANDOFF_PAYLOAD_PROTOCOL).map_err(|error| {
135        match error {
136            FrameValidationError::EnvelopeVersion { .. } => "envelope_version is not v1",
137            FrameValidationError::Kind { .. } => match expected_kind {
138                FrameKind::Request => "kind is not REQUEST",
139                FrameKind::Event => "kind is not EVENT",
140                _ => "kind is not RESPONSE",
141            },
142            FrameValidationError::PayloadProtocol { .. } => "payload_protocol is not handoff",
143            FrameValidationError::PayloadEncoding { .. } => "payload is compressed",
144        }
145    })
146}
147
/// Wire-backed [`HandoffDelivery`] bound to one framed broker↔backend
/// control connection.
///
/// `deliver` writes a single [`HandoffOffer`] frame to the backend;
/// `await_backend_ack` then reads a single response frame and accepts it
/// only when the token echo, the correlation id, and the accepted flag all
/// check out. Anything else is surfaced as a delivery error, which the
/// orchestration answers by revoking the token and falling back to
/// reconnect.
#[derive(Debug)]
pub struct WireHandoffDelivery<S> {
    /// Framed connection the offer is written to and the ACK is read from.
    stream: S,
    /// Service name copied into the outgoing offer.
    service_name: String,
    /// Id stamped on the offer and required back on the ACK.
    correlation_id: u64,
}

impl<S> WireHandoffDelivery<S> {
    /// Bind a framed broker↔backend connection to a single handoff.
    ///
    /// `correlation_id` pairs the offer with its ACK; pass the request or
    /// connection id of the client Hello that triggered the handoff.
    pub fn new(stream: S, service_name: impl Into<String>, correlation_id: u64) -> Self {
        let service_name: String = service_name.into();
        WireHandoffDelivery {
            stream,
            service_name,
            correlation_id,
        }
    }

    /// The correlation id written into the offer and expected on the ACK.
    pub fn correlation_id(&self) -> u64 {
        let WireHandoffDelivery { correlation_id, .. } = self;
        *correlation_id
    }

    /// Shared access to the wrapped connection (e.g. to obtain its raw fd
    /// for the Unix `SCM_RIGHTS` send that precedes the offer frame).
    pub fn stream(&self) -> &S {
        let WireHandoffDelivery { stream, .. } = self;
        stream
    }

    /// Consume the wrapper and hand the connection back (e.g. to keep
    /// using it once the handoff has completed).
    pub fn into_stream(self) -> S {
        let WireHandoffDelivery { stream, .. } = self;
        stream
    }
}
193
194impl<S: Read + Write> HandoffDelivery for WireHandoffDelivery<S> {
195    fn deliver(
196        &mut self,
197        handle: WindowsHandleValue,
198        token: &HandoffToken,
199    ) -> Result<(), HandoffDeliveryError> {
200        let offer = HandoffOffer {
201            handle_value: handle.get() as u64,
202            token: token.as_bytes().to_vec(),
203            service_name: self.service_name.clone(),
204            correlation_id: self.correlation_id,
205        };
206        let frame = handoff_offer_frame(&offer);
207        let mut bytes = Vec::with_capacity(64);
208        frame
209            .encode(&mut bytes)
210            .expect("prost encoding Frame into Vec cannot fail because Vec writes are infallible");
211        write_frame(&mut self.stream, &bytes).map_err(|error| {
212            HandoffDeliveryError::DeliveryFailed {
213                detail: format!("failed to write HandoffOffer frame: {error}"),
214            }
215        })?;
216        Ok(())
217    }
218
219    fn await_backend_ack(
220        &mut self,
221        token: &HandoffToken,
222        deadline: Instant,
223    ) -> Result<Instant, HandoffDeliveryError> {
224        let bytes = read_frame(&mut self.stream).map_err(|error| {
225            ack_not_observed(format!("failed to read HandoffAck frame: {error}"))
226        })?;
227        let observed_at = Instant::now();
228        let frame = Frame::decode(bytes.as_slice()).map_err(|error| {
229            ack_not_observed(format!("failed to decode HandoffAck Frame: {error}"))
230        })?;
231        validate_handoff_frame(&frame, FrameKind::Response)
232            .map_err(|detail| ack_not_observed(format!("unexpected HandoffAck frame: {detail}")))?;
233        if frame.request_id != self.correlation_id {
234            return Err(ack_not_observed(format!(
235                "HandoffAck frame request_id {} does not match correlation id {}",
236                frame.request_id, self.correlation_id
237            )));
238        }
239        let ack = HandoffAck::decode(frame.payload.as_slice()).map_err(|error| {
240            ack_not_observed(format!("failed to decode HandoffAck payload: {error}"))
241        })?;
242        if ack.correlation_id != self.correlation_id {
243            return Err(ack_not_observed(format!(
244                "HandoffAck correlation id {} does not match offer correlation id {}",
245                ack.correlation_id, self.correlation_id
246            )));
247        }
248        if ack.token != token.as_bytes() {
249            return Err(ack_not_observed(
250                "HandoffAck token echo does not match the offered token".to_string(),
251            ));
252        }
253        if !ack.accepted {
254            return Err(ack_not_observed(format!(
255                "backend refused the handoff: {}",
256                if ack.error_detail.is_empty() {
257                    "no detail provided"
258                } else {
259                    ack.error_detail.as_str()
260                }
261            )));
262        }
263        if observed_at > deadline {
264            return Err(ack_not_observed(
265                "backend HandoffAck arrived after the ACK deadline".to_string(),
266            ));
267        }
268        Ok(observed_at)
269    }
270}
271
272fn ack_not_observed(detail: String) -> HandoffDeliveryError {
273    HandoffDeliveryError::AckNotObserved { detail }
274}