Skip to main content

gtp/
client.rs

1//! Stateful GTP client.
2
3use crate::GtpMessage;
4use gbp::CodecError;
5use gbp_core::{BoundedSeen, GbpFlags, MemberId, PayloadCodec, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7
8/// Errors returned by [`GtpClient`].
9#[derive(Debug, thiserror::Error)]
10pub enum GtpError {
11    /// Failed to decode the CBOR payload.
12    #[error("decode: {0}")]
13    Decode(#[from] CodecError),
14    /// Duplicate `(sender_id, message_id)` (idempotency).
15    #[error("duplicate (sender={sender_id}, mid=0x{message_id:X})")]
16    Duplicate {
17        /// Sender member id.
18        sender_id: MemberId,
19        /// Message id.
20        message_id: u64,
21    },
22    /// Underlying GBP node error during send.
23    #[error("node: {0}")]
24    Node(#[from] NodeError),
25}
26
27/// Outcome of accepting a GTP payload.
28#[derive(Debug)]
29pub enum GtpAccept {
30    /// First time `(sender_id, message_id)` is seen.
31    New(GtpMessage),
32    /// `(sender_id, message_id)` was already seen.
33    Duplicate(GtpMessage),
34}
35
36/// Per-epoch message dedup capacity (GTP §5).
37const GTP_SEEN_CAP: usize = 10_000;
38
39/// Stateful GTP client.
40///
41/// Tracks the set of already-seen `(sender_id, message_id)` pairs to enforce
42/// the idempotency contract of GTP §5. The seen-set is LRU-bounded at
43/// [`GTP_SEEN_CAP`] entries per epoch to prevent unbounded memory growth
44/// in long-lived groups.
45///
46/// The client observes the current group epoch on every [`GtpClient::send`]
47/// or [`GtpClient::accept`] call and automatically clears its idempotency
48/// set when the epoch advances. Callers may also drive a reset explicitly
49/// via [`GtpClient::reset`].
50pub struct GtpClient {
51    seen: BoundedSeen<(MemberId, u64)>,
52    current_epoch: Option<u64>,
53}
54
55impl GtpClient {
56    /// Creates an empty client.
57    pub fn new() -> Self {
58        Self {
59            seen: BoundedSeen::new(GTP_SEEN_CAP),
60            current_epoch: None,
61        }
62    }
63
64    /// Sends a text message via the given GBP node and AEAD sealer.
65    ///
66    /// Returns a wire-ready [`OutboundFrame`] that the caller MUST hand to the
67    /// transport. Uses the `O | R | A` profile from GTP §5.
68    /// `codec` controls how the [`GtpMessage`] payload is encoded inside the
69    /// GBP frame; use [`PayloadCodec::Cbor`] for maximum compatibility.
70    pub fn send<S: Sealer>(
71        &mut self,
72        node: &mut GroupNode,
73        seal: &mut S,
74        target: MemberId,
75        message_id: u64,
76        text: &str,
77        codec: PayloadCodec,
78    ) -> Result<OutboundFrame, GtpError> {
79        self.sync_epoch(node.current_epoch);
80        let msg = GtpMessage::plain(node.member_id, message_id, text);
81        let stream_id = node.member_stream_id(1);
82        let of = node.send_payload(
83            seal,
84            target,
85            StreamType::Text,
86            stream_id,
87            GbpFlags::ordered_reliable_ack(),
88            &msg.to_bytes(codec),
89            codec,
90        )?;
91        Ok(of)
92    }
93
94    /// Accepts a plaintext payload delivered by the GBP layer
95    /// (`Event::PayloadReceived`).
96    ///
97    /// `current_epoch` is the receiver node's current epoch — passing it lets
98    /// the client auto-reset its idempotency set when the epoch advances.
99    /// `codec` must match the value from [`DeliveredPayload::codec`].
100    /// Returns either [`GtpAccept::New`] or [`GtpAccept::Duplicate`], or
101    /// [`GtpError::Decode`] if the plaintext is not a valid GTP message.
102    pub fn accept(
103        &mut self,
104        plaintext: &[u8],
105        current_epoch: u64,
106        codec: PayloadCodec,
107    ) -> Result<GtpAccept, GtpError> {
108        self.sync_epoch(current_epoch);
109        let m = GtpMessage::from_bytes(plaintext, codec)?;
110        let key = (m.sender_id, m.message_id);
111        if !self.seen.insert(key) {
112            return Ok(GtpAccept::Duplicate(m));
113        }
114        Ok(GtpAccept::New(m))
115    }
116
117    /// Synchronises the client's view of the group epoch and resets
118    /// idempotency state when the epoch has advanced. Called automatically
119    /// by [`GtpClient::send`] and [`GtpClient::accept`].
120    pub fn sync_epoch(&mut self, epoch: u64) {
121        if Some(epoch) != self.current_epoch {
122            self.seen.clear();
123            self.current_epoch = Some(epoch);
124        }
125    }
126
127    /// Clears the idempotency set unconditionally.
128    pub fn reset(&mut self) {
129        self.seen.clear();
130        self.current_epoch = None;
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use crate::GtpMessage;

    /// CBOR-encodes a plain "hello" message from `sender_id` with `message_id`.
    fn encode_msg(sender_id: u32, message_id: u64) -> Vec<u8> {
        GtpMessage::plain(sender_id, message_id, "hello").to_cbor()
    }

    /// Feeds a CBOR payload to the client at `epoch`, panicking on any error.
    fn accept_cbor(client: &mut GtpClient, payload: &[u8], epoch: u64) -> GtpAccept {
        client.accept(payload, epoch, PayloadCodec::Cbor).unwrap()
    }

    /// True when the outcome is `GtpAccept::New`.
    fn is_new(outcome: &GtpAccept) -> bool {
        matches!(outcome, GtpAccept::New(_))
    }

    /// True when the outcome is `GtpAccept::Duplicate`.
    fn is_duplicate(outcome: &GtpAccept) -> bool {
        matches!(outcome, GtpAccept::Duplicate(_))
    }

    #[test]
    fn accept_new_message_returns_new() {
        let mut client = GtpClient::new();
        let payload = encode_msg(1, 100);
        assert!(is_new(&accept_cbor(&mut client, &payload, 0)));
    }

    #[test]
    fn accept_duplicate_returns_duplicate() {
        let mut client = GtpClient::new();
        let payload = encode_msg(1, 100);
        accept_cbor(&mut client, &payload, 0);
        let second = accept_cbor(&mut client, &payload, 0);
        assert!(is_duplicate(&second));
    }

    #[test]
    fn different_message_ids_both_new() {
        let mut client = GtpClient::new();
        let first = encode_msg(1, 1);
        let second = encode_msg(1, 2);
        assert!(is_new(&accept_cbor(&mut client, &first, 0)));
        assert!(is_new(&accept_cbor(&mut client, &second, 0)));
    }

    #[test]
    fn different_senders_same_message_id_both_new() {
        let mut client = GtpClient::new();
        let from_one = encode_msg(1, 42);
        let from_two = encode_msg(2, 42);
        assert!(is_new(&accept_cbor(&mut client, &from_one, 0)));
        assert!(is_new(&accept_cbor(&mut client, &from_two, 0)));
    }

    #[test]
    fn epoch_advance_clears_seen_set() {
        let mut client = GtpClient::new();
        let payload = encode_msg(1, 100);
        accept_cbor(&mut client, &payload, 0);
        // Same message under a new epoch must be treated as new again.
        let after_bump = accept_cbor(&mut client, &payload, 1);
        assert!(is_new(&after_bump));
    }

    #[test]
    fn reset_clears_idempotency_state() {
        let mut client = GtpClient::new();
        let payload = encode_msg(7, 999);
        accept_cbor(&mut client, &payload, 5);
        client.reset();
        let after_reset = accept_cbor(&mut client, &payload, 5);
        assert!(is_new(&after_reset));
    }

    #[test]
    fn sync_epoch_same_value_keeps_state() {
        let mut client = GtpClient::new();
        let payload = encode_msg(1, 1);
        accept_cbor(&mut client, &payload, 3);
        // Re-announcing the current epoch must not clear the seen-set.
        client.sync_epoch(3);
        let repeat = accept_cbor(&mut client, &payload, 3);
        assert!(is_duplicate(&repeat));
    }

    #[test]
    fn invalid_cbor_returns_decode_error() {
        let mut client = GtpClient::new();
        let outcome = client.accept(b"\xFF\xFF", 0, PayloadCodec::Cbor);
        assert!(matches!(outcome, Err(GtpError::Decode(_))));
    }
}