use bytes::BytesMut;
use prost::Message;
use running_process::broker::protocol::frame_ext::{encode_framed, try_decode_framed};
use running_process::register_payload_protocol;
use super::{ProtocolError, BINCODE_PROTOCOL_VERSION, PROST_PROTOCOL_VERSION};
// Payload-protocol identifier stamped on every running-process `Frame` this
// module encodes, and the only value `decode_frame_v1_message` accepts.
//
// 0x7A63 is the two ASCII bytes 'z' 'c'. The golden-bytes test in this file
// pins the encoded form, so changing the value changes the frozen wire format.
//
// NOTE(review): the constant is declared through `register_payload_protocol!`;
// the macro body is not visible here — presumably it registers the id with
// running-process in addition to defining the const. Confirm before moving the
// declaration out of the macro.
register_payload_protocol! {
pub const ZCCACHE_FRAME_PAYLOAD_PROTOCOL: u32 = 0x7A63;
}
/// A message decoded from one complete running-process frame, paired with the
/// request id carried by that frame.
///
/// Returned by [`decode_frame_v1_message`]; `M` is the prost message type the
/// caller asked the payload to be decoded as.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrameV1Decoded<M> {
/// The frame payload decoded as `M`.
pub message: M,
/// The `request_id` field of the decoded frame.
pub request_id: u64,
}
/// Encodes `msg` as a request frame on the zccache payload protocol.
///
/// The message is serialized with prost, placed in a `Frame::request` tagged
/// with [`ZCCACHE_FRAME_PAYLOAD_PROTOCOL`] and `request_id`, and then framed
/// into wire bytes.
///
/// # Errors
///
/// * [`ProtocolError::Serialization`] if the message or the frame cannot be
///   encoded.
/// * [`ProtocolError::MessageTooLarge`] if the framing layer rejects the frame
///   as too large.
pub fn encode_frame_v1_request<M: Message>(
    msg: &M,
    request_id: u64,
) -> Result<BytesMut, ProtocolError> {
    use running_process::broker::protocol::Frame;

    encode_payload(msg).and_then(|body| {
        let request =
            Frame::request(ZCCACHE_FRAME_PAYLOAD_PROTOCOL, body).with_request_id(request_id);
        encode_framed_bytes(&request)
    })
}
/// Encodes `msg` as a response frame on the zccache payload protocol.
///
/// A payload-less request frame carrying [`ZCCACHE_FRAME_PAYLOAD_PROTOCOL`] and
/// `request_id` is built first and handed to `Frame::response_to`, so the
/// response is derived from it (presumably inheriting the protocol and request
/// id — see `Frame::response_to` in running-process for the exact fields).
///
/// # Errors
///
/// * [`ProtocolError::Serialization`] if the message or the frame cannot be
///   encoded.
/// * [`ProtocolError::MessageTooLarge`] if the framing layer rejects the frame
///   as too large.
pub fn encode_frame_v1_response<M: Message>(
    msg: &M,
    request_id: u64,
) -> Result<BytesMut, ProtocolError> {
    use running_process::broker::protocol::Frame;

    // Stand-in for the request being answered; only used as the template the
    // response frame is derived from.
    let answered =
        Frame::request(ZCCACHE_FRAME_PAYLOAD_PROTOCOL, Vec::new()).with_request_id(request_id);

    encode_payload(msg).and_then(|body| {
        let response = Frame::response_to(&answered, body);
        encode_framed_bytes(&response)
    })
}
/// Serializes `msg` with prost into a freshly allocated byte vector.
///
/// The vector is pre-sized from `encoded_len()` so the encode does not need to
/// grow it. An encode failure is reported as
/// [`ProtocolError::Serialization`] carrying the prost error text.
fn encode_payload<M: Message>(msg: &M) -> Result<Vec<u8>, ProtocolError> {
    let mut encoded = Vec::with_capacity(msg.encoded_len());
    match msg.encode(&mut encoded) {
        Ok(()) => Ok(encoded),
        Err(err) => Err(ProtocolError::Serialization(err.to_string())),
    }
}
/// Runs `frame` through running-process' `encode_framed` and copies the result
/// into a [`BytesMut`].
///
/// Framing errors are translated into this crate's error type:
/// `FramingError::FrameTooLarge` becomes [`ProtocolError::MessageTooLarge`]
/// (keeping the reported body length); anything else becomes
/// [`ProtocolError::Serialization`] with the error's display text.
fn encode_framed_bytes(
    frame: &running_process::broker::protocol::Frame,
) -> Result<BytesMut, ProtocolError> {
    use running_process::broker::protocol::FramingError;

    match encode_framed(frame) {
        Ok(wire) => Ok(BytesMut::from(wire.as_slice())),
        Err(FramingError::FrameTooLarge { body_length, .. }) => {
            Err(ProtocolError::MessageTooLarge(body_length))
        }
        Err(other) => Err(ProtocolError::Serialization(other.to_string())),
    }
}
/// Sniffs the start of `buf` to decide whether it begins a running-process
/// frame.
///
/// Returns:
///
/// * `None` — not enough bytes to decide yet: the buffer is empty, or its first
///   byte equals `ENVELOPE_VERSION` but fewer than 8 bytes are available.
/// * `Some(false)` — the first byte is not `ENVELOPE_VERSION`, or the first
///   eight bytes read as a little-endian `u32` length of at least 4 followed by
///   a little-endian `u32` equal to `BINCODE_PROTOCOL_VERSION` or
///   `PROST_PROTOCOL_VERSION` (presumably the header of zccache's other,
///   length-prefixed wire format, which is defined outside this file).
/// * `Some(true)` — otherwise.
#[must_use]
pub fn buffer_starts_running_process_frame(buf: &[u8]) -> Option<bool> {
    match *buf {
        [] => None,
        [lead, ..] if lead != running_process::broker::protocol::ENVELOPE_VERSION => Some(false),
        [l0, l1, l2, l3, v0, v1, v2, v3, ..] => {
            // Re-read the same eight bytes as `<u32 len><u32 version>`; the
            // envelope-version byte doubles as the low byte of that length.
            let zccache_len = u32::from_le_bytes([l0, l1, l2, l3]);
            let zccache_version = u32::from_le_bytes([v0, v1, v2, v3]);
            let known_version = matches!(
                zccache_version,
                BINCODE_PROTOCOL_VERSION | PROST_PROTOCOL_VERSION
            );
            Some(zccache_len < 4 || !known_version)
        }
        // First byte matches but the 8-byte window is still incomplete.
        _ => None,
    }
}
/// Tries to decode one zccache message from the front of `buf`.
///
/// Returns `Ok(None)` when `try_decode_framed` yields no frame (the buffer does
/// not yet hold a complete one). On success the consumed bytes are removed from
/// `buf`; on any error `buf` is left untouched.
///
/// A decoded frame is accepted only if, checked in this order:
///
/// 1. its `envelope_version` is `1`;
/// 2. its `payload_protocol` is [`ZCCACHE_FRAME_PAYLOAD_PROTOCOL`];
/// 3. its kind is `Request` or `Response`;
/// 4. its payload encoding is `None`;
/// 5. its payload decodes as `M`.
///
/// # Errors
///
/// * [`ProtocolError::MessageTooLarge`] if the framing layer reports
///   `FrameTooLarge`.
/// * [`ProtocolError::Deserialization`] for any other framing error, for a
///   frame that fails one of the checks above, or for a payload that prost
///   cannot decode.
pub fn decode_frame_v1_message<M: Message + Default>(
    buf: &mut BytesMut,
) -> Result<Option<FrameV1Decoded<M>>, ProtocolError> {
    use bytes::Buf;
    use running_process::broker::protocol::{FrameKind, FramingError, PayloadEncoding};

    let outcome = try_decode_framed(buf.as_ref()).map_err(|err| match err {
        FramingError::FrameTooLarge { body_length, .. } => {
            ProtocolError::MessageTooLarge(body_length)
        }
        other => ProtocolError::Deserialization(format!("running-process Frame: {other}")),
    })?;
    let (frame, consumed) = match outcome {
        Some(complete) => (complete.frame, complete.consumed),
        None => return Ok(None),
    };

    if frame.envelope_version != 1 {
        return Err(ProtocolError::Deserialization(format!(
            "unsupported running-process Frame envelope_version {}",
            frame.envelope_version
        )));
    }

    if frame.payload_protocol != ZCCACHE_FRAME_PAYLOAD_PROTOCOL {
        return Err(ProtocolError::Deserialization(format!(
            "running-process Frame payload_protocol {:#06X} is not the zccache payload protocol \
             {ZCCACHE_FRAME_PAYLOAD_PROTOCOL:#06X}",
            frame.payload_protocol
        )));
    }

    match FrameKind::try_from(frame.kind) {
        Ok(FrameKind::Request | FrameKind::Response) => {}
        _ => {
            return Err(ProtocolError::Deserialization(format!(
                "unsupported running-process Frame kind {} on the zccache lane",
                frame.kind
            )))
        }
    }

    let encoding_is_none = matches!(
        PayloadEncoding::try_from(frame.payload_encoding),
        Ok(PayloadEncoding::None)
    );
    if !encoding_is_none {
        return Err(ProtocolError::Deserialization(format!(
            "unsupported running-process Frame payload_encoding {} on the zccache lane",
            frame.payload_encoding
        )));
    }

    let message = match M::decode(frame.payload.as_slice()) {
        Ok(decoded) => decoded,
        Err(err) => return Err(ProtocolError::Deserialization(err.to_string())),
    };

    // Only drop the frame's bytes once everything above has succeeded.
    buf.advance(consumed);

    Ok(Some(FrameV1Decoded {
        message,
        request_id: frame.request_id,
    }))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::wire_prost::zccache_v1;
    use bytes::BufMut;

    /// Request id used by the golden tests; every byte differs so a
    /// mis-ordered encoding would show up in the comparison.
    const GOLDEN_REQUEST_ID: u64 = 0x0102_0304_0506_0708;

    /// The fixed request whose encoding the golden test pins.
    fn golden_request() -> zccache_v1::Request {
        let lookup = zccache_v1::Lookup {
            cache_key: "golden-cache-key".to_string(),
        };
        zccache_v1::Request {
            body: Some(zccache_v1::request::Body::Lookup(lookup)),
            request_id: "golden-request-id".to_string(),
        }
    }

    /// The encoded request frame must match the recorded bytes exactly.
    #[test]
    fn frame_v1_request_golden_bytes_are_frozen() {
        let encoded = encode_frame_v1_request(&golden_request(), GOLDEN_REQUEST_ID)
            .expect("encode golden frame");

        // The recorded bytes, grouped by what they encode.
        let segments: &[&[u8]] = &[
            // Envelope version byte, then the body length (0x3A = 58) as a
            // little-endian u32.
            &[0x01, 0x3A, 0x00, 0x00, 0x00],
            // Frame field 1 (varint) = 1.
            &[0x08, 0x01],
            // Frame field 3 (varint) = 0x7A63, the payload protocol.
            &[0x18, 0xE3, 0xF4, 0x01],
            // Frame field 4 (length-delimited), 40 payload bytes follow.
            &[0x22, 0x28],
            // Payload: field 4 (18 bytes) wrapping field 1 (16-byte string).
            &[0x22, 0x12, 0x0A, 0x10],
            b"golden-cache-key",
            // Payload: field 100 (17-byte string).
            &[0xA2, 0x06, 0x11],
            b"golden-request-id",
            // Frame field 5 (varint) = 0x0102_0304_0506_0708, the request id.
            &[0x28, 0x88, 0x8E, 0x98, 0xA8, 0xC0, 0xE0, 0x80, 0x81, 0x01],
        ];
        let expected: Vec<u8> = segments.concat();

        assert_eq!(
            encoded.as_ref(),
            expected.as_slice(),
            "0x7A63 Frame lane golden bytes changed — wire is frozen; \
             see zackees/running-process#435"
        );
    }

    /// Encoding then decoding the golden request returns the same message and
    /// request id, and the sniffing helper recognises the encoded frame.
    #[test]
    fn frame_v1_golden_round_trips() {
        let mut wire = encode_frame_v1_request(&golden_request(), GOLDEN_REQUEST_ID)
            .expect("encode golden frame");

        assert_eq!(buffer_starts_running_process_frame(&wire), Some(true));

        let round_tripped = decode_frame_v1_message::<zccache_v1::Request>(&mut wire)
            .expect("decode")
            .expect("complete frame");

        assert_eq!(round_tripped.request_id, GOLDEN_REQUEST_ID);
        assert_eq!(round_tripped.message, golden_request());
    }

    /// A well-formed frame carrying a different payload protocol is rejected
    /// with a deserialization error that names `payload_protocol`.
    #[test]
    fn frame_v1_rejects_foreign_payload_protocol() {
        use prost::Message as _;
        use running_process::broker::protocol::{Frame, FrameKind, PayloadEncoding};

        let foreign = Frame {
            envelope_version: 1,
            kind: FrameKind::Request as i32,
            // Anything other than ZCCACHE_FRAME_PAYLOAD_PROTOCOL.
            payload_protocol: 0x7001,
            payload: golden_request().encode_to_vec(),
            request_id: 7,
            payload_encoding: PayloadEncoding::None as i32,
            deadline_unix_ms: 0,
            traceparent: String::new(),
            tracestate: String::new(),
        };

        // Frame it by hand: version byte, little-endian u32 length, body.
        let body = foreign.encode_to_vec();
        let mut wire = BytesMut::new();
        wire.put_u8(running_process::broker::protocol::ENVELOPE_VERSION);
        wire.put_u32_le(u32::try_from(body.len()).unwrap());
        wire.extend_from_slice(&body);

        let err = decode_frame_v1_message::<zccache_v1::Request>(&mut wire)
            .expect_err("foreign payload protocol must be rejected");

        let names_payload_protocol = match err {
            ProtocolError::Deserialization(detail) => detail.contains("payload_protocol"),
            _ => false,
        };
        assert!(
            names_payload_protocol,
            "expected a payload_protocol rejection"
        );
    }
}