use serde::Serialize;
use crate::wire::{MAGIC, POSTAMBLE_SIZE, PREAMBLE_SIZE, Postamble, Preamble};
pub const REASON_SHORT_FETCH_FWD: &str = "short-fetch-fwd";
pub const REASON_BAD_MAGIC_FWD: &str = "bad-magic-fwd";
pub const REASON_PREAMBLE_PARSE_ERROR_FWD: &str = "preamble-parse-error-fwd";
pub const REASON_LENGTH_OUT_OF_RANGE_FWD: &str = "length-out-of-range-fwd";
pub const REASON_SHORT_FETCH_BWD: &str = "short-fetch-bwd";
pub const REASON_BAD_END_MAGIC_BWD: &str = "bad-end-magic-bwd";
pub const REASON_POSTAMBLE_PARSE_ERROR: &str = "postamble-parse-error";
pub const REASON_LENGTH_BELOW_MINIMUM_BWD: &str = "length-below-minimum-bwd";
pub const REASON_BACKWARD_ARITH_UNDERFLOW: &str = "backward-arith-underflow";
pub const REASON_BACKWARD_OVERLAPS_FORWARD: &str = "backward-overlaps-forward";
pub const REASON_BAD_MAGIC_BWD: &str = "bad-magic-bwd";
pub const REASON_PREAMBLE_PARSE_ERROR_BWD: &str = "preamble-parse-error-bwd";
pub const REASON_STREAMING_PREAMBLE_NON_TAIL: &str = "streaming-preamble-non-tail";
pub const REASON_PREAMBLE_POSTAMBLE_LENGTH_MISMATCH: &str = "preamble-postamble-length-mismatch";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(tag = "kind", rename_all_fields = "camelCase")]
pub enum BackwardOutcome {
Format {
reason: &'static str,
},
Streaming,
NeedPreambleValidation {
msg_start: u64,
length: u64,
first_footer_offset: u64,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(tag = "kind", rename_all_fields = "camelCase")]
pub enum BackwardCommit {
Format {
reason: &'static str,
},
Layout {
offset: u64,
length: u64,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(tag = "kind", rename_all_fields = "camelCase")]
pub enum ForwardOutcome {
Hit {
offset: u64,
length: u64,
msg_end: u64,
},
ExceedsBound {
offset: u64,
length: u64,
msg_end: u64,
},
Streaming { remaining: u64 },
Terminate {
reason: &'static str,
},
}
pub fn parse_backward_postamble(
pa_bytes: &[u8],
snap_next: u64,
snap_prev: u64,
) -> BackwardOutcome {
let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
if pa_bytes.len() < POSTAMBLE_SIZE {
return BackwardOutcome::Format {
reason: REASON_SHORT_FETCH_BWD,
};
}
let end_magic_start = POSTAMBLE_SIZE - crate::wire::END_MAGIC.len();
if &pa_bytes[end_magic_start..POSTAMBLE_SIZE] != crate::wire::END_MAGIC {
return BackwardOutcome::Format {
reason: REASON_BAD_END_MAGIC_BWD,
};
}
let postamble = match Postamble::read_from(&pa_bytes[..POSTAMBLE_SIZE]) {
Ok(p) => p,
Err(_) => {
return BackwardOutcome::Format {
reason: REASON_POSTAMBLE_PARSE_ERROR,
};
}
};
let total = postamble.total_length;
if total == 0 {
return BackwardOutcome::Streaming;
}
if total < min_message_size {
return BackwardOutcome::Format {
reason: REASON_LENGTH_BELOW_MINIMUM_BWD,
};
}
let msg_start = match snap_prev.checked_sub(total) {
Some(s) => s,
None => {
return BackwardOutcome::Format {
reason: REASON_BACKWARD_ARITH_UNDERFLOW,
};
}
};
if msg_start < snap_next {
return BackwardOutcome::Format {
reason: REASON_BACKWARD_OVERLAPS_FORWARD,
};
}
BackwardOutcome::NeedPreambleValidation {
msg_start,
length: total,
first_footer_offset: postamble.first_footer_offset,
}
}
pub fn footer_region_present(first_footer_offset: u64, length: u64) -> bool {
let pre = PREAMBLE_SIZE as u64;
let pa = POSTAMBLE_SIZE as u64;
let Some(footer_max) = length.checked_sub(pa) else {
return false;
};
first_footer_offset >= pre && first_footer_offset < footer_max
}
pub fn validate_backward_preamble(
preamble_bytes: &[u8],
msg_start: u64,
length: u64,
) -> BackwardCommit {
if preamble_bytes.len() < PREAMBLE_SIZE {
return BackwardCommit::Format {
reason: REASON_SHORT_FETCH_BWD,
};
}
if &preamble_bytes[..MAGIC.len()] != MAGIC {
return BackwardCommit::Format {
reason: REASON_BAD_MAGIC_BWD,
};
}
let preamble = match Preamble::read_from(preamble_bytes) {
Ok(p) => p,
Err(_) => {
return BackwardCommit::Format {
reason: REASON_PREAMBLE_PARSE_ERROR_BWD,
};
}
};
if preamble.total_length == 0 {
return BackwardCommit::Format {
reason: REASON_STREAMING_PREAMBLE_NON_TAIL,
};
}
if preamble.total_length != length {
return BackwardCommit::Format {
reason: REASON_PREAMBLE_POSTAMBLE_LENGTH_MISMATCH,
};
}
BackwardCommit::Layout {
offset: msg_start,
length,
}
}
pub fn parse_forward_preamble(
preamble_bytes: &[u8],
pos: u64,
file_size: u64,
bound: u64,
) -> ForwardOutcome {
let min_message_size = (PREAMBLE_SIZE + POSTAMBLE_SIZE) as u64;
if preamble_bytes.len() < PREAMBLE_SIZE {
return ForwardOutcome::Terminate {
reason: REASON_SHORT_FETCH_FWD,
};
}
if &preamble_bytes[..MAGIC.len()] != MAGIC {
return ForwardOutcome::Terminate {
reason: REASON_BAD_MAGIC_FWD,
};
}
let preamble = match Preamble::read_from(preamble_bytes) {
Ok(p) => p,
Err(_) => {
return ForwardOutcome::Terminate {
reason: REASON_PREAMBLE_PARSE_ERROR_FWD,
};
}
};
let msg_len = preamble.total_length;
if msg_len == 0 {
let remaining = file_size.saturating_sub(pos);
return ForwardOutcome::Streaming { remaining };
}
let end = match pos.checked_add(msg_len) {
Some(e) => e,
None => {
return ForwardOutcome::Terminate {
reason: REASON_LENGTH_OUT_OF_RANGE_FWD,
};
}
};
if msg_len < min_message_size || end > file_size {
return ForwardOutcome::Terminate {
reason: REASON_LENGTH_OUT_OF_RANGE_FWD,
};
}
if end > bound {
return ForwardOutcome::ExceedsBound {
offset: pos,
length: msg_len,
msg_end: end,
};
}
ForwardOutcome::Hit {
offset: pos,
length: msg_len,
msg_end: end,
}
}
pub fn same_message_check(
fwd_offset: u64,
fwd_length: u64,
layout_offset: u64,
layout_length: u64,
) -> bool {
fwd_offset == layout_offset && fwd_length == layout_length
}
#[cfg(test)]
mod tests {
use super::*;
use crate::wire::{MessageFlags, WIRE_VERSION};
fn make_postamble(total_length: u64) -> Vec<u8> {
make_postamble_with_footer(total_length, 0)
}
fn make_postamble_with_footer(total_length: u64, first_footer_offset: u64) -> Vec<u8> {
let pa = Postamble {
first_footer_offset,
total_length,
};
let mut buf = Vec::with_capacity(POSTAMBLE_SIZE);
pa.write_to(&mut buf);
buf
}
fn make_preamble(total_length: u64) -> Vec<u8> {
let pre = Preamble {
version: WIRE_VERSION,
flags: MessageFlags::new(0),
reserved: 0,
total_length,
};
let mut buf = Vec::with_capacity(PREAMBLE_SIZE);
pre.write_to(&mut buf);
buf
}
#[test]
fn backward_short_fetch() {
let outcome = parse_backward_postamble(&[0u8; 4], 0, 100);
assert!(matches!(
outcome,
BackwardOutcome::Format {
reason: REASON_SHORT_FETCH_BWD
}
));
}
#[test]
fn backward_bad_end_magic() {
let buf = vec![0u8; POSTAMBLE_SIZE];
let outcome = parse_backward_postamble(&buf, 0, POSTAMBLE_SIZE as u64);
assert!(matches!(
outcome,
BackwardOutcome::Format {
reason: REASON_BAD_END_MAGIC_BWD
}
));
}
#[test]
fn backward_streaming_postamble() {
let buf = make_postamble(0);
let outcome = parse_backward_postamble(&buf, 0, POSTAMBLE_SIZE as u64);
assert_eq!(outcome, BackwardOutcome::Streaming);
}
#[test]
fn backward_length_below_minimum() {
let buf = make_postamble(1);
let outcome = parse_backward_postamble(&buf, 0, 100);
assert!(matches!(
outcome,
BackwardOutcome::Format {
reason: REASON_LENGTH_BELOW_MINIMUM_BWD
}
));
}
#[test]
fn backward_arith_underflow_when_total_exceeds_snap_prev() {
let buf = make_postamble(200);
let outcome = parse_backward_postamble(&buf, 0, 100);
assert!(matches!(
outcome,
BackwardOutcome::Format {
reason: REASON_BACKWARD_ARITH_UNDERFLOW
}
));
}
#[test]
fn backward_overlap_when_msg_start_below_snap_next() {
let buf = make_postamble(80);
let outcome = parse_backward_postamble(&buf, 50, 100);
assert!(matches!(
outcome,
BackwardOutcome::Format {
reason: REASON_BACKWARD_OVERLAPS_FORWARD
}
));
}
#[test]
fn backward_need_preamble_validation_when_disjoint_from_forward() {
let buf = make_postamble(80);
let outcome = parse_backward_postamble(&buf, 10, 100);
assert_eq!(
outcome,
BackwardOutcome::NeedPreambleValidation {
msg_start: 20,
length: 80,
first_footer_offset: 0,
}
);
}
#[test]
fn backward_need_preamble_validation_propagates_first_footer_offset() {
let buf = make_postamble_with_footer(200, 60);
let outcome = parse_backward_postamble(&buf, 0, 200);
assert_eq!(
outcome,
BackwardOutcome::NeedPreambleValidation {
msg_start: 0,
length: 200,
first_footer_offset: 60,
}
);
}
#[test]
fn footer_region_present_empty_when_offset_at_postamble() {
assert!(!footer_region_present(76, 100));
}
#[test]
fn footer_region_present_false_when_offset_before_preamble_end() {
assert!(!footer_region_present(0, 100));
assert!(!footer_region_present(23, 100));
}
#[test]
fn footer_region_present_true_when_offset_in_valid_range() {
assert!(footer_region_present(24, 100));
assert!(footer_region_present(48, 100));
assert!(footer_region_present(75, 100));
}
#[test]
fn footer_region_present_no_panic_on_underflow_inputs() {
assert!(!footer_region_present(0, 0));
assert!(!footer_region_present(0, 23));
assert!(!footer_region_present(u64::MAX, 100));
assert!(!footer_region_present(0, u64::MAX));
}
#[test]
fn backward_validate_short_fetch() {
let outcome = validate_backward_preamble(&[0u8; 4], 0, 100);
assert!(matches!(
outcome,
BackwardCommit::Format {
reason: REASON_SHORT_FETCH_BWD
}
));
}
#[test]
fn backward_validate_bad_magic() {
let mut buf = vec![0u8; PREAMBLE_SIZE];
buf[..8].copy_from_slice(b"NOTMAGIC");
let outcome = validate_backward_preamble(&buf, 0, 100);
assert!(matches!(
outcome,
BackwardCommit::Format {
reason: REASON_BAD_MAGIC_BWD
}
));
}
#[test]
fn backward_validate_streaming_at_non_tail() {
let buf = make_preamble(0);
let outcome = validate_backward_preamble(&buf, 0, 100);
assert!(matches!(
outcome,
BackwardCommit::Format {
reason: REASON_STREAMING_PREAMBLE_NON_TAIL
}
));
}
#[test]
fn backward_validate_length_mismatch() {
let buf = make_preamble(99);
let outcome = validate_backward_preamble(&buf, 0, 100);
assert!(matches!(
outcome,
BackwardCommit::Format {
reason: REASON_PREAMBLE_POSTAMBLE_LENGTH_MISMATCH
}
));
}
#[test]
fn backward_validate_layout() {
let buf = make_preamble(100);
let outcome = validate_backward_preamble(&buf, 50, 100);
assert_eq!(
outcome,
BackwardCommit::Layout {
offset: 50,
length: 100,
}
);
}
#[test]
fn forward_short_fetch() {
let outcome = parse_forward_preamble(&[0u8; 4], 0, 1024, 1024);
assert!(matches!(
outcome,
ForwardOutcome::Terminate {
reason: REASON_SHORT_FETCH_FWD
}
));
}
#[test]
fn forward_bad_magic() {
let mut buf = vec![0u8; PREAMBLE_SIZE];
buf[..8].copy_from_slice(b"NOTMAGIC");
let outcome = parse_forward_preamble(&buf, 0, 1024, 1024);
assert!(matches!(
outcome,
ForwardOutcome::Terminate {
reason: REASON_BAD_MAGIC_FWD
}
));
}
#[test]
fn forward_streaming_preamble() {
let buf = make_preamble(0);
let outcome = parse_forward_preamble(&buf, 0, 1024, 1024);
assert_eq!(outcome, ForwardOutcome::Streaming { remaining: 1024 });
}
#[test]
fn forward_length_below_minimum_message_size() {
let buf = make_preamble(10);
let outcome = parse_forward_preamble(&buf, 0, 1024, 1024);
assert!(matches!(
outcome,
ForwardOutcome::Terminate {
reason: REASON_LENGTH_OUT_OF_RANGE_FWD
}
));
}
#[test]
fn forward_length_exceeds_file() {
let buf = make_preamble(2000);
let outcome = parse_forward_preamble(&buf, 0, 1024, 1024);
assert!(matches!(
outcome,
ForwardOutcome::Terminate {
reason: REASON_LENGTH_OUT_OF_RANGE_FWD
}
));
}
#[test]
fn forward_hit() {
let buf = make_preamble(100);
let outcome = parse_forward_preamble(&buf, 0, 1024, 1024);
assert_eq!(
outcome,
ForwardOutcome::Hit {
offset: 0,
length: 100,
msg_end: 100,
}
);
}
#[test]
fn forward_exceeds_bound_when_msg_end_above_bound_below_file_size() {
let buf = make_preamble(100);
let outcome = parse_forward_preamble(&buf, 0, 1024, 50);
assert_eq!(
outcome,
ForwardOutcome::ExceedsBound {
offset: 0,
length: 100,
msg_end: 100,
}
);
}
#[test]
fn same_message_yes() {
assert!(same_message_check(0, 100, 0, 100));
}
#[test]
fn same_message_offset_diff() {
assert!(!same_message_check(0, 100, 50, 100));
}
#[test]
fn same_message_length_diff() {
assert!(!same_message_check(0, 100, 0, 200));
}
}