use std::io::{Read, Write};
use std::time::Instant;
use prost::Message;
use crate::broker::backend_lib::accept_handed_off::{
accept_handed_off, HandedOffPayload, HandoffAcceptance,
};
use crate::broker::protocol::{
read_frame, write_frame, Frame, FrameKind, FramingError, HandoffAck, HandoffOffer,
};
use crate::broker::server::handoff::wire::{handoff_ack_frame, validate_handoff_frame};
use crate::broker::server::{HandoffToken, HandoffTokenStore};
/// Errors produced while reading a handoff offer from, or writing a handoff
/// ack to, the wire.
#[derive(Debug, thiserror::Error)]
pub enum BackendHandoffWireError {
    /// The framing layer failed while reading or writing a frame.
    ///
    /// Converted automatically from [`FramingError`] by `?`, and displayed
    /// transparently (the inner error's message is used as-is).
    #[error(transparent)]
    Framing(#[from] FramingError),

    /// The bytes of a received frame could not be decoded as a [`Frame`].
    #[error("failed to decode HandoffOffer Frame: {0}")]
    DecodeFrame(prost::DecodeError),

    /// The frame decoded, but its payload could not be decoded as a
    /// [`HandoffOffer`].
    #[error("failed to decode HandoffOffer payload: {0}")]
    DecodePayload(prost::DecodeError),

    /// The frame decoded but is not acceptable as a handoff offer: either
    /// `validate_handoff_frame` rejected it, or the frame's `request_id`
    /// does not match the offer's `correlation_id`. The string names the
    /// reason.
    #[error("unexpected HandoffOffer frame: {0}")]
    UnexpectedFrame(&'static str),
}
/// Reads a single frame from `stream` and decodes it into a [`HandoffOffer`].
///
/// The frame is read with `read_frame`, decoded as a [`Frame`], checked by
/// `validate_handoff_frame` against [`FrameKind::Request`], and only then is
/// its payload decoded as the offer. Finally the frame's `request_id` must
/// equal the offer's `correlation_id`.
///
/// # Errors
///
/// * [`BackendHandoffWireError::Framing`] if `read_frame` fails.
/// * [`BackendHandoffWireError::DecodeFrame`] if the bytes are not a valid
///   [`Frame`].
/// * [`BackendHandoffWireError::UnexpectedFrame`] if frame validation fails
///   or the request/correlation ids disagree.
/// * [`BackendHandoffWireError::DecodePayload`] if the payload is not a valid
///   [`HandoffOffer`].
pub fn read_handoff_offer<S: Read>(
    stream: &mut S,
) -> Result<HandoffOffer, BackendHandoffWireError> {
    let raw = read_frame(stream)?;
    let envelope =
        Frame::decode(raw.as_slice()).map_err(BackendHandoffWireError::DecodeFrame)?;

    // Reject frames of the wrong shape before touching the payload.
    if let Err(reason) = validate_handoff_frame(&envelope, FrameKind::Request) {
        return Err(BackendHandoffWireError::UnexpectedFrame(reason));
    }

    let decoded = HandoffOffer::decode(envelope.payload.as_slice())
        .map_err(BackendHandoffWireError::DecodePayload)?;

    // The envelope and the payload must agree on which exchange this is.
    if envelope.request_id == decoded.correlation_id {
        Ok(decoded)
    } else {
        Err(BackendHandoffWireError::UnexpectedFrame(
            "frame request_id does not match HandoffOffer correlation_id",
        ))
    }
}
/// Evaluates an already-decoded `offer` and writes the matching
/// [`HandoffAck`] to `stream`.
///
/// The offer is wrapped in a [`HandedOffPayload`] together with
/// `expected_token` and the token the offer presented, then handed to
/// `accept_handed_off` along with `pending_tokens` and `now`. An ack is sent
/// for both outcomes:
///
/// * accepted: `accepted = true` and an empty `error_detail`;
/// * rejected: `accepted = false` and `error_detail` set to the rejection
///   reason's `to_string()`.
///
/// In both cases the ack echoes the presented token and the offer's
/// `correlation_id`.
///
/// # Errors
///
/// A rejection is *not* an error; it is returned as
/// `Ok(HandoffAcceptance::Rejected(..))`. `Err` is returned only when writing
/// the ack fails. In that case the acceptance decision is dropped, and
/// nothing here undoes whatever `accept_handed_off` did to `pending_tokens`.
pub fn respond_to_handoff_offer<S: Write>(
    stream: &mut S,
    pending_tokens: &mut HandoffTokenStore,
    expected_token: HandoffToken,
    offer: HandoffOffer,
    now: Instant,
) -> Result<HandoffAcceptance<HandoffOffer>, BackendHandoffWireError> {
    // Capture what the ack needs before `offer` is moved into the payload.
    let token = offer.token.clone();
    let correlation_id = offer.correlation_id;

    let acceptance = accept_handed_off(
        pending_tokens,
        HandedOffPayload::new(expected_token, token.clone(), offer),
        now,
    );

    // Only these two fields differ between the accepted and rejected acks.
    let (accepted, error_detail) = match &acceptance {
        HandoffAcceptance::Accepted(_) => (true, String::new()),
        HandoffAcceptance::Rejected(rejected) => (false, rejected.reason.to_string()),
    };

    let ack = HandoffAck {
        token,
        accepted,
        error_detail,
        correlation_id,
    };
    write_handoff_ack(stream, &ack)?;
    Ok(acceptance)
}
/// Wraps `ack` in a frame, encodes it, and writes it to `stream`.
///
/// The frame is built by `handoff_ack_frame` and serialized with prost's
/// `Message::encode_to_vec`, which allocates a buffer of exactly the encoded
/// size and cannot fail. This replaces a fixed 64-byte capacity guess and an
/// `expect` on an encode into a `Vec`.
///
/// # Errors
///
/// Returns [`BackendHandoffWireError::Framing`] if `write_frame` fails.
pub fn write_handoff_ack<S: Write>(
    stream: &mut S,
    ack: &HandoffAck,
) -> Result<(), BackendHandoffWireError> {
    let encoded = handoff_ack_frame(ack).encode_to_vec();
    write_frame(stream, &encoded)?;
    Ok(())
}
/// Handles one complete handoff exchange on `stream`: reads the offer with
/// [`read_handoff_offer`], then answers it with [`respond_to_handoff_offer`].
///
/// `pending_tokens`, `expected_token` and `now` are forwarded unchanged to
/// [`respond_to_handoff_offer`].
///
/// # Errors
///
/// Returns whatever error either step produces. If reading or decoding the
/// offer fails, nothing is written to `stream` by this function.
pub fn serve_handoff_offer<S: Read + Write>(
    stream: &mut S,
    pending_tokens: &mut HandoffTokenStore,
    expected_token: HandoffToken,
    now: Instant,
) -> Result<HandoffAcceptance<HandoffOffer>, BackendHandoffWireError> {
    read_handoff_offer(stream).and_then(|offer| {
        respond_to_handoff_offer(stream, pending_tokens, expected_token, offer, now)
    })
}