use crate::GapPayload;
use gbp::CodecError;
use gbp_core::{GbpFlags, MemberId, PayloadCodec, StreamType, timeouts};
use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
use std::collections::HashMap;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
#[cfg(target_arch = "wasm32")]
use web_time::Instant;
/// Errors produced by [`GapClient`] when sending or accepting frames.
#[derive(Debug, thiserror::Error)]
pub enum GapError {
/// The received bytes could not be decoded into a payload; wraps the
/// underlying [`CodecError`].
#[error("decode: {0}")]
Decode(#[from] CodecError),
/// The frame's key phase matched neither the current epoch nor any retired
/// epoch window that is still within its overlap period.
#[error("epoch stale: kp={kp}, expected={expected}")]
EpochStale {
/// Key phase carried by the rejected frame.
kp: u32,
/// Current epoch (cast to `u32`) the frame was checked against.
expected: u32,
},
/// An RTP replay was detected.
// NOTE(review): this variant is not constructed anywhere in this file;
// `accept` reports duplicates as `GapAccept::Late` instead. The fields are
// presumably the media source id, the offending sequence number and the
// high-water mark it was compared against - confirm against the code that
// raises it.
#[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
RtpReplay {
src: u32,
seq: u32,
hw: u32,
},
/// Sending through the group node failed; wraps the underlying [`NodeError`].
#[error("node: {0}")]
Node(#[from] NodeError),
}
/// Outcome of [`GapClient::accept`] for a frame that decoded successfully and
/// belongs to an epoch that is still accepted.
#[derive(Debug)]
pub enum GapAccept {
/// The frame advanced the high-water mark recorded for its media source.
New(GapPayload),
/// The frame was judged to be at or behind the high-water mark already
/// recorded for its media source (a duplicate or an out-of-order arrival).
/// The decoded payload is still handed back to the caller.
Late(GapPayload),
}
/// Receive-side sequence state of an epoch that is no longer current, kept so
/// that frames from that epoch can still be classified for a limited time.
struct OldEpochWindow {
// Epoch that was current when this window was retired.
epoch: u64,
// High-water sequence number per media source id, moved out of the client
// at the moment the epoch changed.
in_hw: HashMap<u32, u32>,
// Instant after which the window is ignored by `accept` and dropped by
// `sync_epoch`.
expires: Instant,
}
/// Per-connection sequence and replay-window state for sending and receiving
/// audio payloads.
pub struct GapClient {
// Last sequence number issued per outgoing media source id; cleared on
// every epoch change.
out_rtp_seq: HashMap<u32, u32>,
// High-water sequence number per incoming media source id for the current
// epoch.
in_hw: HashMap<u32, u32>,
// Epoch most recently passed to `sync_epoch`; `None` before the first sync
// and after `reset`.
current_epoch: Option<u64>,
// Receive windows of previous epochs that are still within their overlap
// period.
old_windows: Vec<OldEpochWindow>,
}
impl Default for GapClient {
    /// Builds a client with no sequence state, no known epoch and no retired
    /// epoch windows.
    fn default() -> Self {
        let out_rtp_seq = HashMap::new();
        let in_hw = HashMap::new();
        let old_windows = Vec::new();
        Self {
            out_rtp_seq,
            in_hw,
            current_epoch: None,
            old_windows,
        }
    }
}
impl GapClient {
pub fn new() -> Self {
Self::default()
}
pub fn send<S: Sealer>(
&mut self,
node: &mut GroupNode,
seal: &mut S,
target: MemberId,
media_source_id: u32,
rtp_timestamp: u64,
opus: Vec<u8>,
codec: PayloadCodec,
) -> Result<OutboundFrame, GapError> {
self.sync_epoch(node.current_epoch);
let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
*seq = seq.wrapping_add(1) & 0xFFFF;
let payload = GapPayload {
media_source_id,
rtp_sequence: *seq,
rtp_timestamp,
key_phase: node.current_epoch as u32,
opus_frame: serde_bytes::ByteBuf::from(opus),
};
let stream_id = node.member_stream_id(2);
Ok(node.send_payload(
seal,
target,
StreamType::Audio,
stream_id,
GbpFlags::ordered_only(),
&payload.to_bytes(codec),
codec,
)?)
}
pub fn accept(
&mut self,
plaintext: &[u8],
current_epoch: u64,
codec: PayloadCodec,
) -> Result<GapAccept, GapError> {
self.sync_epoch(current_epoch);
let p = GapPayload::from_bytes(plaintext, codec)?;
if p.key_phase == current_epoch as u32 {
let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
return Ok(GapAccept::Late(p));
}
self.in_hw.insert(p.media_source_id, p.rtp_sequence);
return Ok(GapAccept::New(p));
}
let now = Instant::now();
if let Some(old) = self
.old_windows
.iter_mut()
.find(|w| w.epoch == p.key_phase as u64 && w.expires > now)
{
let hw = old.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
return Ok(GapAccept::Late(p));
}
old.in_hw.insert(p.media_source_id, p.rtp_sequence);
return Ok(GapAccept::New(p));
}
Err(GapError::EpochStale {
kp: p.key_phase,
expected: current_epoch as u32,
})
}
pub fn sync_epoch(&mut self, epoch: u64) {
let now = Instant::now();
self.old_windows.retain(|w| w.expires > now);
if Some(epoch) != self.current_epoch {
if let Some(old_epoch) = self.current_epoch {
if !self.in_hw.is_empty() {
self.old_windows.push(OldEpochWindow {
epoch: old_epoch,
in_hw: std::mem::take(&mut self.in_hw),
expires: now + Duration::from_millis(timeouts::T_GAP_KEY_OVERLAP_MS),
});
}
}
self.out_rtp_seq.clear();
self.in_hw.clear();
self.current_epoch = Some(epoch);
}
}
pub fn reset(&mut self) {
self.out_rtp_seq.clear();
self.in_hw.clear();
self.old_windows.clear();
self.current_epoch = None;
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Encodes a CBOR payload for media source 1 with the given sequence number
    // and key phase; timestamp and audio bytes are fixed.
    fn make_payload(seq: u32, key_phase: u32) -> Vec<u8> {
        let payload = crate::GapPayload {
            media_source_id: 1,
            rtp_sequence: seq,
            rtp_timestamp: 960,
            key_phase,
            opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
        };
        payload.to_bytes(PayloadCodec::Cbor)
    }

    // Feeds one frame to `client` while the receiver is at `epoch`.
    fn try_accept(
        client: &mut GapClient,
        seq: u32,
        key_phase: u32,
        epoch: u64,
    ) -> Result<GapAccept, GapError> {
        client.accept(&make_payload(seq, key_phase), epoch, PayloadCodec::Cbor)
    }

    // Same as `try_accept`, but panics if the frame is rejected.
    fn accept_ok(client: &mut GapClient, seq: u32, key_phase: u32, epoch: u64) -> GapAccept {
        try_accept(client, seq, key_phase, epoch).unwrap()
    }

    #[test]
    fn wraparound_after_ffff_is_accepted() {
        let mut client = GapClient::new();
        accept_ok(&mut client, 0xFFFE, 1, 1);
        accept_ok(&mut client, 0xFFFF, 1, 1);
        let outcome = accept_ok(&mut client, 0x0000, 1, 1);
        assert!(
            matches!(outcome, GapAccept::New(_)),
            "seq=0 after 0xFFFF must be New"
        );
    }

    #[test]
    fn strict_replay_within_window_is_late() {
        let mut client = GapClient::new();
        accept_ok(&mut client, 100, 1, 1);
        let outcome = accept_ok(&mut client, 100, 1, 1);
        assert!(
            matches!(outcome, GapAccept::Late(_)),
            "exact dup must be Late"
        );
    }

    #[test]
    fn epoch_change_clears_window() {
        let mut client = GapClient::new();
        accept_ok(&mut client, 1, 1, 1);
        let outcome = accept_ok(&mut client, 1, 2, 2);
        assert!(
            matches!(outcome, GapAccept::New(_)),
            "new epoch resets window"
        );
    }

    #[test]
    fn old_epoch_frame_accepted_within_overlap() {
        let mut client = GapClient::new();
        accept_ok(&mut client, 5, 1, 1);
        accept_ok(&mut client, 1, 2, 2);
        // Key phase 1 while the receiver is already at epoch 2.
        let outcome = accept_ok(&mut client, 6, 1, 2);
        assert!(
            matches!(outcome, GapAccept::New(_)),
            "late epoch-1 frame accepted within T_overlap"
        );
    }

    #[test]
    fn old_epoch_replay_is_late_within_overlap() {
        let mut client = GapClient::new();
        accept_ok(&mut client, 5, 1, 1);
        accept_ok(&mut client, 1, 2, 2);
        let outcome = accept_ok(&mut client, 5, 1, 2);
        assert!(
            matches!(outcome, GapAccept::Late(_)),
            "duplicate from old epoch is Late"
        );
    }

    #[test]
    fn expired_old_epoch_frame_is_stale() {
        let mut client = GapClient::new();
        accept_ok(&mut client, 5, 1, 1);
        accept_ok(&mut client, 1, 2, 2);
        // Force every retired window into the past so it counts as expired.
        client
            .old_windows
            .iter_mut()
            .for_each(|w| w.expires = Instant::now() - Duration::from_millis(1));
        let outcome = try_accept(&mut client, 6, 1, 2);
        assert!(
            matches!(outcome, Err(GapError::EpochStale { .. })),
            "expired epoch is Stale"
        );
    }

    #[test]
    fn reset_clears_overlap_buffer() {
        let mut client = GapClient::new();
        accept_ok(&mut client, 1, 1, 1);
        accept_ok(&mut client, 1, 2, 2);
        assert!(!client.old_windows.is_empty(), "overlap buffer populated");
        client.reset();
        assert!(
            client.old_windows.is_empty(),
            "overlap buffer cleared after reset"
        );
    }
}