1use crate::GapPayload;
4use gbp::CodecError;
5use gbp_core::{GbpFlags, MemberId, StreamType, timeouts};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
/// Errors reported by [`GapClient`] when building or accepting audio frames.
#[derive(Debug, thiserror::Error)]
pub enum GapError {
    /// Wraps a [`CodecError`]; the `#[from]` conversion lets `?` lift it into this type.
    #[error("decode: {0}")]
    Decode(#[from] CodecError),
    /// The frame's key phase matched neither the current epoch nor any unexpired
    /// old-epoch window.
    #[error("epoch stale: kp={kp}, expected={expected}")]
    EpochStale {
        /// Key phase carried by the rejected frame.
        kp: u32,
        /// Current epoch, truncated to 32 bits, that the frame was checked against.
        expected: u32,
    },
    /// Replay rejection for a media source.
    ///
    /// NOTE(review): no code visible in this file constructs this variant; the field
    /// meanings below are inferred from their names — confirm against the producer.
    #[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
    RtpReplay {
        /// Presumably the media source id.
        src: u32,
        /// Presumably the offending sequence number.
        seq: u32,
        /// Presumably the high-water mark the sequence was compared with.
        hw: u32,
    },
    /// Wraps a [`NodeError`]; the `#[from]` conversion lets `?` lift it into this type.
    #[error("node: {0}")]
    Node(#[from] NodeError),
}
38
/// Outcome of [`GapClient::accept`] for a frame whose epoch was recognised.
#[derive(Debug)]
pub enum GapAccept {
    /// The frame was taken as the newest one for its media source; the source's
    /// high-water mark was moved to the frame's sequence number.
    New(GapPayload),
    /// The frame's sequence number is at or behind the source's high-water mark
    /// (by at most `0x7FFF`); the decoded payload is still handed back to the caller.
    Late(GapPayload),
}
48
/// Receive-side sequence state retained for an epoch that is no longer current.
#[derive(Debug)]
struct OldEpochWindow {
    /// Epoch this snapshot belongs to.
    epoch: u64,
    /// Per-media-source high-water sequence numbers recorded for that epoch.
    in_hw: HashMap<u32, u32>,
    /// Instant after which the window is no longer consulted and gets pruned.
    expires: Instant,
}

/// Per-connection audio sequencing state: outbound counters, inbound high-water
/// marks for the current epoch, and short-lived windows for recently replaced epochs.
///
/// `Debug` is derived so the client can be logged or embedded in other `Debug` types.
#[derive(Debug)]
pub struct GapClient {
    /// Last sequence number sent, keyed by media source id.
    out_rtp_seq: HashMap<u32, u32>,
    /// Sequence number of the most recent frame accepted as new in the current
    /// epoch, keyed by media source id.
    in_hw: HashMap<u32, u32>,
    /// Epoch most recently observed; `None` until the first sync and after a reset.
    current_epoch: Option<u64>,
    /// Snapshots of `in_hw` for earlier epochs, kept until each one's `expires`.
    old_windows: Vec<OldEpochWindow>,
}
77
78impl Default for GapClient {
79 fn default() -> Self {
80 Self {
81 out_rtp_seq: HashMap::new(),
82 in_hw: HashMap::new(),
83 current_epoch: None,
84 old_windows: Vec::new(),
85 }
86 }
87}
88
89impl GapClient {
90 pub fn new() -> Self {
92 Self::default()
93 }
94
95 pub fn send<S: Sealer>(
101 &mut self,
102 node: &mut GroupNode,
103 seal: &mut S,
104 target: MemberId,
105 media_source_id: u32,
106 rtp_timestamp: u64,
107 opus: Vec<u8>,
108 ) -> Result<OutboundFrame, GapError> {
109 self.sync_epoch(node.current_epoch);
110 let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
111 *seq = seq.wrapping_add(1) & 0xFFFF;
113 let payload = GapPayload {
114 media_source_id,
115 rtp_sequence: *seq,
116 rtp_timestamp,
117 key_phase: node.current_epoch as u32,
118 opus_frame: serde_bytes::ByteBuf::from(opus),
119 };
120 let stream_id = node.member_stream_id(2);
121 Ok(node.send_payload(
122 seal,
123 target,
124 StreamType::Audio,
125 stream_id,
126 GbpFlags::ordered_only(),
127 &payload.to_cbor(),
128 )?)
129 }
130
131 pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GapAccept, GapError> {
139 self.sync_epoch(current_epoch);
140 let p = GapPayload::from_cbor(plaintext)?;
141 if p.key_phase == current_epoch as u32 {
142 let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
144 if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
145 return Ok(GapAccept::Late(p));
146 }
147 self.in_hw.insert(p.media_source_id, p.rtp_sequence);
148 return Ok(GapAccept::New(p));
149 }
150 let now = Instant::now();
152 if let Some(old) = self
153 .old_windows
154 .iter_mut()
155 .find(|w| w.epoch == p.key_phase as u64 && w.expires > now)
156 {
157 let hw = old.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
158 if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
159 return Ok(GapAccept::Late(p));
160 }
161 old.in_hw.insert(p.media_source_id, p.rtp_sequence);
162 return Ok(GapAccept::New(p));
163 }
164 Err(GapError::EpochStale {
165 kp: p.key_phase,
166 expected: current_epoch as u32,
167 })
168 }
169
170 pub fn sync_epoch(&mut self, epoch: u64) {
178 let now = Instant::now();
180 self.old_windows.retain(|w| w.expires > now);
181
182 if Some(epoch) != self.current_epoch {
183 if let Some(old_epoch) = self.current_epoch {
185 if !self.in_hw.is_empty() {
186 self.old_windows.push(OldEpochWindow {
187 epoch: old_epoch,
188 in_hw: std::mem::take(&mut self.in_hw),
189 expires: now + Duration::from_millis(timeouts::T_GAP_KEY_OVERLAP_MS),
190 });
191 }
192 }
193 self.out_rtp_seq.clear();
194 self.in_hw.clear();
195 self.current_epoch = Some(epoch);
196 }
197 }
198
199 pub fn reset(&mut self) {
202 self.out_rtp_seq.clear();
203 self.in_hw.clear();
204 self.old_windows.clear();
205 self.current_epoch = None;
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    /// CBOR-encodes a frame for media source 1 with a fixed timestamp and body.
    fn make_payload(seq: u32, key_phase: u32) -> Vec<u8> {
        let frame = crate::GapPayload {
            media_source_id: 1,
            rtp_sequence: seq,
            rtp_timestamp: 960,
            key_phase,
            opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
        };
        frame.to_cbor()
    }

    /// Returns a fresh client that has already accepted each
    /// `(seq, key_phase, current_epoch)` frame in order; every frame must succeed.
    fn primed(frames: &[(u32, u32, u64)]) -> GapClient {
        let mut client = GapClient::new();
        for &(seq, key_phase, epoch) in frames {
            let _ = client.accept(&make_payload(seq, key_phase), epoch).unwrap();
        }
        client
    }

    #[test]
    fn wraparound_after_ffff_is_accepted() {
        let mut client = primed(&[(0xFFFE, 1, 1), (0xFFFF, 1, 1)]);
        let verdict = client.accept(&make_payload(0x0000, 1), 1).unwrap();
        assert!(
            matches!(verdict, GapAccept::New(_)),
            "seq=0 after 0xFFFF must be New"
        );
    }

    #[test]
    fn strict_replay_within_window_is_late() {
        let mut client = primed(&[(100, 1, 1)]);
        let verdict = client.accept(&make_payload(100, 1), 1).unwrap();
        assert!(
            matches!(verdict, GapAccept::Late(_)),
            "exact dup must be Late"
        );
    }

    #[test]
    fn epoch_change_clears_window() {
        let mut client = primed(&[(1, 1, 1)]);
        // Same sequence number, but sent and received under the next epoch.
        let verdict = client.accept(&make_payload(1, 2), 2).unwrap();
        assert!(
            matches!(verdict, GapAccept::New(_)),
            "new epoch resets window"
        );
    }

    #[test]
    fn old_epoch_frame_accepted_within_overlap() {
        let mut client = primed(&[(5, 1, 1), (1, 2, 2)]);
        // An epoch-1 frame arriving after the switch to epoch 2.
        let verdict = client.accept(&make_payload(6, 1), 2).unwrap();
        assert!(
            matches!(verdict, GapAccept::New(_)),
            "late epoch-1 frame accepted within T_overlap"
        );
    }

    #[test]
    fn old_epoch_replay_is_late_within_overlap() {
        let mut client = primed(&[(5, 1, 1), (1, 2, 2)]);
        // Re-delivery of the epoch-1 frame that was already accepted.
        let verdict = client.accept(&make_payload(5, 1), 2).unwrap();
        assert!(
            matches!(verdict, GapAccept::Late(_)),
            "duplicate from old epoch is Late"
        );
    }

    #[test]
    fn expired_old_epoch_frame_is_stale() {
        let mut client = primed(&[(5, 1, 1), (1, 2, 2)]);
        // Force every retained window into the past.
        client
            .old_windows
            .iter_mut()
            .for_each(|w| w.expires = Instant::now() - Duration::from_millis(1));
        let outcome = client.accept(&make_payload(6, 1), 2);
        assert!(
            matches!(outcome, Err(GapError::EpochStale { .. })),
            "expired epoch is Stale"
        );
    }

    #[test]
    fn reset_clears_overlap_buffer() {
        let mut client = primed(&[(1, 1, 1), (1, 2, 2)]);
        assert!(!client.old_windows.is_empty(), "overlap buffer populated");
        client.reset();
        assert!(
            client.old_windows.is_empty(),
            "overlap buffer cleared after reset"
        );
    }
}
322}