1use crate::GapPayload;
4use gbp::CodecError;
5use gbp_core::{GbpFlags, MemberId, PayloadCodec, StreamType, timeouts};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashMap;
8use std::time::Duration;
9#[cfg(not(target_arch = "wasm32"))]
10use std::time::Instant;
11#[cfg(target_arch = "wasm32")]
12use web_time::Instant;
13
14#[derive(Debug, thiserror::Error)]
16pub enum GapError {
17 #[error("decode: {0}")]
19 Decode(#[from] CodecError),
20 #[error("epoch stale: kp={kp}, expected={expected}")]
22 EpochStale {
23 kp: u32,
25 expected: u32,
27 },
28 #[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
30 RtpReplay {
31 src: u32,
33 seq: u32,
35 hw: u32,
37 },
38 #[error("node: {0}")]
40 Node(#[from] NodeError),
41}
42
43#[derive(Debug)]
45pub enum GapAccept {
46 New(GapPayload),
48 Late(GapPayload),
51}
52
53struct OldEpochWindow {
56 epoch: u64,
57 in_hw: HashMap<u32, u32>,
58 expires: Instant,
59}
60
61pub struct GapClient {
75 out_rtp_seq: HashMap<u32, u32>,
76 in_hw: HashMap<u32, u32>,
77 current_epoch: Option<u64>,
78 old_windows: Vec<OldEpochWindow>,
80}
81
82impl Default for GapClient {
83 fn default() -> Self {
84 Self {
85 out_rtp_seq: HashMap::new(),
86 in_hw: HashMap::new(),
87 current_epoch: None,
88 old_windows: Vec::new(),
89 }
90 }
91}
92
93impl GapClient {
94 pub fn new() -> Self {
96 Self::default()
97 }
98
99 pub fn send<S: Sealer>(
108 &mut self,
109 node: &mut GroupNode,
110 seal: &mut S,
111 target: MemberId,
112 media_source_id: u32,
113 rtp_timestamp: u64,
114 opus: Vec<u8>,
115 codec: PayloadCodec,
116 ) -> Result<OutboundFrame, GapError> {
117 self.sync_epoch(node.current_epoch);
118 let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
119 *seq = seq.wrapping_add(1) & 0xFFFF;
121 let payload = GapPayload {
122 media_source_id,
123 rtp_sequence: *seq,
124 rtp_timestamp,
125 key_phase: node.current_epoch as u32,
126 opus_frame: serde_bytes::ByteBuf::from(opus),
127 };
128 let stream_id = node.member_stream_id(2);
129 Ok(node.send_payload(
130 seal,
131 target,
132 StreamType::Audio,
133 stream_id,
134 GbpFlags::ordered_only(),
135 &payload.to_bytes(codec),
136 codec,
137 )?)
138 }
139
140 pub fn accept(
149 &mut self,
150 plaintext: &[u8],
151 current_epoch: u64,
152 codec: PayloadCodec,
153 ) -> Result<GapAccept, GapError> {
154 self.sync_epoch(current_epoch);
155 let p = GapPayload::from_bytes(plaintext, codec)?;
156 if p.key_phase == current_epoch as u32 {
157 let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
159 if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
160 return Ok(GapAccept::Late(p));
161 }
162 self.in_hw.insert(p.media_source_id, p.rtp_sequence);
163 return Ok(GapAccept::New(p));
164 }
165 let now = Instant::now();
167 if let Some(old) = self
168 .old_windows
169 .iter_mut()
170 .find(|w| w.epoch == p.key_phase as u64 && w.expires > now)
171 {
172 let hw = old.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
173 if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
174 return Ok(GapAccept::Late(p));
175 }
176 old.in_hw.insert(p.media_source_id, p.rtp_sequence);
177 return Ok(GapAccept::New(p));
178 }
179 Err(GapError::EpochStale {
180 kp: p.key_phase,
181 expected: current_epoch as u32,
182 })
183 }
184
185 pub fn sync_epoch(&mut self, epoch: u64) {
193 let now = Instant::now();
195 self.old_windows.retain(|w| w.expires > now);
196
197 if Some(epoch) != self.current_epoch {
198 if let Some(old_epoch) = self.current_epoch {
200 if !self.in_hw.is_empty() {
201 self.old_windows.push(OldEpochWindow {
202 epoch: old_epoch,
203 in_hw: std::mem::take(&mut self.in_hw),
204 expires: now + Duration::from_millis(timeouts::T_GAP_KEY_OVERLAP_MS),
205 });
206 }
207 }
208 self.out_rtp_seq.clear();
209 self.in_hw.clear();
210 self.current_epoch = Some(epoch);
211 }
212 }
213
214 pub fn reset(&mut self) {
217 self.out_rtp_seq.clear();
218 self.in_hw.clear();
219 self.old_windows.clear();
220 self.current_epoch = None;
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227
228 fn make_payload(seq: u32, key_phase: u32) -> Vec<u8> {
229 crate::GapPayload {
230 media_source_id: 1,
231 rtp_sequence: seq,
232 rtp_timestamp: 960,
233 key_phase,
234 opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
235 }
236 .to_bytes(PayloadCodec::Cbor)
237 }
238
239 #[test]
240 fn wraparound_after_ffff_is_accepted() {
241 let mut client = GapClient::new();
242 let _ = client.accept(&make_payload(0xFFFE, 1), 1, PayloadCodec::Cbor).unwrap();
244 let _ = client.accept(&make_payload(0xFFFF, 1), 1, PayloadCodec::Cbor).unwrap();
245 let result = client.accept(&make_payload(0x0000, 1), 1, PayloadCodec::Cbor).unwrap();
247 assert!(
248 matches!(result, GapAccept::New(_)),
249 "seq=0 after 0xFFFF must be New"
250 );
251 }
252
253 #[test]
254 fn strict_replay_within_window_is_late() {
255 let mut client = GapClient::new();
256 let _ = client.accept(&make_payload(100, 1), 1, PayloadCodec::Cbor).unwrap();
257 let result = client.accept(&make_payload(100, 1), 1, PayloadCodec::Cbor).unwrap();
258 assert!(
259 matches!(result, GapAccept::Late(_)),
260 "exact dup must be Late"
261 );
262 }
263
264 #[test]
265 fn epoch_change_clears_window() {
266 let mut client = GapClient::new();
267 let _ = client.accept(&make_payload(1, 1), 1, PayloadCodec::Cbor).unwrap();
268 let result = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
270 assert!(
271 matches!(result, GapAccept::New(_)),
272 "new epoch resets window"
273 );
274 }
275
276 #[test]
279 fn old_epoch_frame_accepted_within_overlap() {
280 let mut client = GapClient::new();
281 let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
283 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
285 let result = client.accept(&make_payload(6, 1), 2, PayloadCodec::Cbor).unwrap();
287 assert!(
288 matches!(result, GapAccept::New(_)),
289 "late epoch-1 frame accepted within T_overlap"
290 );
291 }
292
293 #[test]
294 fn old_epoch_replay_is_late_within_overlap() {
295 let mut client = GapClient::new();
296 let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
297 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
299 let result = client.accept(&make_payload(5, 1), 2, PayloadCodec::Cbor).unwrap();
301 assert!(
302 matches!(result, GapAccept::Late(_)),
303 "duplicate from old epoch is Late"
304 );
305 }
306
307 #[test]
308 fn expired_old_epoch_frame_is_stale() {
309 let mut client = GapClient::new();
310 let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
311 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
313 for w in &mut client.old_windows {
315 w.expires = Instant::now() - Duration::from_millis(1);
316 }
317 let result = client.accept(&make_payload(6, 1), 2, PayloadCodec::Cbor);
319 assert!(
320 matches!(result, Err(GapError::EpochStale { .. })),
321 "expired epoch is Stale"
322 );
323 }
324
325 #[test]
326 fn reset_clears_overlap_buffer() {
327 let mut client = GapClient::new();
328 let _ = client.accept(&make_payload(1, 1), 1, PayloadCodec::Cbor).unwrap();
329 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
330 assert!(!client.old_windows.is_empty(), "overlap buffer populated");
331 client.reset();
332 assert!(
333 client.old_windows.is_empty(),
334 "overlap buffer cleared after reset"
335 );
336 }
337}