1use crate::GapPayload;
4use gbp::CodecError;
5use gbp_core::{GbpFlags, MemberId, PayloadCodec, StreamType, timeouts};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
10#[derive(Debug, thiserror::Error)]
12pub enum GapError {
13 #[error("decode: {0}")]
15 Decode(#[from] CodecError),
16 #[error("epoch stale: kp={kp}, expected={expected}")]
18 EpochStale {
19 kp: u32,
21 expected: u32,
23 },
24 #[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
26 RtpReplay {
27 src: u32,
29 seq: u32,
31 hw: u32,
33 },
34 #[error("node: {0}")]
36 Node(#[from] NodeError),
37}
38
39#[derive(Debug)]
41pub enum GapAccept {
42 New(GapPayload),
44 Late(GapPayload),
47}
48
49struct OldEpochWindow {
52 epoch: u64,
53 in_hw: HashMap<u32, u32>,
54 expires: Instant,
55}
56
57pub struct GapClient {
71 out_rtp_seq: HashMap<u32, u32>,
72 in_hw: HashMap<u32, u32>,
73 current_epoch: Option<u64>,
74 old_windows: Vec<OldEpochWindow>,
76}
77
78impl Default for GapClient {
79 fn default() -> Self {
80 Self {
81 out_rtp_seq: HashMap::new(),
82 in_hw: HashMap::new(),
83 current_epoch: None,
84 old_windows: Vec::new(),
85 }
86 }
87}
88
89impl GapClient {
90 pub fn new() -> Self {
92 Self::default()
93 }
94
95 pub fn send<S: Sealer>(
104 &mut self,
105 node: &mut GroupNode,
106 seal: &mut S,
107 target: MemberId,
108 media_source_id: u32,
109 rtp_timestamp: u64,
110 opus: Vec<u8>,
111 codec: PayloadCodec,
112 ) -> Result<OutboundFrame, GapError> {
113 self.sync_epoch(node.current_epoch);
114 let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
115 *seq = seq.wrapping_add(1) & 0xFFFF;
117 let payload = GapPayload {
118 media_source_id,
119 rtp_sequence: *seq,
120 rtp_timestamp,
121 key_phase: node.current_epoch as u32,
122 opus_frame: serde_bytes::ByteBuf::from(opus),
123 };
124 let stream_id = node.member_stream_id(2);
125 Ok(node.send_payload(
126 seal,
127 target,
128 StreamType::Audio,
129 stream_id,
130 GbpFlags::ordered_only(),
131 &payload.to_bytes(codec),
132 codec,
133 )?)
134 }
135
136 pub fn accept(
145 &mut self,
146 plaintext: &[u8],
147 current_epoch: u64,
148 codec: PayloadCodec,
149 ) -> Result<GapAccept, GapError> {
150 self.sync_epoch(current_epoch);
151 let p = GapPayload::from_bytes(plaintext, codec)?;
152 if p.key_phase == current_epoch as u32 {
153 let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
155 if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
156 return Ok(GapAccept::Late(p));
157 }
158 self.in_hw.insert(p.media_source_id, p.rtp_sequence);
159 return Ok(GapAccept::New(p));
160 }
161 let now = Instant::now();
163 if let Some(old) = self
164 .old_windows
165 .iter_mut()
166 .find(|w| w.epoch == p.key_phase as u64 && w.expires > now)
167 {
168 let hw = old.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
169 if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
170 return Ok(GapAccept::Late(p));
171 }
172 old.in_hw.insert(p.media_source_id, p.rtp_sequence);
173 return Ok(GapAccept::New(p));
174 }
175 Err(GapError::EpochStale {
176 kp: p.key_phase,
177 expected: current_epoch as u32,
178 })
179 }
180
181 pub fn sync_epoch(&mut self, epoch: u64) {
189 let now = Instant::now();
191 self.old_windows.retain(|w| w.expires > now);
192
193 if Some(epoch) != self.current_epoch {
194 if let Some(old_epoch) = self.current_epoch {
196 if !self.in_hw.is_empty() {
197 self.old_windows.push(OldEpochWindow {
198 epoch: old_epoch,
199 in_hw: std::mem::take(&mut self.in_hw),
200 expires: now + Duration::from_millis(timeouts::T_GAP_KEY_OVERLAP_MS),
201 });
202 }
203 }
204 self.out_rtp_seq.clear();
205 self.in_hw.clear();
206 self.current_epoch = Some(epoch);
207 }
208 }
209
210 pub fn reset(&mut self) {
213 self.out_rtp_seq.clear();
214 self.in_hw.clear();
215 self.old_windows.clear();
216 self.current_epoch = None;
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223
224 fn make_payload(seq: u32, key_phase: u32) -> Vec<u8> {
225 crate::GapPayload {
226 media_source_id: 1,
227 rtp_sequence: seq,
228 rtp_timestamp: 960,
229 key_phase,
230 opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
231 }
232 .to_bytes(PayloadCodec::Cbor)
233 }
234
235 #[test]
236 fn wraparound_after_ffff_is_accepted() {
237 let mut client = GapClient::new();
238 let _ = client.accept(&make_payload(0xFFFE, 1), 1, PayloadCodec::Cbor).unwrap();
240 let _ = client.accept(&make_payload(0xFFFF, 1), 1, PayloadCodec::Cbor).unwrap();
241 let result = client.accept(&make_payload(0x0000, 1), 1, PayloadCodec::Cbor).unwrap();
243 assert!(
244 matches!(result, GapAccept::New(_)),
245 "seq=0 after 0xFFFF must be New"
246 );
247 }
248
249 #[test]
250 fn strict_replay_within_window_is_late() {
251 let mut client = GapClient::new();
252 let _ = client.accept(&make_payload(100, 1), 1, PayloadCodec::Cbor).unwrap();
253 let result = client.accept(&make_payload(100, 1), 1, PayloadCodec::Cbor).unwrap();
254 assert!(
255 matches!(result, GapAccept::Late(_)),
256 "exact dup must be Late"
257 );
258 }
259
260 #[test]
261 fn epoch_change_clears_window() {
262 let mut client = GapClient::new();
263 let _ = client.accept(&make_payload(1, 1), 1, PayloadCodec::Cbor).unwrap();
264 let result = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
266 assert!(
267 matches!(result, GapAccept::New(_)),
268 "new epoch resets window"
269 );
270 }
271
272 #[test]
275 fn old_epoch_frame_accepted_within_overlap() {
276 let mut client = GapClient::new();
277 let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
279 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
281 let result = client.accept(&make_payload(6, 1), 2, PayloadCodec::Cbor).unwrap();
283 assert!(
284 matches!(result, GapAccept::New(_)),
285 "late epoch-1 frame accepted within T_overlap"
286 );
287 }
288
289 #[test]
290 fn old_epoch_replay_is_late_within_overlap() {
291 let mut client = GapClient::new();
292 let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
293 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
295 let result = client.accept(&make_payload(5, 1), 2, PayloadCodec::Cbor).unwrap();
297 assert!(
298 matches!(result, GapAccept::Late(_)),
299 "duplicate from old epoch is Late"
300 );
301 }
302
303 #[test]
304 fn expired_old_epoch_frame_is_stale() {
305 let mut client = GapClient::new();
306 let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
307 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
309 for w in &mut client.old_windows {
311 w.expires = Instant::now() - Duration::from_millis(1);
312 }
313 let result = client.accept(&make_payload(6, 1), 2, PayloadCodec::Cbor);
315 assert!(
316 matches!(result, Err(GapError::EpochStale { .. })),
317 "expired epoch is Stale"
318 );
319 }
320
321 #[test]
322 fn reset_clears_overlap_buffer() {
323 let mut client = GapClient::new();
324 let _ = client.accept(&make_payload(1, 1), 1, PayloadCodec::Cbor).unwrap();
325 let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
326 assert!(!client.old_windows.is_empty(), "overlap buffer populated");
327 client.reset();
328 assert!(
329 client.old_windows.is_empty(),
330 "overlap buffer cleared after reset"
331 );
332 }
333}