use peat_lite::protocol::constants::{HEADER_SIZE, MAGIC};
use peat_lite::protocol::document as pl_doc;
use peat_lite::protocol::error::MessageError;
use peat_lite::protocol::header::{decode_header, encode_header, Header};
use peat_lite::protocol::message_type::MessageType;
pub use crate::peat_mesh::PeatLiteDocumentFrame;
#[derive(Debug, Clone)]
pub enum PeatLiteFrameOutcome {
NotPeatLiteFrame,
Handled,
Decoded(PeatLiteDocumentFrame),
}
pub fn try_handle_peat_lite_frame(buf: &[u8]) -> PeatLiteFrameOutcome {
if !is_peat_lite_frame(buf) {
return PeatLiteFrameOutcome::NotPeatLiteFrame;
}
let (header, view) = match decode_peat_lite_document(buf) {
Ok(pair) => pair,
Err(FrameError::NotPeatLiteFrame) => {
log::warn!(
"ble: peat-lite magic-prefix matched but decode reported NotPeatLiteFrame; \
codec drift?"
);
return PeatLiteFrameOutcome::Handled;
}
Err(FrameError::UnsupportedMessageType) => {
log::debug!("ble: dropping peat-lite frame with non-Document MessageType");
return PeatLiteFrameOutcome::Handled;
}
Err(FrameError::Envelope(e)) => {
log::warn!(
"ble: dropping malformed peat-lite Document frame (envelope error: {:?})",
e
);
return PeatLiteFrameOutcome::Handled;
}
};
PeatLiteFrameOutcome::Decoded(PeatLiteDocumentFrame {
source_node_id: header.node_id,
seq_num: header.seq_num,
flags: view.flags,
collection: view.collection.to_string(),
doc_id: view.doc_id.to_string(),
timestamp_ms: view.timestamp_ms,
body: view.body.to_vec(),
})
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameError {
Envelope(MessageError),
NotPeatLiteFrame,
UnsupportedMessageType,
}
impl From<MessageError> for FrameError {
fn from(e: MessageError) -> Self {
Self::Envelope(e)
}
}
pub fn is_peat_lite_frame(buf: &[u8]) -> bool {
buf.len() >= MAGIC.len() && buf[..MAGIC.len()] == MAGIC
}
pub fn encode_peat_lite_document(
node_id: u32,
seq_num: u32,
payload: &[u8],
) -> Result<Vec<u8>, FrameError> {
let header = Header {
msg_type: MessageType::Document,
flags: 0,
node_id,
seq_num,
};
let total = HEADER_SIZE + payload.len();
let mut buf = vec![0u8; total];
encode_header(&header, &mut buf[..HEADER_SIZE])?;
buf[HEADER_SIZE..].copy_from_slice(payload);
Ok(buf)
}
pub fn decode_peat_lite_document(
buf: &[u8],
) -> Result<(Header, pl_doc::DocumentRef<'_>), FrameError> {
if !is_peat_lite_frame(buf) {
return Err(FrameError::NotPeatLiteFrame);
}
let (header, payload) = decode_header(buf)?;
if !matches!(header.msg_type, MessageType::Document) {
return Err(FrameError::UnsupportedMessageType);
}
let view = pl_doc::decode(payload)?;
Ok((header, view))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn roundtrip_document_frame() {
let mut env = vec![0u8; 256];
let env_n = pl_doc::encode(
0, "markers",
"uuid-abc",
1_700_000_000_000,
b"opaque-body",
&mut env,
)
.expect("envelope encode");
env.truncate(env_n);
let frame = encode_peat_lite_document(0xCAFE_BABE, 42, &env).expect("frame encode");
assert!(
is_peat_lite_frame(&frame),
"wrapped frame must start with MAGIC"
);
let (header, view) = decode_peat_lite_document(&frame).expect("frame decode");
assert_eq!(header.msg_type, MessageType::Document);
assert_eq!(header.node_id, 0xCAFE_BABE);
assert_eq!(header.seq_num, 42);
assert_eq!(view.collection, "markers");
assert_eq!(view.doc_id, "uuid-abc");
assert_eq!(view.timestamp_ms, 1_700_000_000_000);
assert_eq!(view.body, b"opaque-body");
}
#[test]
fn non_magic_buffer_is_rejected() {
let translator_frame = [0xB6_u8, 0x01, 0x00, 0x00];
assert!(!is_peat_lite_frame(&translator_frame));
assert_eq!(
decode_peat_lite_document(&translator_frame),
Err(FrameError::NotPeatLiteFrame),
);
assert!(!is_peat_lite_frame(&[]));
assert_eq!(
decode_peat_lite_document(&[]),
Err(FrameError::NotPeatLiteFrame),
);
}
#[test]
fn non_document_message_type_is_rejected() {
let mut buf = [0u8; 32];
let header = Header {
msg_type: MessageType::Heartbeat,
flags: 0,
node_id: 1,
seq_num: 1,
};
encode_header(&header, &mut buf[..HEADER_SIZE]).expect("header");
assert_eq!(
decode_peat_lite_document(&buf[..HEADER_SIZE]),
Err(FrameError::UnsupportedMessageType),
);
}
#[test]
fn corrupt_header_surfaces_envelope_error() {
let mut buf = [0u8; HEADER_SIZE];
buf[0..4].copy_from_slice(&MAGIC);
buf[4] = 99; buf[5] = MessageType::Document as u8;
match decode_peat_lite_document(&buf) {
Err(FrameError::Envelope(MessageError::UnsupportedVersion)) => {}
other => panic!("expected Envelope(UnsupportedVersion), got {:?}", other),
}
}
#[test]
fn encode_size_is_exact() {
let mut env = vec![0u8; 64];
let env_n =
pl_doc::encode(0, "tracks", "id", 0, b"body", &mut env).expect("envelope encode");
env.truncate(env_n);
let frame = encode_peat_lite_document(0, 0, &env).expect("frame encode");
assert_eq!(
frame.len(),
HEADER_SIZE + env.len(),
"frame should be exactly header + payload bytes",
);
}
#[test]
fn dispatcher_decodes_well_formed_frame() {
let mut env = vec![0u8; 256];
let env_n = pl_doc::encode(
pl_doc::DOC_FLAG_TOMBSTONE,
"platforms",
"ANDROID-abc",
1_700_000_000_000,
&[],
&mut env,
)
.expect("envelope");
env.truncate(env_n);
let frame = encode_peat_lite_document(0xCAFE_BABE, 7, &env).expect("frame");
match try_handle_peat_lite_frame(&frame) {
PeatLiteFrameOutcome::Decoded(doc) => {
assert_eq!(doc.source_node_id, 0xCAFE_BABE);
assert_eq!(doc.seq_num, 7);
assert_eq!(doc.collection, "platforms");
assert_eq!(doc.doc_id, "ANDROID-abc");
assert_eq!(doc.timestamp_ms, 1_700_000_000_000);
assert!(doc.is_tombstone());
assert!(doc.body.is_empty());
}
other => panic!("expected Decoded, got {:?}", other),
}
}
#[test]
fn dispatcher_falls_through_for_translator_frames() {
let translator_frame = vec![0xB6, 0x02, 0xDE, 0xAD, 0xBE, 0xEF];
match try_handle_peat_lite_frame(&translator_frame) {
PeatLiteFrameOutcome::NotPeatLiteFrame => {} other => panic!(
"translator-frame buffer must NOT be claimed by peat-lite dispatcher, got {:?}",
other
),
}
}
#[test]
fn dispatcher_falls_through_for_empty_input() {
match try_handle_peat_lite_frame(&[]) {
PeatLiteFrameOutcome::NotPeatLiteFrame => {} other => panic!("expected NotPeatLiteFrame, got {:?}", other),
}
}
#[test]
fn dispatcher_handles_unsupported_message_type_without_fallthrough() {
let mut buf = [0u8; HEADER_SIZE];
let header = Header {
msg_type: MessageType::Heartbeat,
flags: 0,
node_id: 1,
seq_num: 1,
};
encode_header(&header, &mut buf[..HEADER_SIZE]).expect("header");
match try_handle_peat_lite_frame(&buf) {
PeatLiteFrameOutcome::Handled => {}
other => panic!("expected Handled (silent drop), got {:?}", other),
}
}
#[test]
fn dispatcher_handles_corrupt_header_without_fallthrough() {
let mut buf = [0u8; HEADER_SIZE];
buf[0..4].copy_from_slice(&MAGIC);
buf[4] = 99; buf[5] = MessageType::Document as u8;
match try_handle_peat_lite_frame(&buf) {
PeatLiteFrameOutcome::Handled => {}
other => panic!("expected Handled (silent drop), got {:?}", other),
}
}
#[test]
fn end_to_end_outbound_then_dispatcher_roundtrip() {
let mut env = vec![0u8; 512];
let body = b"{\"lat\":33.71576,\"lon\":-84.41152}";
let env_n = pl_doc::encode(
0,
"markers",
"marker-uuid-001",
1_700_000_000_000,
body,
&mut env,
)
.expect("envelope encode");
env.truncate(env_n);
let wire_bytes = encode_peat_lite_document(0xBEEFCAFE, 99, &env).expect("frame encode");
match try_handle_peat_lite_frame(&wire_bytes) {
PeatLiteFrameOutcome::Decoded(doc) => {
assert_eq!(doc.source_node_id, 0xBEEFCAFE);
assert_eq!(doc.seq_num, 99);
assert_eq!(doc.collection, "markers");
assert_eq!(doc.doc_id, "marker-uuid-001");
assert_eq!(doc.timestamp_ms, 1_700_000_000_000);
assert!(!doc.is_tombstone());
assert_eq!(doc.body.as_slice(), body);
}
other => panic!("end-to-end roundtrip failed: {:?}", other),
}
}
}