Skip to main content

gsp/
client.rs

1//! Stateful GSP client.
2
3use crate::GspSignal;
4use gbp::CodecError;
5use gbp_core::{BoundedSeen, GbpFlags, MemberId, SignalType, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7use std::collections::HashSet;
8
9/// Errors returned by [`GspClient`].
10#[derive(Debug, thiserror::Error)]
11pub enum GspError {
12    /// Failed to decode the CBOR payload.
13    #[error("decode: {0}")]
14    Decode(#[from] CodecError),
15    /// `signal_type` is not in the registry.
16    #[error("unknown signal_type: {0}")]
17    UnknownSignal(u32),
18    /// Duplicate `request_id`.
19    #[error("duplicate request_id: {0}")]
20    DuplicateRequest(u32),
21    /// Underlying GBP node error during send.
22    #[error("node: {0}")]
23    Node(#[from] NodeError),
24}
25
26/// Accepted signal: decoded fields plus the local state effects already
27/// applied by the client.
28#[derive(Debug, Clone)]
29pub struct GspAccept {
30    /// Decoded signal type.
31    pub signal: SignalType,
32    /// Sender member id.
33    pub sender_id: MemberId,
34    /// Claimed role (used by `ROLE_CHANGE`).
35    pub role_claim: u32,
36    /// Request id.
37    pub request_id: u32,
38}
39
/// Capacity of the `request_id` deduplication set kept for a single epoch
/// (GSP §5).
const GSP_SEEN_CAP: usize = 10_000;
42
43/// Stateful GSP client.
44///
45/// Tracks `request_id` deduplication, the current membership set and the
46/// mute-list. Membership is updated atomically when JOIN, LEAVE, MUTE or
47/// UNMUTE signals are accepted. The `request_id` set is LRU-bounded at
48/// [`GSP_SEEN_CAP`] entries per epoch.
49///
50/// The client observes the current group epoch on every [`GspClient::send`]
51/// or [`GspClient::accept`] call and automatically clears its
52/// `request_id` deduplication set when the epoch advances. Callers may also
53/// drive a reset explicitly via [`GspClient::reset`].
54pub struct GspClient {
55    seen_requests: BoundedSeen<u32>,
56    /// Members that are currently muted.
57    pub muted: HashSet<MemberId>,
58    /// Current membership set, driven by JOIN / LEAVE.
59    pub members: HashSet<MemberId>,
60    current_epoch: Option<u64>,
61}
62
63impl GspClient {
64    /// Creates an empty client.
65    pub fn new() -> Self {
66        Self {
67            seen_requests: BoundedSeen::new(GSP_SEEN_CAP),
68            muted: HashSet::new(),
69            members: HashSet::new(),
70            current_epoch: None,
71        }
72    }
73
74    /// Sends a signal. Uses the `O | R | A` profile required by GSP §3.
75    pub fn send<S: Sealer>(
76        &mut self,
77        node: &mut GroupNode,
78        seal: &mut S,
79        target: MemberId,
80        signal: SignalType,
81        role_claim: u32,
82        request_id: u32,
83    ) -> Result<OutboundFrame, GspError> {
84        self.sync_epoch(node.current_epoch);
85        let mut sig = GspSignal::bare(signal as u32, request_id, node.member_id);
86        sig.role_claim = role_claim;
87        let stream_id = node.member_stream_id(3);
88        Ok(node.send_payload(
89            seal,
90            target,
91            StreamType::Signal,
92            stream_id,
93            GbpFlags::ordered_reliable_ack(),
94            &sig.to_cbor(),
95        )?)
96    }
97
98    /// Accepts a signal payload, applies the state effects defined in GSP §5
99    /// and returns the decoded [`GspAccept`].
100    ///
101    /// `current_epoch` is the receiver node's current epoch — passing it lets
102    /// the client auto-reset its `request_id` deduplication set when the
103    /// epoch advances.
104    pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GspAccept, GspError> {
105        self.sync_epoch(current_epoch);
106        let s = GspSignal::from_cbor(plaintext)?;
107        let signal = SignalType::try_from(s.signal_type).map_err(GspError::UnknownSignal)?;
108        if !self.seen_requests.insert(s.request_id) {
109            return Err(GspError::DuplicateRequest(s.request_id));
110        }
111        match signal {
112            SignalType::Join => {
113                self.members.insert(s.sender_id);
114            }
115            SignalType::Leave => {
116                self.members.remove(&s.sender_id);
117                self.muted.remove(&s.sender_id);
118            }
119            SignalType::Mute => {
120                self.muted.insert(s.sender_id);
121            }
122            SignalType::Unmute => {
123                self.muted.remove(&s.sender_id);
124            }
125            _ => {}
126        }
127        Ok(GspAccept {
128            signal,
129            sender_id: s.sender_id,
130            role_claim: s.role_claim,
131            request_id: s.request_id,
132        })
133    }
134
135    /// Synchronises the client's view of the group epoch and resets the
136    /// `request_id` deduplication set when the epoch has advanced. Called
137    /// automatically by [`GspClient::send`] and [`GspClient::accept`].
138    pub fn sync_epoch(&mut self, epoch: u64) {
139        if Some(epoch) != self.current_epoch {
140            self.seen_requests.clear();
141            self.current_epoch = Some(epoch);
142        }
143    }
144
145    /// Clears the request-id deduplication set unconditionally.
146    pub fn reset(&mut self) {
147        self.seen_requests.clear();
148        self.current_epoch = None;
149    }
150}