use crate::GapPayload;
use gbp::CodecError;
use gbp_core::{GbpFlags, MemberId, StreamType};
use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
use std::collections::HashMap;
/// Errors surfaced by [`GapClient`] when sending or accepting audio payloads.
#[derive(Debug, thiserror::Error)]
pub enum GapError {
/// A codec failure, converted automatically from [`CodecError`] via `?`.
#[error("decode: {0}")]
Decode(#[from] CodecError),
/// The payload's key phase does not match the receiver's current epoch
/// (compared on the low 32 bits of the epoch).
#[error("epoch stale: kp={kp}, expected={expected}")]
EpochStale {
/// Key phase carried by the received payload.
kp: u32,
/// Key phase the receiver expected (its current epoch cast to `u32`).
expected: u32,
},
/// RTP replay indication.
///
/// NOTE(review): this variant is not constructed anywhere in the visible
/// code; `GapClient::accept` reports non-advancing sequence numbers as
/// `GapAccept::Late` instead. Confirm whether other code relies on it.
#[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
RtpReplay {
/// Presumably the media source id of the offending frame — TODO confirm.
src: u32,
/// Presumably the RTP sequence number of the offending frame — TODO confirm.
seq: u32,
/// Presumably the high-water mark the sequence was checked against — TODO confirm.
hw: u32,
},
/// A node-level failure, converted automatically from [`NodeError`] via `?`.
#[error("node: {0}")]
Node(#[from] NodeError),
}
/// Outcome of [`GapClient::accept`] for a payload that decoded and passed the
/// epoch check.
#[derive(Debug)]
pub enum GapAccept {
/// The payload's RTP sequence number advanced past the mark recorded for its
/// media source; the mark has been updated to this sequence number.
New(GapPayload),
/// The payload's RTP sequence number did not advance past the recorded mark;
/// the payload is still handed back and the mark is left unchanged.
Late(GapPayload),
}
/// Per-client RTP sequence bookkeeping for audio payloads.
///
/// Tracks, per media source id, the last sequence number issued on the send
/// side and the high-water mark recorded on the receive side.
#[derive(Debug, Default)]
pub struct GapClient {
    /// Last RTP sequence number issued for each outbound media source id.
    out_rtp_seq: HashMap<u32, u32>,
    /// High-water RTP sequence number recorded for each inbound media source id.
    in_hw: HashMap<u32, u32>,
}
impl GapClient {
    /// Creates a client with no outbound sequence counters and no inbound
    /// high-water marks.
    pub fn new() -> Self {
        Self::default()
    }

    /// Wraps one Opus frame in a `GapPayload` and sends it to `target` through
    /// `node` on the audio stream.
    ///
    /// The outbound sequence counter for `media_source_id` is incremented
    /// (wrapping on overflow) before it is used, so the first frame sent for a
    /// source carries sequence number 1. The counter advances even when the
    /// underlying send fails. The payload's `key_phase` is the low 32 bits of
    /// `node.current_epoch`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `node.send_payload`, converted into
    /// [`GapError`].
    pub fn send<S: Sealer>(
        &mut self,
        node: &mut GroupNode,
        seal: &mut S,
        target: MemberId,
        media_source_id: u32,
        rtp_timestamp: u64,
        opus: Vec<u8>,
    ) -> Result<OutboundFrame, GapError> {
        let counter = self.out_rtp_seq.entry(media_source_id).or_insert(0);
        *counter = counter.wrapping_add(1);
        let rtp_sequence = *counter;

        let payload = GapPayload {
            media_source_id,
            rtp_sequence,
            rtp_timestamp,
            // Only the low 32 bits of the epoch travel in the payload; `accept`
            // truncates its `current_epoch` the same way before comparing.
            key_phase: node.current_epoch as u32,
            opus_frame: serde_bytes::ByteBuf::from(opus),
        };

        // NOTE(review): `2` is presumably the member-stream slot reserved for
        // audio — confirm against `GroupNode::member_stream_id`.
        let stream_id = node.member_stream_id(2);
        Ok(node.send_payload(
            seal,
            target,
            StreamType::Audio,
            stream_id,
            GbpFlags::ordered_only(),
            &payload.to_cbor(),
        )?)
    }

    /// Decodes a received plaintext payload, checks its key phase, and
    /// classifies it against the high-water mark of its media source.
    ///
    /// Returns [`GapAccept::New`] (and advances the mark) when the sequence
    /// number is ahead of the recorded mark, otherwise [`GapAccept::Late`].
    /// When a mark exists the comparison is wrap-aware: a sequence number
    /// counts as ahead if it lies within half the `u32` range in front of the
    /// mark. This matches `send`, which wraps its counter on overflow; a plain
    /// `<=` comparison would label every post-wrap frame as late until `reset`.
    ///
    /// For a source with no recorded mark, sequence number 0 is reported as
    /// late and any other value as new.
    ///
    /// # Errors
    ///
    /// * Whatever `GapPayload::from_cbor` reports, converted into [`GapError`].
    /// * [`GapError::EpochStale`] when the payload's `key_phase` differs from
    ///   the low 32 bits of `current_epoch`.
    pub fn accept(
        &mut self,
        plaintext: &[u8],
        current_epoch: u64,
    ) -> Result<GapAccept, GapError> {
        // Half of the u32 sequence space: distances below this are "ahead".
        const HALF_RANGE: u32 = 1 << 31;

        let p = GapPayload::from_cbor(plaintext)?;

        // The sender stores only the low 32 bits of its epoch, so compare alike.
        let expected = current_epoch as u32;
        if p.key_phase != expected {
            return Err(GapError::EpochStale {
                kp: p.key_phase,
                expected,
            });
        }

        let is_new = match self.in_hw.get(&p.media_source_id) {
            // Unseen source: behave as if the mark were 0 under a plain
            // comparison, so a late joiner is not penalised for a high
            // starting sequence number.
            None => p.rtp_sequence != 0,
            // Known source: serial-number comparison that survives wrap-around.
            Some(&hw) => {
                let ahead = p.rtp_sequence.wrapping_sub(hw);
                ahead != 0 && ahead < HALF_RANGE
            }
        };

        if !is_new {
            return Ok(GapAccept::Late(p));
        }
        self.in_hw.insert(p.media_source_id, p.rtp_sequence);
        Ok(GapAccept::New(p))
    }

    /// Forgets all outbound sequence counters and inbound high-water marks.
    pub fn reset(&mut self) {
        self.out_rtp_seq.clear();
        self.in_hw.clear();
    }
}