Skip to main content

gtp/
client.rs

1//! Stateful GTP client.
2
3use crate::GtpMessage;
4use gbp::CodecError;
5use gbp_core::{BoundedSeen, GbpFlags, MemberId, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7
8/// Errors returned by [`GtpClient`].
9#[derive(Debug, thiserror::Error)]
10pub enum GtpError {
11    /// Failed to decode the CBOR payload.
12    #[error("decode: {0}")]
13    Decode(#[from] CodecError),
14    /// Duplicate `(sender_id, message_id)` (idempotency).
15    #[error("duplicate (sender={sender_id}, mid=0x{message_id:X})")]
16    Duplicate {
17        /// Sender member id.
18        sender_id: MemberId,
19        /// Message id.
20        message_id: u64,
21    },
22    /// Underlying GBP node error during send.
23    #[error("node: {0}")]
24    Node(#[from] NodeError),
25}
26
27/// Outcome of accepting a GTP payload.
28#[derive(Debug)]
29pub enum GtpAccept {
30    /// First time `(sender_id, message_id)` is seen.
31    New(GtpMessage),
32    /// `(sender_id, message_id)` was already seen.
33    Duplicate(GtpMessage),
34}
35
36/// Per-epoch message dedup capacity (GTP §5).
37const GTP_SEEN_CAP: usize = 10_000;
38
39/// Stateful GTP client.
40///
41/// Tracks the set of already-seen `(sender_id, message_id)` pairs to enforce
42/// the idempotency contract of GTP §5. The seen-set is LRU-bounded at
43/// [`GTP_SEEN_CAP`] entries per epoch to prevent unbounded memory growth
44/// in long-lived groups.
45///
46/// The client observes the current group epoch on every [`GtpClient::send`]
47/// or [`GtpClient::accept`] call and automatically clears its idempotency
48/// set when the epoch advances. Callers may also drive a reset explicitly
49/// via [`GtpClient::reset`].
50pub struct GtpClient {
51    seen: BoundedSeen<(MemberId, u64)>,
52    current_epoch: Option<u64>,
53}
54
55impl GtpClient {
56    /// Creates an empty client.
57    pub fn new() -> Self {
58        Self { seen: BoundedSeen::new(GTP_SEEN_CAP), current_epoch: None }
59    }
60
61    /// Sends a text message via the given GBP node and AEAD sealer.
62    ///
63    /// Returns a wire-ready [`OutboundFrame`] that the caller MUST hand to the
64    /// transport. Uses the `O | R | A` profile from GTP §5.
65    pub fn send<S: Sealer>(
66        &mut self,
67        node: &mut GroupNode,
68        seal: &mut S,
69        target: MemberId,
70        message_id: u64,
71        text: &str,
72    ) -> Result<OutboundFrame, GtpError> {
73        self.sync_epoch(node.current_epoch);
74        let msg = GtpMessage::plain(node.member_id, message_id, text);
75        let stream_id = node.member_stream_id(1);
76        let of = node.send_payload(
77            seal,
78            target,
79            StreamType::Text,
80            stream_id,
81            GbpFlags::ordered_reliable_ack(),
82            &msg.to_cbor(),
83        )?;
84        Ok(of)
85    }
86
87    /// Accepts a plaintext payload delivered by the GBP layer
88    /// (`Event::PayloadReceived`).
89    ///
90    /// `current_epoch` is the receiver node's current epoch — passing it lets
91    /// the client auto-reset its idempotency set when the epoch advances.
92    /// Returns either [`GtpAccept::New`] or [`GtpAccept::Duplicate`], or
93    /// [`GtpError::Decode`] if the plaintext is not a valid GTP message.
94    pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GtpAccept, GtpError> {
95        self.sync_epoch(current_epoch);
96        let m = GtpMessage::from_cbor(plaintext)?;
97        let key = (m.sender_id, m.message_id);
98        if !self.seen.insert(key) {
99            return Ok(GtpAccept::Duplicate(m));
100        }
101        Ok(GtpAccept::New(m))
102    }
103
104    /// Synchronises the client's view of the group epoch and resets
105    /// idempotency state when the epoch has advanced. Called automatically
106    /// by [`GtpClient::send`] and [`GtpClient::accept`].
107    pub fn sync_epoch(&mut self, epoch: u64) {
108        if Some(epoch) != self.current_epoch {
109            self.seen.clear();
110            self.current_epoch = Some(epoch);
111        }
112    }
113
114    /// Clears the idempotency set unconditionally.
115    pub fn reset(&mut self) {
116        self.seen.clear();
117        self.current_epoch = None;
118    }
119}