running_process/broker/server/handoff/
wire.rs1use std::io::{Read, Write};
42use std::time::Instant;
43
44use prost::Message;
45
46use crate::broker::protocol::{
47 read_frame, validate_frame_envelope, write_frame, Frame, FrameKind, FrameValidationError,
48 HandoffAck, HandoffOffer, PayloadEncoding, PROTOCOL_VERSION,
49};
50use crate::broker::server::handoff::handoff_token::HandoffToken;
51use crate::broker::server::handoff::orchestrate::{HandoffDelivery, HandoffDeliveryError};
52use crate::broker::server::handoff::windows::WindowsHandleValue;
53
54pub use crate::broker::protocol::registry::HANDOFF_PAYLOAD_PROTOCOL;
62
63pub fn handoff_offer_frame(offer: &HandoffOffer) -> Frame {
65 let mut payload = Vec::with_capacity(64);
66 offer.encode(&mut payload).expect(
67 "prost encoding HandoffOffer into Vec cannot fail because Vec writes are infallible",
68 );
69 Frame {
70 envelope_version: PROTOCOL_VERSION,
71 kind: FrameKind::Request as i32,
72 payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
73 payload,
74 request_id: offer.correlation_id,
75 payload_encoding: PayloadEncoding::None as i32,
76 deadline_unix_ms: 0,
77 traceparent: String::new(),
78 tracestate: String::new(),
79 }
80}
81
82pub fn handoff_ack_frame(ack: &HandoffAck) -> Frame {
84 let mut payload = Vec::with_capacity(64);
85 ack.encode(&mut payload)
86 .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
87 Frame {
88 envelope_version: PROTOCOL_VERSION,
89 kind: FrameKind::Response as i32,
90 payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
91 payload,
92 request_id: ack.correlation_id,
93 payload_encoding: PayloadEncoding::None as i32,
94 deadline_unix_ms: 0,
95 traceparent: String::new(),
96 tracestate: String::new(),
97 }
98}
99
100pub fn handoff_ready_frame(ack: &HandoffAck) -> Frame {
113 let mut payload = Vec::with_capacity(64);
114 ack.encode(&mut payload)
115 .expect("prost encoding HandoffAck into Vec cannot fail because Vec writes are infallible");
116 Frame {
117 envelope_version: PROTOCOL_VERSION,
118 kind: FrameKind::Event as i32,
119 payload_protocol: HANDOFF_PAYLOAD_PROTOCOL,
120 payload,
121 request_id: ack.correlation_id,
122 payload_encoding: PayloadEncoding::None as i32,
123 deadline_unix_ms: 0,
124 traceparent: String::new(),
125 tracestate: String::new(),
126 }
127}
128
129pub fn validate_handoff_frame(frame: &Frame, expected_kind: FrameKind) -> Result<(), &'static str> {
134 validate_frame_envelope(frame, expected_kind, HANDOFF_PAYLOAD_PROTOCOL).map_err(|error| {
135 match error {
136 FrameValidationError::EnvelopeVersion { .. } => "envelope_version is not v1",
137 FrameValidationError::Kind { .. } => match expected_kind {
138 FrameKind::Request => "kind is not REQUEST",
139 FrameKind::Event => "kind is not EVENT",
140 _ => "kind is not RESPONSE",
141 },
142 FrameValidationError::PayloadProtocol { .. } => "payload_protocol is not handoff",
143 FrameValidationError::PayloadEncoding { .. } => "payload is compressed",
144 }
145 })
146}
147
148#[derive(Debug)]
157pub struct WireHandoffDelivery<S> {
158 stream: S,
159 service_name: String,
160 correlation_id: u64,
161}
162
163impl<S> WireHandoffDelivery<S> {
164 pub fn new(stream: S, service_name: impl Into<String>, correlation_id: u64) -> Self {
169 Self {
170 stream,
171 service_name: service_name.into(),
172 correlation_id,
173 }
174 }
175
176 pub fn correlation_id(&self) -> u64 {
178 self.correlation_id
179 }
180
181 pub fn stream(&self) -> &S {
184 &self.stream
185 }
186
187 pub fn into_stream(self) -> S {
190 self.stream
191 }
192}
193
194impl<S: Read + Write> HandoffDelivery for WireHandoffDelivery<S> {
195 fn deliver(
196 &mut self,
197 handle: WindowsHandleValue,
198 token: &HandoffToken,
199 ) -> Result<(), HandoffDeliveryError> {
200 let offer = HandoffOffer {
201 handle_value: handle.get() as u64,
202 token: token.as_bytes().to_vec(),
203 service_name: self.service_name.clone(),
204 correlation_id: self.correlation_id,
205 };
206 let frame = handoff_offer_frame(&offer);
207 let mut bytes = Vec::with_capacity(64);
208 frame
209 .encode(&mut bytes)
210 .expect("prost encoding Frame into Vec cannot fail because Vec writes are infallible");
211 write_frame(&mut self.stream, &bytes).map_err(|error| {
212 HandoffDeliveryError::DeliveryFailed {
213 detail: format!("failed to write HandoffOffer frame: {error}"),
214 }
215 })?;
216 Ok(())
217 }
218
219 fn await_backend_ack(
220 &mut self,
221 token: &HandoffToken,
222 deadline: Instant,
223 ) -> Result<Instant, HandoffDeliveryError> {
224 let bytes = read_frame(&mut self.stream).map_err(|error| {
225 ack_not_observed(format!("failed to read HandoffAck frame: {error}"))
226 })?;
227 let observed_at = Instant::now();
228 let frame = Frame::decode(bytes.as_slice()).map_err(|error| {
229 ack_not_observed(format!("failed to decode HandoffAck Frame: {error}"))
230 })?;
231 validate_handoff_frame(&frame, FrameKind::Response)
232 .map_err(|detail| ack_not_observed(format!("unexpected HandoffAck frame: {detail}")))?;
233 if frame.request_id != self.correlation_id {
234 return Err(ack_not_observed(format!(
235 "HandoffAck frame request_id {} does not match correlation id {}",
236 frame.request_id, self.correlation_id
237 )));
238 }
239 let ack = HandoffAck::decode(frame.payload.as_slice()).map_err(|error| {
240 ack_not_observed(format!("failed to decode HandoffAck payload: {error}"))
241 })?;
242 if ack.correlation_id != self.correlation_id {
243 return Err(ack_not_observed(format!(
244 "HandoffAck correlation id {} does not match offer correlation id {}",
245 ack.correlation_id, self.correlation_id
246 )));
247 }
248 if ack.token != token.as_bytes() {
249 return Err(ack_not_observed(
250 "HandoffAck token echo does not match the offered token".to_string(),
251 ));
252 }
253 if !ack.accepted {
254 return Err(ack_not_observed(format!(
255 "backend refused the handoff: {}",
256 if ack.error_detail.is_empty() {
257 "no detail provided"
258 } else {
259 ack.error_detail.as_str()
260 }
261 )));
262 }
263 if observed_at > deadline {
264 return Err(ack_not_observed(
265 "backend HandoffAck arrived after the ACK deadline".to_string(),
266 ));
267 }
268 Ok(observed_at)
269 }
270}
271
272fn ack_not_observed(detail: String) -> HandoffDeliveryError {
273 HandoffDeliveryError::AckNotObserved { detail }
274}