running_process/broker/backend_lib/
wire.rs1use std::io::{Read, Write};
20use std::time::Instant;
21
22use prost::Message;
23
24use crate::broker::backend_lib::accept_handed_off::{
25 accept_handed_off, HandedOffPayload, HandoffAcceptance,
26};
27use crate::broker::protocol::{
28 read_frame, write_frame, Frame, FrameKind, FramingError, HandoffAck, HandoffOffer,
29};
30use crate::broker::server::handoff::wire::{handoff_ack_frame, validate_handoff_frame};
31use crate::broker::server::{HandoffToken, HandoffTokenStore};
32
33#[derive(Debug, thiserror::Error)]
35pub enum BackendHandoffWireError {
36 #[error(transparent)]
38 Framing(#[from] FramingError),
39 #[error("failed to decode HandoffOffer Frame: {0}")]
41 DecodeFrame(prost::DecodeError),
42 #[error("failed to decode HandoffOffer payload: {0}")]
44 DecodePayload(prost::DecodeError),
45 #[error("unexpected HandoffOffer frame: {0}")]
47 UnexpectedFrame(&'static str),
48}
49
50pub fn read_handoff_offer<S: Read>(
52 stream: &mut S,
53) -> Result<HandoffOffer, BackendHandoffWireError> {
54 let bytes = read_frame(stream)?;
55 let frame = Frame::decode(bytes.as_slice()).map_err(BackendHandoffWireError::DecodeFrame)?;
56 validate_handoff_frame(&frame, FrameKind::Request)
57 .map_err(BackendHandoffWireError::UnexpectedFrame)?;
58 let offer = HandoffOffer::decode(frame.payload.as_slice())
59 .map_err(BackendHandoffWireError::DecodePayload)?;
60 if frame.request_id != offer.correlation_id {
61 return Err(BackendHandoffWireError::UnexpectedFrame(
62 "frame request_id does not match HandoffOffer correlation_id",
63 ));
64 }
65 Ok(offer)
66}
67
68pub fn respond_to_handoff_offer<S: Write>(
77 stream: &mut S,
78 pending_tokens: &mut HandoffTokenStore,
79 expected_token: HandoffToken,
80 offer: HandoffOffer,
81 now: Instant,
82) -> Result<HandoffAcceptance<HandoffOffer>, BackendHandoffWireError> {
83 let presented_token = offer.token.clone();
84 let correlation_id = offer.correlation_id;
85 let payload = HandedOffPayload::new(expected_token, presented_token.clone(), offer);
86 let acceptance = accept_handed_off(pending_tokens, payload, now);
87
88 let ack = match &acceptance {
89 HandoffAcceptance::Accepted(_) => HandoffAck {
90 token: presented_token,
91 accepted: true,
92 error_detail: String::new(),
93 correlation_id,
94 },
95 HandoffAcceptance::Rejected(rejected) => HandoffAck {
96 token: presented_token,
97 accepted: false,
98 error_detail: rejected.reason.to_string(),
99 correlation_id,
100 },
101 };
102 write_handoff_ack(stream, &ack)?;
103 Ok(acceptance)
104}
105
106pub fn write_handoff_ack<S: Write>(
108 stream: &mut S,
109 ack: &HandoffAck,
110) -> Result<(), BackendHandoffWireError> {
111 let frame = handoff_ack_frame(ack);
112 let mut bytes = Vec::with_capacity(64);
113 frame
114 .encode(&mut bytes)
115 .expect("prost encoding Frame into Vec cannot fail because Vec writes are infallible");
116 write_frame(stream, &bytes)?;
117 Ok(())
118}
119
120pub fn serve_handoff_offer<S: Read + Write>(
126 stream: &mut S,
127 pending_tokens: &mut HandoffTokenStore,
128 expected_token: HandoffToken,
129 now: Instant,
130) -> Result<HandoffAcceptance<HandoffOffer>, BackendHandoffWireError> {
131 let offer = read_handoff_offer(stream)?;
132 respond_to_handoff_offer(stream, pending_tokens, expected_token, offer, now)
133}