Skip to main content

gap/
client.rs

1//! Stateful GAP client.
2
3use crate::GapPayload;
4use gbp::CodecError;
5use gbp_core::{GbpFlags, MemberId, PayloadCodec, StreamType, timeouts};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashMap;
8use std::time::Duration;
9#[cfg(not(target_arch = "wasm32"))]
10use std::time::Instant;
11#[cfg(target_arch = "wasm32")]
12use web_time::Instant;
13
14/// Errors returned by [`GapClient`].
15#[derive(Debug, thiserror::Error)]
16pub enum GapError {
17    /// Failed to decode the CBOR payload.
18    #[error("decode: {0}")]
19    Decode(#[from] CodecError),
20    /// `key_phase` does not match the current group epoch (GAP §10).
21    #[error("epoch stale: kp={kp}, expected={expected}")]
22    EpochStale {
23        /// Reported `key_phase`.
24        kp: u32,
25        /// Expected `key_phase` (current epoch).
26        expected: u32,
27    },
28    /// `rtp_sequence` was already seen for the same `media_source_id`.
29    #[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
30    RtpReplay {
31        /// `media_source_id`.
32        src: u32,
33        /// Reported `rtp_sequence`.
34        seq: u32,
35        /// Replay-window high-water mark.
36        hw: u32,
37    },
38    /// Underlying GBP node error during send.
39    #[error("node: {0}")]
40    Node(#[from] NodeError),
41}
42
43/// Outcome of accepting a GAP payload.
44#[derive(Debug)]
45pub enum GapAccept {
46    /// New audio frame.
47    New(GapPayload),
48    /// Late audio frame (`rtp_sequence` <= last seen). MAY be dropped per
49    /// GAP §7.
50    Late(GapPayload),
51}
52
53/// A snapshot of one old epoch's replay window, kept for `T_GAP_KEY_OVERLAP_MS`
54/// so that late in-flight audio frames from that epoch are still accepted.
55struct OldEpochWindow {
56    epoch: u64,
57    in_hw: HashMap<u32, u32>,
58    expires: Instant,
59}
60
61/// Stateful GAP client.
62///
63/// Maintains an outbound `rtp_sequence` counter and an inbound replay window,
64/// both keyed by `media_source_id`.
65///
66/// The client observes the current group epoch on every [`GapClient::send`]
67/// or [`GapClient::accept`] call and automatically clears its replay window
68/// when the epoch advances. Callers may also drive a reset explicitly via
69/// [`GapClient::reset`].
70///
71/// Old-epoch windows are retained for [`timeouts::T_GAP_KEY_OVERLAP_MS`]
72/// (default 10 s) so that in-flight audio frames from the previous epoch can
73/// still be accepted after an epoch transition (gap_rfc §4).
74pub struct GapClient {
75    out_rtp_seq: HashMap<u32, u32>,
76    in_hw: HashMap<u32, u32>,
77    current_epoch: Option<u64>,
78    /// Old replay windows from previous epochs, retained until T_overlap expires.
79    old_windows: Vec<OldEpochWindow>,
80}
81
82impl Default for GapClient {
83    fn default() -> Self {
84        Self {
85            out_rtp_seq: HashMap::new(),
86            in_hw: HashMap::new(),
87            current_epoch: None,
88            old_windows: Vec::new(),
89        }
90    }
91}
92
93impl GapClient {
94    /// Creates an empty client.
95    pub fn new() -> Self {
96        Self::default()
97    }
98
99    /// Sends an Opus frame. `key_phase` is taken from `node.current_epoch`.
100    /// Uses the `O` profile (no `R` / `A` — voice is not reliable, GAP §7).
101    ///
102    /// The wire `rtp_sequence` is clamped to the 16-bit RTP range; on
103    /// overflow it wraps from `0xFFFF` back to `0x0000`.
104    /// `codec` controls how the payload is encoded; use [`PayloadCodec::Cbor`]
105    /// for maximum compatibility or [`PayloadCodec::FlatBuffers`] for
106    /// lowest decode latency.
107    pub fn send<S: Sealer>(
108        &mut self,
109        node: &mut GroupNode,
110        seal: &mut S,
111        target: MemberId,
112        media_source_id: u32,
113        rtp_timestamp: u64,
114        opus: Vec<u8>,
115        codec: PayloadCodec,
116    ) -> Result<OutboundFrame, GapError> {
117        self.sync_epoch(node.current_epoch);
118        let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
119        // RTP `sequence_number` is 16 bits — clamp every increment.
120        *seq = seq.wrapping_add(1) & 0xFFFF;
121        let payload = GapPayload {
122            media_source_id,
123            rtp_sequence: *seq,
124            rtp_timestamp,
125            key_phase: node.current_epoch as u32,
126            opus_frame: serde_bytes::ByteBuf::from(opus),
127        };
128        let stream_id = node.member_stream_id(2);
129        Ok(node.send_payload(
130            seal,
131            target,
132            StreamType::Audio,
133            stream_id,
134            GbpFlags::ordered_only(),
135            &payload.to_bytes(codec),
136            codec,
137        )?)
138    }
139
140    /// Accepts a plaintext payload delivered by the GBP layer.
141    ///
142    /// Returns [`GapAccept::New`] for fresh frames, [`GapAccept::Late`] for
143    /// replays that the spec allows to drop. Returns [`GapError::EpochStale`]
144    /// only when `key_phase` refers to an epoch that has already expired its
145    /// T_overlap window; frames from epochs still within T_overlap are
146    /// accepted normally (gap_rfc §4).
147    /// `codec` must match [`DeliveredPayload::codec`].
148    pub fn accept(
149        &mut self,
150        plaintext: &[u8],
151        current_epoch: u64,
152        codec: PayloadCodec,
153    ) -> Result<GapAccept, GapError> {
154        self.sync_epoch(current_epoch);
155        let p = GapPayload::from_bytes(plaintext, codec)?;
156        if p.key_phase == current_epoch as u32 {
157            // Fast path: current epoch.
158            let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
159            if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
160                return Ok(GapAccept::Late(p));
161            }
162            self.in_hw.insert(p.media_source_id, p.rtp_sequence);
163            return Ok(GapAccept::New(p));
164        }
165        // Slow path: frame from an older epoch — check the overlap buffer.
166        let now = Instant::now();
167        if let Some(old) = self
168            .old_windows
169            .iter_mut()
170            .find(|w| w.epoch == p.key_phase as u64 && w.expires > now)
171        {
172            let hw = old.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
173            if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
174                return Ok(GapAccept::Late(p));
175            }
176            old.in_hw.insert(p.media_source_id, p.rtp_sequence);
177            return Ok(GapAccept::New(p));
178        }
179        Err(GapError::EpochStale {
180            kp: p.key_phase,
181            expected: current_epoch as u32,
182        })
183    }
184
185    /// Synchronises the client's view of the group epoch.
186    ///
187    /// When the epoch advances, the current replay window is moved to the
188    /// overlap buffer (retained for `T_GAP_KEY_OVERLAP_MS`) instead of being
189    /// discarded, so late in-flight frames from the previous epoch are still
190    /// accepted (gap_rfc §4). Expired entries are pruned on each call.
191    /// Called automatically by [`GapClient::send`] and [`GapClient::accept`].
192    pub fn sync_epoch(&mut self, epoch: u64) {
193        // Prune expired old windows.
194        let now = Instant::now();
195        self.old_windows.retain(|w| w.expires > now);
196
197        if Some(epoch) != self.current_epoch {
198            // Save current window to overlap buffer before resetting.
199            if let Some(old_epoch) = self.current_epoch {
200                if !self.in_hw.is_empty() {
201                    self.old_windows.push(OldEpochWindow {
202                        epoch: old_epoch,
203                        in_hw: std::mem::take(&mut self.in_hw),
204                        expires: now + Duration::from_millis(timeouts::T_GAP_KEY_OVERLAP_MS),
205                    });
206                }
207            }
208            self.out_rtp_seq.clear();
209            self.in_hw.clear();
210            self.current_epoch = Some(epoch);
211        }
212    }
213
214    /// Clears the outbound counters, the replay window, and the overlap buffer
215    /// unconditionally.
216    pub fn reset(&mut self) {
217        self.out_rtp_seq.clear();
218        self.in_hw.clear();
219        self.old_windows.clear();
220        self.current_epoch = None;
221    }
222}
223
224#[cfg(test)]
225mod tests {
226    use super::*;
227
228    fn make_payload(seq: u32, key_phase: u32) -> Vec<u8> {
229        crate::GapPayload {
230            media_source_id: 1,
231            rtp_sequence: seq,
232            rtp_timestamp: 960,
233            key_phase,
234            opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
235        }
236        .to_bytes(PayloadCodec::Cbor)
237    }
238
239    #[test]
240    fn wraparound_after_ffff_is_accepted() {
241        let mut client = GapClient::new();
242        // Prime the high-water mark near wraparound point.
243        let _ = client.accept(&make_payload(0xFFFE, 1), 1, PayloadCodec::Cbor).unwrap();
244        let _ = client.accept(&make_payload(0xFFFF, 1), 1, PayloadCodec::Cbor).unwrap();
245        // After wraparound, seq=0 should be accepted as New, not Late.
246        let result = client.accept(&make_payload(0x0000, 1), 1, PayloadCodec::Cbor).unwrap();
247        assert!(
248            matches!(result, GapAccept::New(_)),
249            "seq=0 after 0xFFFF must be New"
250        );
251    }
252
253    #[test]
254    fn strict_replay_within_window_is_late() {
255        let mut client = GapClient::new();
256        let _ = client.accept(&make_payload(100, 1), 1, PayloadCodec::Cbor).unwrap();
257        let result = client.accept(&make_payload(100, 1), 1, PayloadCodec::Cbor).unwrap();
258        assert!(
259            matches!(result, GapAccept::Late(_)),
260            "exact dup must be Late"
261        );
262    }
263
264    #[test]
265    fn epoch_change_clears_window() {
266        let mut client = GapClient::new();
267        let _ = client.accept(&make_payload(1, 1), 1, PayloadCodec::Cbor).unwrap();
268        // Epoch change: seq 1 was seen in epoch 1, but in epoch 2 it's new again.
269        let result = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
270        assert!(
271            matches!(result, GapAccept::New(_)),
272            "new epoch resets window"
273        );
274    }
275
276    // ---- T_overlap buffer (gap_rfc §4) --------------------------------------
277
278    #[test]
279    fn old_epoch_frame_accepted_within_overlap() {
280        let mut client = GapClient::new();
281        // Establish seq 5 in epoch 1.
282        let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
283        // Advance to epoch 2 — old window is buffered.
284        let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
285        // A late frame from epoch 1 (seq 6, not seen yet) arrives before T_overlap expires.
286        let result = client.accept(&make_payload(6, 1), 2, PayloadCodec::Cbor).unwrap();
287        assert!(
288            matches!(result, GapAccept::New(_)),
289            "late epoch-1 frame accepted within T_overlap"
290        );
291    }
292
293    #[test]
294    fn old_epoch_replay_is_late_within_overlap() {
295        let mut client = GapClient::new();
296        let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
297        // Advance epoch.
298        let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
299        // Same seq from epoch 1 arrives again — replay → Late.
300        let result = client.accept(&make_payload(5, 1), 2, PayloadCodec::Cbor).unwrap();
301        assert!(
302            matches!(result, GapAccept::Late(_)),
303            "duplicate from old epoch is Late"
304        );
305    }
306
307    #[test]
308    fn expired_old_epoch_frame_is_stale() {
309        let mut client = GapClient::new();
310        let _ = client.accept(&make_payload(5, 1), 1, PayloadCodec::Cbor).unwrap();
311        // Advance epoch.
312        let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
313        // Manually expire the overlap window.
314        for w in &mut client.old_windows {
315            w.expires = Instant::now() - Duration::from_millis(1);
316        }
317        // Now a late epoch-1 frame should be rejected.
318        let result = client.accept(&make_payload(6, 1), 2, PayloadCodec::Cbor);
319        assert!(
320            matches!(result, Err(GapError::EpochStale { .. })),
321            "expired epoch is Stale"
322        );
323    }
324
325    #[test]
326    fn reset_clears_overlap_buffer() {
327        let mut client = GapClient::new();
328        let _ = client.accept(&make_payload(1, 1), 1, PayloadCodec::Cbor).unwrap();
329        let _ = client.accept(&make_payload(1, 2), 2, PayloadCodec::Cbor).unwrap();
330        assert!(!client.old_windows.is_empty(), "overlap buffer populated");
331        client.reset();
332        assert!(
333            client.old_windows.is_empty(),
334            "overlap buffer cleared after reset"
335        );
336    }
337}