running_process/broker/server/handoff/
wire.rs1use std::io::{Read, Write};
42use std::time::Instant;
43
44use prost::Message;
45
46use crate::broker::protocol::{
47 read_frame, write_frame, Frame, FrameKind, HandoffAck, HandoffOffer, PayloadEncoding,
48};
49use crate::broker::server::handoff::handoff_token::HandoffToken;
50use crate::broker::server::handoff::orchestrate::{HandoffDelivery, HandoffDeliveryError};
51use crate::broker::server::handoff::windows::WindowsHandleValue;
52
/// Payload-protocol identifier stamped into `Frame::payload_protocol` by the handoff frame
/// builders in this file; [`validate_handoff_frame`] rejects frames carrying any other value.
pub const HANDOFF_PAYLOAD_PROTOCOL: u32 = 0xD0FF;
59
/// Envelope version written into every handoff frame built in this file, and the only
/// version [`validate_handoff_frame`] accepts.
const PROTOCOL_VERSION: u32 = 1;
61
62pub fn handoff_offer_frame(offer: &HandoffOffer) -> Frame {
64 let mut payload = Vec::with_capacity(64);
65 offer.encode(&mut payload).expect(
66 "prost encoding HandoffOffer into Vec cannot fail because Vec writes are infallible",
67 );
68 Frame {
69 envelope_version: PROTOCOL_VERSION,
70 kind: FrameKind::Request as i32,
71 payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
72 payload,
73 request_id: offer.correlation_id,
74 payload_encoding: PayloadEncoding::None as i32,
75 deadline_unix_ms: 0,
76 traceparent: String::new(),
77 tracestate: String::new(),
78 }
79}
80
81pub fn handoff_ack_frame(ack: &HandoffAck) -> Frame {
83 let mut payload = Vec::with_capacity(64);
84 ack.encode(&mut payload)
85 .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
86 Frame {
87 envelope_version: PROTOCOL_VERSION,
88 kind: FrameKind::Response as i32,
89 payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
90 payload,
91 request_id: ack.correlation_id,
92 payload_encoding: PayloadEncoding::None as i32,
93 deadline_unix_ms: 0,
94 traceparent: String::new(),
95 tracestate: String::new(),
96 }
97}
98
99pub fn handoff_ready_frame(ack: &HandoffAck) -> Frame {
112 let mut payload = Vec::with_capacity(64);
113 ack.encode(&mut payload)
114 .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
115 Frame {
116 envelope_version: PROTOCOL_VERSION,
117 kind: FrameKind::Event as i32,
118 payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
119 payload,
120 request_id: ack.correlation_id,
121 payload_encoding: PayloadEncoding::None as i32,
122 deadline_unix_ms: 0,
123 traceparent: String::new(),
124 tracestate: String::new(),
125 }
126}
127
128pub fn validate_handoff_frame(frame: &Frame, expected_kind: FrameKind) -> Result<(), &'static str> {
133 if frame.envelope_version != PROTOCOL_VERSION {
134 return Err("envelope_version is not v1");
135 }
136 if FrameKind::try_from(frame.kind) != Ok(expected_kind) {
137 return Err(match expected_kind {
138 FrameKind::Request => "kind is not REQUEST",
139 FrameKind::Event => "kind is not EVENT",
140 _ => "kind is not RESPONSE",
141 });
142 }
143 if frame.payload_protocol != HANDOFF_PAYLOAD_PROTOCOL {
144 return Err("payload_protocol is not handoff");
145 }
146 if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
147 return Err("payload is compressed");
148 }
149 Ok(())
150}
151
/// A [`HandoffDelivery`] that exchanges handoff frames over a byte stream `S`.
///
/// The trait implementation (available when `S: Read + Write`) writes the offer to the
/// stream and reads the acknowledgement back from it.
#[derive(Debug)]
pub struct WireHandoffDelivery<S> {
    /// Transport the offer frame is written to and the ack frame is read from.
    stream: S,
    /// Service name copied into each `HandoffOffer` that is sent.
    service_name: String,
    /// Identifier stamped on the offer and required to be echoed by the ack.
    correlation_id: u64,
}
166
167impl<S> WireHandoffDelivery<S> {
168 pub fn new(stream: S, service_name: impl Into<String>, correlation_id: u64) -> Self {
173 Self {
174 stream,
175 service_name: service_name.into(),
176 correlation_id,
177 }
178 }
179
180 pub fn correlation_id(&self) -> u64 {
182 self.correlation_id
183 }
184
185 pub fn into_stream(self) -> S {
188 self.stream
189 }
190}
191
192impl<S: Read + Write> HandoffDelivery for WireHandoffDelivery<S> {
193 fn deliver(
194 &mut self,
195 handle: WindowsHandleValue,
196 token: &HandoffToken,
197 ) -> Result<(), HandoffDeliveryError> {
198 let offer = HandoffOffer {
199 handle_value: handle.get() as u64,
200 token: token.as_bytes().to_vec(),
201 service_name: self.service_name.clone(),
202 correlation_id: self.correlation_id,
203 };
204 let frame = handoff_offer_frame(&offer);
205 let mut bytes = Vec::with_capacity(64);
206 frame
207 .encode(&mut bytes)
208 .expect("prost encoding Frame into Vec cannot fail because Vec writes are infallible");
209 write_frame(&mut self.stream, &bytes).map_err(|error| {
210 HandoffDeliveryError::DeliveryFailed {
211 detail: format!("failed to write HandoffOffer frame: {error}"),
212 }
213 })?;
214 Ok(())
215 }
216
217 fn await_backend_ack(
218 &mut self,
219 token: &HandoffToken,
220 deadline: Instant,
221 ) -> Result<Instant, HandoffDeliveryError> {
222 let bytes = read_frame(&mut self.stream).map_err(|error| {
223 ack_not_observed(format!("failed to read HandoffAck frame: {error}"))
224 })?;
225 let observed_at = Instant::now();
226 let frame = Frame::decode(bytes.as_slice()).map_err(|error| {
227 ack_not_observed(format!("failed to decode HandoffAck Frame: {error}"))
228 })?;
229 validate_handoff_frame(&frame, FrameKind::Response)
230 .map_err(|detail| ack_not_observed(format!("unexpected HandoffAck frame: {detail}")))?;
231 if frame.request_id != self.correlation_id {
232 return Err(ack_not_observed(format!(
233 "HandoffAck frame request_id {} does not match correlation id {}",
234 frame.request_id, self.correlation_id
235 )));
236 }
237 let ack = HandoffAck::decode(frame.payload.as_slice()).map_err(|error| {
238 ack_not_observed(format!("failed to decode HandoffAck payload: {error}"))
239 })?;
240 if ack.correlation_id != self.correlation_id {
241 return Err(ack_not_observed(format!(
242 "HandoffAck correlation id {} does not match offer correlation id {}",
243 ack.correlation_id, self.correlation_id
244 )));
245 }
246 if ack.token != token.as_bytes() {
247 return Err(ack_not_observed(
248 "HandoffAck token echo does not match the offered token".to_string(),
249 ));
250 }
251 if !ack.accepted {
252 return Err(ack_not_observed(format!(
253 "backend refused the handoff: {}",
254 if ack.error_detail.is_empty() {
255 "no detail provided"
256 } else {
257 ack.error_detail.as_str()
258 }
259 )));
260 }
261 if observed_at > deadline {
262 return Err(ack_not_observed(
263 "backend HandoffAck arrived after the ACK deadline".to_string(),
264 ));
265 }
266 Ok(observed_at)
267 }
268}
269
270fn ack_not_observed(detail: String) -> HandoffDeliveryError {
271 HandoffDeliveryError::AckNotObserved { detail }
272}