Skip to main content

gtp/
client.rs

1//! Stateful GTP client.
2
3use crate::GtpMessage;
4use gbp::CodecError;
5use gbp_core::{BoundedSeen, GbpFlags, MemberId, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7
8/// Errors returned by [`GtpClient`].
9#[derive(Debug, thiserror::Error)]
10pub enum GtpError {
11    /// Failed to decode the CBOR payload.
12    #[error("decode: {0}")]
13    Decode(#[from] CodecError),
14    /// Duplicate `(sender_id, message_id)` (idempotency).
15    #[error("duplicate (sender={sender_id}, mid=0x{message_id:X})")]
16    Duplicate {
17        /// Sender member id.
18        sender_id: MemberId,
19        /// Message id.
20        message_id: u64,
21    },
22    /// Underlying GBP node error during send.
23    #[error("node: {0}")]
24    Node(#[from] NodeError),
25}
26
27/// Outcome of accepting a GTP payload.
28#[derive(Debug)]
29pub enum GtpAccept {
30    /// First time `(sender_id, message_id)` is seen.
31    New(GtpMessage),
32    /// `(sender_id, message_id)` was already seen.
33    Duplicate(GtpMessage),
34}
35
/// Capacity of the per-epoch message dedup set (GTP §5): the maximum number
/// of `(sender_id, message_id)` pairs the client's seen-set is created with.
const GTP_SEEN_CAP: usize = 10_000;
38
39/// Stateful GTP client.
40///
41/// Tracks the set of already-seen `(sender_id, message_id)` pairs to enforce
42/// the idempotency contract of GTP §5. The seen-set is LRU-bounded at
43/// [`GTP_SEEN_CAP`] entries per epoch to prevent unbounded memory growth
44/// in long-lived groups.
45///
46/// The client observes the current group epoch on every [`GtpClient::send`]
47/// or [`GtpClient::accept`] call and automatically clears its idempotency
48/// set when the epoch advances. Callers may also drive a reset explicitly
49/// via [`GtpClient::reset`].
50pub struct GtpClient {
51    seen: BoundedSeen<(MemberId, u64)>,
52    current_epoch: Option<u64>,
53}
54
55impl GtpClient {
56    /// Creates an empty client.
57    pub fn new() -> Self {
58        Self {
59            seen: BoundedSeen::new(GTP_SEEN_CAP),
60            current_epoch: None,
61        }
62    }
63
64    /// Sends a text message via the given GBP node and AEAD sealer.
65    ///
66    /// Returns a wire-ready [`OutboundFrame`] that the caller MUST hand to the
67    /// transport. Uses the `O | R | A` profile from GTP §5.
68    pub fn send<S: Sealer>(
69        &mut self,
70        node: &mut GroupNode,
71        seal: &mut S,
72        target: MemberId,
73        message_id: u64,
74        text: &str,
75    ) -> Result<OutboundFrame, GtpError> {
76        self.sync_epoch(node.current_epoch);
77        let msg = GtpMessage::plain(node.member_id, message_id, text);
78        let stream_id = node.member_stream_id(1);
79        let of = node.send_payload(
80            seal,
81            target,
82            StreamType::Text,
83            stream_id,
84            GbpFlags::ordered_reliable_ack(),
85            &msg.to_cbor(),
86        )?;
87        Ok(of)
88    }
89
90    /// Accepts a plaintext payload delivered by the GBP layer
91    /// (`Event::PayloadReceived`).
92    ///
93    /// `current_epoch` is the receiver node's current epoch — passing it lets
94    /// the client auto-reset its idempotency set when the epoch advances.
95    /// Returns either [`GtpAccept::New`] or [`GtpAccept::Duplicate`], or
96    /// [`GtpError::Decode`] if the plaintext is not a valid GTP message.
97    pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GtpAccept, GtpError> {
98        self.sync_epoch(current_epoch);
99        let m = GtpMessage::from_cbor(plaintext)?;
100        let key = (m.sender_id, m.message_id);
101        if !self.seen.insert(key) {
102            return Ok(GtpAccept::Duplicate(m));
103        }
104        Ok(GtpAccept::New(m))
105    }
106
107    /// Synchronises the client's view of the group epoch and resets
108    /// idempotency state when the epoch has advanced. Called automatically
109    /// by [`GtpClient::send`] and [`GtpClient::accept`].
110    pub fn sync_epoch(&mut self, epoch: u64) {
111        if Some(epoch) != self.current_epoch {
112            self.seen.clear();
113            self.current_epoch = Some(epoch);
114        }
115    }
116
117    /// Clears the idempotency set unconditionally.
118    pub fn reset(&mut self) {
119        self.seen.clear();
120        self.current_epoch = None;
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use crate::GtpMessage;

    /// CBOR bytes of a plain "hello" message from `sender_id` with `message_id`.
    fn encode_msg(sender_id: u32, message_id: u64) -> Vec<u8> {
        let message = GtpMessage::plain(sender_id, message_id, "hello");
        message.to_cbor()
    }

    /// True when the outcome is [`GtpAccept::New`].
    fn is_new(outcome: &GtpAccept) -> bool {
        matches!(outcome, GtpAccept::New(_))
    }

    /// True when the outcome is [`GtpAccept::Duplicate`].
    fn is_duplicate(outcome: &GtpAccept) -> bool {
        matches!(outcome, GtpAccept::Duplicate(_))
    }

    #[test]
    fn accept_new_message_returns_new() {
        let mut client = GtpClient::new();
        let outcome = client.accept(&encode_msg(1, 100), 0).unwrap();
        assert!(is_new(&outcome));
    }

    #[test]
    fn accept_duplicate_returns_duplicate() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);
        client.accept(&bytes, 0).unwrap();
        // Second delivery of the very same bytes in the same epoch.
        let outcome = client.accept(&bytes, 0).unwrap();
        assert!(is_duplicate(&outcome));
    }

    #[test]
    fn different_message_ids_both_new() {
        let mut client = GtpClient::new();
        for message_id in [1u64, 2] {
            let outcome = client.accept(&encode_msg(1, message_id), 0).unwrap();
            assert!(is_new(&outcome));
        }
    }

    #[test]
    fn different_senders_same_message_id_both_new() {
        let mut client = GtpClient::new();
        for sender_id in [1u32, 2] {
            let outcome = client.accept(&encode_msg(sender_id, 42), 0).unwrap();
            assert!(is_new(&outcome));
        }
    }

    #[test]
    fn epoch_advance_clears_seen_set() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);
        client.accept(&bytes, 0).unwrap();
        // Same message under the next epoch is treated as new again.
        let outcome = client.accept(&bytes, 1).unwrap();
        assert!(is_new(&outcome));
    }

    #[test]
    fn reset_clears_idempotency_state() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(7, 999);
        client.accept(&bytes, 5).unwrap();
        client.reset();
        let outcome = client.accept(&bytes, 5).unwrap();
        assert!(is_new(&outcome));
    }

    #[test]
    fn sync_epoch_same_value_keeps_state() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 1);
        client.accept(&bytes, 3).unwrap();
        // Re-announcing the epoch already in effect must not clear anything.
        client.sync_epoch(3);
        let outcome = client.accept(&bytes, 3).unwrap();
        assert!(is_duplicate(&outcome));
    }

    #[test]
    fn invalid_cbor_returns_decode_error() {
        let mut client = GtpClient::new();
        let failure = client.accept(b"\xFF\xFF", 0).unwrap_err();
        assert!(matches!(failure, GtpError::Decode(_)));
    }
}