1use crate::GapPayload;
4use gbp::CodecError;
5use gbp_core::{GbpFlags, MemberId, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashMap;
8
9#[derive(Debug, thiserror::Error)]
11pub enum GapError {
12 #[error("decode: {0}")]
14 Decode(#[from] CodecError),
15 #[error("epoch stale: kp={kp}, expected={expected}")]
17 EpochStale {
18 kp: u32,
20 expected: u32,
22 },
23 #[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
25 RtpReplay {
26 src: u32,
28 seq: u32,
30 hw: u32,
32 },
33 #[error("node: {0}")]
35 Node(#[from] NodeError),
36}
37
38#[derive(Debug)]
40pub enum GapAccept {
41 New(GapPayload),
43 Late(GapPayload),
46}
47
/// Per-epoch RTP sequence bookkeeping for the sending and receiving sides.
///
/// All counters are keyed by media source id and are discarded whenever the
/// epoch changes.
#[derive(Default)]
pub struct GapClient {
    /// Last sequence number handed out per outgoing media source id.
    out_rtp_seq: HashMap<u32, u32>,

    /// Newest accepted sequence number (high-water mark) per incoming media
    /// source id.
    in_hw: HashMap<u32, u32>,

    /// Epoch the counters above belong to; `None` until the first sync and
    /// again after a reset.
    current_epoch: Option<u64>,
}
63
64impl GapClient {
65 pub fn new() -> Self {
67 Self::default()
68 }
69
70 pub fn send<S: Sealer>(
76 &mut self,
77 node: &mut GroupNode,
78 seal: &mut S,
79 target: MemberId,
80 media_source_id: u32,
81 rtp_timestamp: u64,
82 opus: Vec<u8>,
83 ) -> Result<OutboundFrame, GapError> {
84 self.sync_epoch(node.current_epoch);
85 let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
86 *seq = seq.wrapping_add(1) & 0xFFFF;
88 let payload = GapPayload {
89 media_source_id,
90 rtp_sequence: *seq,
91 rtp_timestamp,
92 key_phase: node.current_epoch as u32,
93 opus_frame: serde_bytes::ByteBuf::from(opus),
94 };
95 let stream_id = node.member_stream_id(2);
96 Ok(node.send_payload(
97 seal,
98 target,
99 StreamType::Audio,
100 stream_id,
101 GbpFlags::ordered_only(),
102 &payload.to_cbor(),
103 )?)
104 }
105
106 pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GapAccept, GapError> {
111 self.sync_epoch(current_epoch);
112 let p = GapPayload::from_cbor(plaintext)?;
113 if p.key_phase != current_epoch as u32 {
114 return Err(GapError::EpochStale { kp: p.key_phase, expected: current_epoch as u32 });
115 }
116 let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
117 if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
120 return Ok(GapAccept::Late(p));
121 }
122 self.in_hw.insert(p.media_source_id, p.rtp_sequence);
123 Ok(GapAccept::New(p))
124 }
125
126 pub fn sync_epoch(&mut self, epoch: u64) {
130 if Some(epoch) != self.current_epoch {
131 self.out_rtp_seq.clear();
132 self.in_hw.clear();
133 self.current_epoch = Some(epoch);
134 }
135 }
136
137 pub fn reset(&mut self) {
139 self.out_rtp_seq.clear();
140 self.in_hw.clear();
141 self.current_epoch = None;
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// CBOR-encodes a payload for media source 1 with a fixed timestamp and
    /// frame body, varying only the sequence number and key phase.
    fn make_payload(seq: u32, key_phase: u32) -> Vec<u8> {
        let payload = crate::GapPayload {
            media_source_id: 1,
            rtp_sequence: seq,
            rtp_timestamp: 960,
            key_phase,
            opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
        };
        payload.to_cbor()
    }

    /// Sequence 0 directly after 0xFFFF is a forward step across the wrap.
    #[test]
    fn wraparound_after_ffff_is_accepted() {
        let mut client = GapClient::new();
        for seq in [0xFFFE_u32, 0xFFFF] {
            let _ = client.accept(&make_payload(seq, 1), 1).unwrap();
        }

        let outcome = client.accept(&make_payload(0x0000, 1), 1).unwrap();

        assert!(matches!(outcome, GapAccept::New(_)), "seq=0 after 0xFFFF must be New");
    }

    /// Receiving the same sequence number twice yields `Late` the second time.
    #[test]
    fn strict_replay_within_window_is_late() {
        let mut client = GapClient::new();
        let frame = make_payload(100, 1);
        let _ = client.accept(&frame, 1).unwrap();

        let outcome = client.accept(&frame, 1).unwrap();

        assert!(matches!(outcome, GapAccept::Late(_)), "exact dup must be Late");
    }

    /// A sequence number already seen in epoch 1 is new again in epoch 2.
    #[test]
    fn epoch_change_clears_window() {
        let mut client = GapClient::new();
        let _ = client.accept(&make_payload(1, 1), 1).unwrap();

        let outcome = client.accept(&make_payload(1, 2), 2).unwrap();

        assert!(matches!(outcome, GapAccept::New(_)), "new epoch resets window");
    }
}
187}