use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use talea_core::events::LedgerEvent;
use talea_core::types::Seq;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WireEvent {
pub seq: Seq,
pub at: DateTime<Utc>,
pub event: LedgerEvent,
}
pub const HEADER_LEN: usize = 8;
#[derive(Debug, thiserror::Error)]
pub enum FrameError {
#[error("torn frame at end of data")]
Torn,
#[error("corrupt frame: {reason}")]
Corrupt { reason: String },
}
#[derive(Debug, thiserror::Error)]
pub enum EncodeError {
#[error("event payload serialization: {0}")]
Json(#[from] serde_json::Error),
#[error("event payload of {len} bytes exceeds the u32 frame length limit")]
TooLarge { len: usize },
}
pub fn encode_frame(ev: &WireEvent) -> Result<Vec<u8>, EncodeError> {
let payload = serde_json::to_vec(ev)?;
let len_u32 =
u32::try_from(payload.len()).map_err(|_| EncodeError::TooLarge { len: payload.len() })?;
let mut buf = Vec::with_capacity(HEADER_LEN + payload.len());
buf.extend_from_slice(&len_u32.to_le_bytes());
buf.extend_from_slice(&crc32fast::hash(&payload).to_le_bytes());
buf.extend_from_slice(&payload);
Ok(buf)
}
pub fn decode_frame(buf: &[u8]) -> Result<Option<(WireEvent, usize)>, FrameError> {
if buf.is_empty() {
return Ok(None);
}
let Some((len_bytes, rest)) = buf.split_first_chunk::<4>() else {
return Err(FrameError::Torn);
};
let Some((crc_bytes, _)) = rest.split_first_chunk::<4>() else {
return Err(FrameError::Torn);
};
let len = u32::from_le_bytes(*len_bytes) as usize;
let crc = u32::from_le_bytes(*crc_bytes);
let Some(payload) = buf.get(HEADER_LEN..HEADER_LEN + len) else {
return Err(FrameError::Torn);
};
if crc32fast::hash(payload) != crc {
return Err(FrameError::Corrupt {
reason: "crc mismatch".into(),
});
}
let ev = serde_json::from_slice(payload).map_err(|e| FrameError::Corrupt {
reason: format!("json: {e}"),
})?;
Ok(Some((ev, HEADER_LEN + len)))
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use talea_core::events::LedgerEvent;
use talea_core::types::*;
fn tx_event() -> WireEvent {
WireEvent {
seq: 7,
at: talea_core::store::ledger_now(),
event: LedgerEvent::TransactionPosted(Transaction {
id: TxId(uuid::Uuid::now_v7()),
book: Book("b".into()),
postings: vec![],
idempotency_key: IdempotencyKey("k".into()),
external_refs: vec![],
metadata: serde_json::json!({"note": "meta survives json"}),
occurred_at: Utc::now(),
}),
}
}
#[test]
fn frame_round_trips() {
let ev = tx_event();
let buf = encode_frame(&ev).unwrap();
let (back, consumed) = decode_frame(&buf).unwrap().unwrap();
assert_eq!(consumed, buf.len());
assert_eq!(back.seq, 7);
assert!(matches!(back.event, LedgerEvent::TransactionPosted(_)));
}
#[test]
fn short_buffer_is_torn_not_corrupt() {
let buf = encode_frame(&tx_event()).unwrap();
for cut in [3, 8, buf.len() - 1] {
match decode_frame(&buf[..cut]) {
Err(FrameError::Torn) => {}
other => panic!("cut at {cut}: expected Torn, got {other:?}"),
}
}
}
#[test]
fn bit_flip_with_full_length_is_corrupt() {
let mut buf = encode_frame(&tx_event()).unwrap();
let last = buf.len() - 1;
buf[last] ^= 0xff; assert!(matches!(
decode_frame(&buf),
Err(FrameError::Corrupt { .. })
));
}
#[test]
fn empty_buffer_is_clean_end() {
assert!(decode_frame(&[]).unwrap().is_none());
}
#[test]
fn chained_frames_decode_sequentially() {
let ev1 = tx_event();
let mut ev2 = tx_event();
ev2.seq = 8;
let mut buf = encode_frame(&ev1).unwrap();
let frame2 = encode_frame(&ev2).unwrap();
buf.extend_from_slice(&frame2);
let (back1, consumed1) = decode_frame(&buf).unwrap().unwrap();
let (back2, consumed2) = decode_frame(&buf[consumed1..]).unwrap().unwrap();
assert_eq!(back1.seq, 7);
assert_eq!(back2.seq, 8);
assert_eq!(consumed1 + consumed2, buf.len());
}
#[test]
fn adversarial_large_len_header_returns_torn_not_panic() {
let mut buf = Vec::new();
buf.extend_from_slice(&u32::MAX.to_le_bytes()); buf.extend_from_slice(&0u32.to_le_bytes()); buf.extend_from_slice(b"tiny");
match decode_frame(&buf) {
Err(FrameError::Torn) => {}
other => panic!("expected Torn for adversarial len header, got {other:?}"),
}
}
}