Skip to main content

gap/
client.rs

1//! Stateful GAP client.
2
3use crate::GapPayload;
4use gbp::CodecError;
5use gbp_core::{GbpFlags, MemberId, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashMap;
8
/// Errors returned by [`GapClient`].
///
/// `Decode` and `Node` carry `#[from]` conversions, so applying `?` to a
/// [`CodecError`] or a [`NodeError`] inside a function returning
/// `Result<_, GapError>` produces the matching variant.
#[derive(Debug, thiserror::Error)]
pub enum GapError {
    /// Failed to decode the CBOR payload.
    ///
    /// Raised by [`GapClient::accept`] when `GapPayload::from_cbor` fails.
    #[error("decode: {0}")]
    Decode(#[from] CodecError),
    /// `key_phase` does not match the current group epoch (GAP §10).
    ///
    /// Both fields are `u32`: [`GapClient::accept`] compares the payload's
    /// `key_phase` against the caller-supplied `u64` epoch cast down to `u32`
    /// and reports that truncated value as `expected`.
    #[error("epoch stale: kp={kp}, expected={expected}")]
    EpochStale {
        /// Reported `key_phase`.
        kp: u32,
        /// Expected `key_phase` (current epoch).
        expected: u32,
    },
    /// `rtp_sequence` was already seen for the same `media_source_id`.
    ///
    /// NOTE(review): nothing visible in this file constructs this variant —
    /// [`GapClient::accept`] reports repeats as [`GapAccept::Late`] instead.
    /// Confirm whether another module raises it.
    #[error("rtp replay: src={src}, seq={seq}, hw={hw}")]
    RtpReplay {
        /// `media_source_id`.
        src: u32,
        /// Reported `rtp_sequence`.
        seq: u32,
        /// Replay-window high-water mark.
        hw: u32,
    },
    /// Underlying GBP node error during send.
    ///
    /// Raised by [`GapClient::send`] when `GroupNode::send_payload` fails.
    #[error("node: {0}")]
    Node(#[from] NodeError),
}
37
/// Outcome of accepting a GAP payload.
///
/// Returned by [`GapClient::accept`] once the payload has decoded and its
/// `key_phase` has matched the current epoch. Both variants hand the decoded
/// [`GapPayload`] back to the caller; only the classification differs.
#[derive(Debug)]
pub enum GapAccept {
    /// New audio frame.
    ///
    /// The frame's `rtp_sequence` has been recorded as the replay-window
    /// high-water mark for its `media_source_id`.
    New(GapPayload),
    /// Late audio frame (`rtp_sequence` <= last seen). MAY be dropped per
    /// GAP §7.
    ///
    /// The high-water mark is left untouched; the payload is still returned
    /// so the caller can decide whether to drop it.
    Late(GapPayload),
}
47
/// Stateful GAP client.
///
/// Maintains an outbound `rtp_sequence` counter and an inbound replay window,
/// both keyed by `media_source_id`.
///
/// The client observes the current group epoch on every [`GapClient::send`]
/// or [`GapClient::accept`] call and automatically clears **both** the
/// outbound counters and the replay window whenever the observed epoch
/// differs from the one it saw last. Callers may also drive a reset
/// explicitly via [`GapClient::reset`].
#[derive(Default)]
pub struct GapClient {
    // Last `rtp_sequence` emitted by `send`, per `media_source_id`.
    out_rtp_seq: HashMap<u32, u32>,
    // Replay-window high-water mark: the `rtp_sequence` of the most recent
    // frame `accept` classified as `New`, per `media_source_id`.
    in_hw: HashMap<u32, u32>,
    // Epoch last passed to `sync_epoch`; `None` on a fresh client and after
    // `reset`.
    current_epoch: Option<u64>,
}
63
64impl GapClient {
65    /// Creates an empty client.
66    pub fn new() -> Self {
67        Self::default()
68    }
69
70    /// Sends an Opus frame. `key_phase` is taken from `node.current_epoch`.
71    /// Uses the `O` profile (no `R` / `A` — voice is not reliable, GAP §7).
72    ///
73    /// The wire `rtp_sequence` is clamped to the 16-bit RTP range; on
74    /// overflow it wraps from `0xFFFF` back to `0x0000`.
75    pub fn send<S: Sealer>(
76        &mut self,
77        node: &mut GroupNode,
78        seal: &mut S,
79        target: MemberId,
80        media_source_id: u32,
81        rtp_timestamp: u64,
82        opus: Vec<u8>,
83    ) -> Result<OutboundFrame, GapError> {
84        self.sync_epoch(node.current_epoch);
85        let seq = self.out_rtp_seq.entry(media_source_id).or_insert(0);
86        // RTP `sequence_number` is 16 bits — clamp every increment.
87        *seq = seq.wrapping_add(1) & 0xFFFF;
88        let payload = GapPayload {
89            media_source_id,
90            rtp_sequence: *seq,
91            rtp_timestamp,
92            key_phase: node.current_epoch as u32,
93            opus_frame: serde_bytes::ByteBuf::from(opus),
94        };
95        let stream_id = node.member_stream_id(2);
96        Ok(node.send_payload(
97            seal,
98            target,
99            StreamType::Audio,
100            stream_id,
101            GbpFlags::ordered_only(),
102            &payload.to_cbor(),
103        )?)
104    }
105
106    /// Accepts a plaintext payload delivered by the GBP layer.
107    /// Returns [`GapAccept::New`] for fresh frames, [`GapAccept::Late`] for
108    /// replays that the spec allows to drop, or [`GapError::EpochStale`] when
109    /// `key_phase` does not match the current epoch.
110    pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GapAccept, GapError> {
111        self.sync_epoch(current_epoch);
112        let p = GapPayload::from_cbor(plaintext)?;
113        if p.key_phase != current_epoch as u32 {
114            return Err(GapError::EpochStale { kp: p.key_phase, expected: current_epoch as u32 });
115        }
116        let hw = self.in_hw.get(&p.media_source_id).copied().unwrap_or(0);
117        // RFC 3550 §A.1: 16-bit wraparound detection. After 0xFFFF→0x0000
118        // a naive `<= hw` would reject every frame for the rest of the epoch.
119        if p.rtp_sequence <= hw && hw.wrapping_sub(p.rtp_sequence) <= 0x7FFF {
120            return Ok(GapAccept::Late(p));
121        }
122        self.in_hw.insert(p.media_source_id, p.rtp_sequence);
123        Ok(GapAccept::New(p))
124    }
125
126    /// Synchronises the client's view of the group epoch and resets the
127    /// outbound counters and replay window when the epoch has advanced.
128    /// Called automatically by [`GapClient::send`] and [`GapClient::accept`].
129    pub fn sync_epoch(&mut self, epoch: u64) {
130        if Some(epoch) != self.current_epoch {
131            self.out_rtp_seq.clear();
132            self.in_hw.clear();
133            self.current_epoch = Some(epoch);
134        }
135    }
136
137    /// Clears the outbound counters and the replay window unconditionally.
138    pub fn reset(&mut self) {
139        self.out_rtp_seq.clear();
140        self.in_hw.clear();
141        self.current_epoch = None;
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a CBOR GAP payload for `media_source_id` 1 with a fixed
    /// timestamp and body, varying only the sequence number and key phase.
    fn encode_frame(rtp_sequence: u32, key_phase: u32) -> Vec<u8> {
        let frame = crate::GapPayload {
            media_source_id: 1,
            rtp_sequence,
            rtp_timestamp: 960,
            key_phase,
            opus_frame: serde_bytes::ByteBuf::from(b"opus-data".to_vec()),
        };
        frame.to_cbor()
    }

    /// Sequence 0 arriving right after 0xFFFF is a wraparound, not a replay.
    #[test]
    fn wraparound_after_ffff_is_accepted() {
        let mut client = GapClient::new();
        // Walk the high-water mark up to the last value before the wrap.
        for seq in [0xFFFE, 0xFFFF] {
            client.accept(&encode_frame(seq, 1), 1).unwrap();
        }
        let outcome = client.accept(&encode_frame(0x0000, 1), 1).unwrap();
        assert!(matches!(outcome, GapAccept::New(_)), "seq=0 after 0xFFFF must be New");
    }

    /// Feeding the very same sequence number twice yields `Late` the second
    /// time.
    #[test]
    fn strict_replay_within_window_is_late() {
        let mut client = GapClient::new();
        let frame = encode_frame(100, 1);
        client.accept(&frame, 1).unwrap();
        let outcome = client.accept(&frame, 1).unwrap();
        assert!(matches!(outcome, GapAccept::Late(_)), "exact dup must be Late");
    }

    /// A sequence number already seen in epoch 1 counts as fresh once the
    /// epoch moves to 2, because the replay window is cleared.
    #[test]
    fn epoch_change_clears_window() {
        let mut client = GapClient::new();
        client.accept(&encode_frame(1, 1), 1).unwrap();
        let outcome = client.accept(&encode_frame(1, 2), 2).unwrap();
        assert!(matches!(outcome, GapAccept::New(_)), "new epoch resets window");
    }
}
187}