Skip to main content

gap/
client.rs

1//! Stateful GAP client.
2
3use crate::GapPayload;
4use gbp::CodecError;
5use gbp_core::{GbpFlags, MemberId, PayloadCodec, StreamType, timeouts};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
/// Errors returned by [`GapClient`].
///
/// `Decode` and `Node` are produced through their `#[from]` conversions when
/// `?` is applied inside [`GapClient::accept`] and [`GapClient::send`]
/// respectively.
#[derive(Debug, thiserror::Error)]
pub enum GapError {
    /// Failed to decode the payload with the codec passed to
    /// [`GapClient::accept`].
    #[error("decode: {0}")]
    Decode(#[from] CodecError),
    /// `key_phase` matches neither the current group epoch nor any overlap
    /// window that is still unexpired (GAP §10).
    #[error("epoch stale: kp={kp}, expected={expected}")]
    EpochStale {
        /// Reported `key_phase`.
        kp: u32,
        /// Expected `key_phase` (current epoch, truncated to 32 bits).
        expected: u32,
    },
    /// `rtp_sequence` was already seen for the same `media_source_id`.
    ///
    /// NOTE(review): nothing in this file constructs this variant —
    /// [`GapClient::accept`] reports replays as [`GapAccept::Late`] instead.
    /// Confirm whether other modules still use it.
    #[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
    RtpReplay {
        /// `media_source_id`.
        src: u32,
        /// Reported `rtp_sequence`.
        seq: u32,
        /// Replay-window high-water mark.
        hw: u32,
    },
    /// Underlying GBP node error during send.
    #[error("node: {0}")]
    Node(#[from] NodeError),
}
38
/// Outcome of accepting a GAP payload.
///
/// Both variants hand the decoded [`GapPayload`] back to the caller; the
/// variant only tells the caller how the frame relates to the replay window.
#[derive(Debug)]
pub enum GapAccept {
    /// New audio frame: it advanced the replay window's high-water mark for
    /// its `media_source_id`.
    New(GapPayload),
    /// Late audio frame (`rtp_sequence` at or behind the last one seen for
    /// the same `media_source_id`). MAY be dropped per GAP §7.
    Late(GapPayload),
}
48
/// A snapshot of one old epoch's replay window, kept for `T_GAP_KEY_OVERLAP_MS`
/// so that late in-flight audio frames from that epoch are still accepted.
struct OldEpochWindow {
    /// Epoch this window belonged to while it was the current one.
    epoch: u64,
    /// High-water `rtp_sequence` per `media_source_id`, as of the epoch change
    /// and updated by late frames accepted afterwards.
    in_hw: HashMap<u32, u32>,
    /// Instant after which this window is no longer consulted and is pruned.
    expires: Instant,
}
56
/// Stateful GAP client.
///
/// Maintains an outbound `rtp_sequence` counter and an inbound replay window,
/// both keyed by `media_source_id`.
///
/// The client observes the current group epoch on every [`GapClient::send`]
/// or [`GapClient::accept`] call. When the epoch changes it resets the
/// outbound counters and starts a fresh inbound replay window; replay state
/// recorded for the previous epoch is retained in the overlap buffer rather
/// than discarded. Callers may also drive a full reset explicitly via
/// [`GapClient::reset`].
///
/// Old-epoch windows are retained for [`timeouts::T_GAP_KEY_OVERLAP_MS`]
/// (default 10 s) so that in-flight audio frames from the previous epoch can
/// still be accepted after an epoch transition (gap_rfc §4).
pub struct GapClient {
    /// Last `rtp_sequence` sent, per `media_source_id`.
    out_rtp_seq: HashMap<u32, u32>,
    /// Replay-window high-water `rtp_sequence` for the current epoch, per
    /// `media_source_id`.
    in_hw: HashMap<u32, u32>,
    /// Epoch most recently observed; `None` until the first sync and after
    /// [`GapClient::reset`].
    current_epoch: Option<u64>,
    /// Old replay windows from previous epochs, retained until T_overlap expires.
    old_windows: Vec<OldEpochWindow>,
}
77
78impl Default for GapClient {
79    fn default() -> Self {
80        Self {
81            out_rtp_seq: HashMap::new(),
82            in_hw: HashMap::new(),
83            current_epoch: None,
84            old_windows: Vec::new(),
85        }
86    }
87}
88
89impl GapClient {
90    /// Creates an empty client.
91    pub fn new() -> Self {
92        Self::default()
93    }
94
95    /// Sends an Opus frame. `key_phase` is taken from `node.current_epoch`.
96    /// Uses the `O` profile (no `R` / `A` — voice is not reliable, GAP §7).
97    ///
98    /// The wire `rtp_sequence` is clamped to the 16-bit RTP range; on
99    /// overflow it wraps from `0xFFFF` back to `0x0000`.
100    /// `codec` controls how the payload is encoded; use [`PayloadCodec::Cbor`]
101    /// for maximum compatibility or [`PayloadCodec::FlatBuffers`] for
102    /// lowest decode latency.
103    pub fn send<S: Sealer>(
104        &mut self,
105        node: &mut GroupNode,
106        seal: &mut S,
107        target: MemberId,
108        media_source_id: u32,
109        rtp_timestamp: u64,
110        opus: Vec<u8>,
111        codec: PayloadCodec,
112    ) -> Result<OutboundFrame, GapError> {
113        self.sync_epoch(node.current_epoch);
114        let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
115        // RTP `sequence_number` is 16 bits — clamp every increment.
116        *seq = seq.wrapping_add(1) & 0xFFFF;
117        let payload = GapPayload {
118            media_source_id,
119            rtp_sequence: *seq,
120            rtp_timestamp,
121            key_phase: node.current_epoch as u32,
122            opus_frame: serde_bytes::ByteBuf::from(opus),
123        };
124        let stream_id = node.member_stream_id(2);
125        Ok(node.send_payload(
126            seal,
127            target,
128            StreamType::Audio,
129            stream_id,
130            GbpFlags::ordered_only(),
131            &payload.to_bytes(codec),
132            codec,
133        )?)
134    }
135
136    /// Accepts a plaintext payload delivered by the GBP layer.
137    ///
138    /// Returns [`GapAccept::New`] for fresh frames, [`GapAccept::Late`] for
139    /// replays that the spec allows to drop. Returns [`GapError::EpochStale`]
140    /// only when `key_phase` refers to an epoch that has already expired its
141    /// T_overlap window; frames from epochs still within T_overlap are
142    /// accepted normally (gap_rfc §4).
143    /// `codec` must match [`DeliveredPayload::codec`].
144    pub fn accept(
145        &mut self,
146        plaintext: &[u8],
147        current_epoch: u64,
148        codec: PayloadCodec,
149    ) -> Result<GapAccept, GapError> {
150        self.sync_epoch(current_epoch);
151        let p = GapPayload::from_bytes(plaintext, codec)?;
152        if p.key_phase == current_epoch as u32 {
153            // Fast path: current epoch.
154            let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
155            if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
156                return Ok(GapAccept::Late(p));
157            }
158            self.in_hw.insert(p.media_source_id, p.rtp_sequence);
159            return Ok(GapAccept::New(p));
160        }
161        // Slow path: frame from an older epoch — check the overlap buffer.
162        let now = Instant::now();
163        if let Some(old) = self
164            .old_windows
165            .iter_mut()
166            .find(|w| w.epoch == p.key_phase as u64 && w.expires > now)
167        {
168            let hw = old.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
169            if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
170                return Ok(GapAccept::Late(p));
171            }
172            old.in_hw.insert(p.media_source_id, p.rtp_sequence);
173            return Ok(GapAccept::New(p));
174        }
175        Err(GapError::EpochStale {
176            kp: p.key_phase,
177            expected: current_epoch as u32,
178        })
179    }
180
181    /// Synchronises the client's view of the group epoch.
182    ///
183    /// When the epoch advances, the current replay window is moved to the
184    /// overlap buffer (retained for `T_GAP_KEY_OVERLAP_MS`) instead of being
185    /// discarded, so late in-flight frames from the previous epoch are still
186    /// accepted (gap_rfc §4). Expired entries are pruned on each call.
187    /// Called automatically by [`GapClient::send`] and [`GapClient::accept`].
188    pub fn sync_epoch(&mut self, epoch: u64) {
189        // Prune expired old windows.
190        let now = Instant::now();
191        self.old_windows.retain(|w| w.expires > now);
192
193        if Some(epoch) != self.current_epoch {
194            // Save current window to overlap buffer before resetting.
195            if let Some(old_epoch) = self.current_epoch {
196                if !self.in_hw.is_empty() {
197                    self.old_windows.push(OldEpochWindow {
198                        epoch: old_epoch,
199                        in_hw: std::mem::take(&mut self.in_hw),
200                        expires: now + Duration::from_millis(timeouts::T_GAP_KEY_OVERLAP_MS),
201                    });
202                }
203            }
204            self.out_rtp_seq.clear();
205            self.in_hw.clear();
206            self.current_epoch = Some(epoch);
207        }
208    }
209
210    /// Clears the outbound counters, the replay window, and the overlap buffer
211    /// unconditionally.
212    pub fn reset(&mut self) {
213        self.out_rtp_seq.clear();
214        self.in_hw.clear();
215        self.old_windows.clear();
216        self.current_epoch = None;
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a CBOR GAP payload for media source 1 with fixed timestamp and
    /// Opus bytes; only the sequence number and key phase vary between tests.
    fn make_payload(seq: u32, key_phase: u32) -> Vec<u8> {
        let payload = crate::GapPayload {
            media_source_id: 1,
            rtp_sequence: seq,
            rtp_timestamp: 960,
            key_phase,
            opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
        };
        payload.to_bytes(PayloadCodec::Cbor)
    }

    /// Feeds one encoded frame into `client` while the group is at `epoch`.
    fn feed(
        client: &mut GapClient,
        seq: u32,
        key_phase: u32,
        epoch: u64,
    ) -> Result<GapAccept, GapError> {
        let bytes = make_payload(seq, key_phase);
        client.accept(&bytes, epoch, PayloadCodec::Cbor)
    }

    #[test]
    fn wraparound_after_ffff_is_accepted() {
        let mut client = GapClient::new();
        // Walk the high-water mark up to the last 16-bit sequence number.
        for seq in [0xFFFE, 0xFFFF] {
            feed(&mut client, seq, 1, 1).unwrap();
        }
        // The wrapped sequence number must count as New, not Late.
        let outcome = feed(&mut client, 0x0000, 1, 1).unwrap();
        let is_new = matches!(outcome, GapAccept::New(_));
        assert!(is_new, "seq=0 after 0xFFFF must be New");
    }

    #[test]
    fn strict_replay_within_window_is_late() {
        let mut client = GapClient::new();
        feed(&mut client, 100, 1, 1).unwrap();
        let outcome = feed(&mut client, 100, 1, 1).unwrap();
        let is_late = matches!(outcome, GapAccept::Late(_));
        assert!(is_late, "exact dup must be Late");
    }

    #[test]
    fn epoch_change_clears_window() {
        let mut client = GapClient::new();
        feed(&mut client, 1, 1, 1).unwrap();
        // Sequence 1 was consumed in epoch 1; epoch 2 starts a fresh window.
        let outcome = feed(&mut client, 1, 2, 2).unwrap();
        let is_new = matches!(outcome, GapAccept::New(_));
        assert!(is_new, "new epoch resets window");
    }

    // ---- T_overlap buffer (gap_rfc §4) --------------------------------------

    #[test]
    fn old_epoch_frame_accepted_within_overlap() {
        let mut client = GapClient::new();
        // Sequence 5 is recorded while epoch 1 is current.
        feed(&mut client, 5, 1, 1).unwrap();
        // Moving to epoch 2 parks the epoch-1 window in the overlap buffer.
        feed(&mut client, 1, 2, 2).unwrap();
        // An unseen epoch-1 frame (seq 6) shows up before T_overlap runs out.
        let outcome = feed(&mut client, 6, 1, 2).unwrap();
        let is_new = matches!(outcome, GapAccept::New(_));
        assert!(is_new, "late epoch-1 frame accepted within T_overlap");
    }

    #[test]
    fn old_epoch_replay_is_late_within_overlap() {
        let mut client = GapClient::new();
        feed(&mut client, 5, 1, 1).unwrap();
        // Epoch moves on.
        feed(&mut client, 1, 2, 2).unwrap();
        // The already-seen epoch-1 sequence arrives a second time.
        let outcome = feed(&mut client, 5, 1, 2).unwrap();
        let is_late = matches!(outcome, GapAccept::Late(_));
        assert!(is_late, "duplicate from old epoch is Late");
    }

    #[test]
    fn expired_old_epoch_frame_is_stale() {
        let mut client = GapClient::new();
        feed(&mut client, 5, 1, 1).unwrap();
        // Epoch moves on.
        feed(&mut client, 1, 2, 2).unwrap();
        // Force every retained window past its deadline.
        client
            .old_windows
            .iter_mut()
            .for_each(|w| w.expires = Instant::now() - Duration::from_millis(1));
        // The epoch-1 frame no longer has a window to land in.
        let outcome = feed(&mut client, 6, 1, 2);
        let is_stale = matches!(outcome, Err(GapError::EpochStale { .. }));
        assert!(is_stale, "expired epoch is Stale");
    }

    #[test]
    fn reset_clears_overlap_buffer() {
        let mut client = GapClient::new();
        feed(&mut client, 1, 1, 1).unwrap();
        feed(&mut client, 1, 2, 2).unwrap();
        let populated = !client.old_windows.is_empty();
        assert!(populated, "overlap buffer populated");
        client.reset();
        let emptied = client.old_windows.is_empty();
        assert!(emptied, "overlap buffer cleared after reset");
    }
}