gtp/client.rs
1//! Stateful GTP client.
2
3use crate::GtpMessage;
4use gbp::CodecError;
5use gbp_core::{BoundedSeen, GbpFlags, MemberId, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7
8/// Errors returned by [`GtpClient`].
9#[derive(Debug, thiserror::Error)]
10pub enum GtpError {
11 /// Failed to decode the CBOR payload.
12 #[error("decode: {0}")]
13 Decode(#[from] CodecError),
14 /// Duplicate `(sender_id, message_id)` (idempotency).
15 #[error("duplicate (sender={sender_id}, mid=0x{message_id:X})")]
16 Duplicate {
17 /// Sender member id.
18 sender_id: MemberId,
19 /// Message id.
20 message_id: u64,
21 },
22 /// Underlying GBP node error during send.
23 #[error("node: {0}")]
24 Node(#[from] NodeError),
25}
26
27/// Outcome of accepting a GTP payload.
28#[derive(Debug)]
29pub enum GtpAccept {
30 /// First time `(sender_id, message_id)` is seen.
31 New(GtpMessage),
32 /// `(sender_id, message_id)` was already seen.
33 Duplicate(GtpMessage),
34}
35
36/// Per-epoch message dedup capacity (GTP §5).
37const GTP_SEEN_CAP: usize = 10_000;
38
39/// Stateful GTP client.
40///
41/// Tracks the set of already-seen `(sender_id, message_id)` pairs to enforce
42/// the idempotency contract of GTP §5. The seen-set is LRU-bounded at
43/// [`GTP_SEEN_CAP`] entries per epoch to prevent unbounded memory growth
44/// in long-lived groups.
45///
46/// The client observes the current group epoch on every [`GtpClient::send`]
47/// or [`GtpClient::accept`] call and automatically clears its idempotency
48/// set when the epoch advances. Callers may also drive a reset explicitly
49/// via [`GtpClient::reset`].
50pub struct GtpClient {
51 seen: BoundedSeen<(MemberId, u64)>,
52 current_epoch: Option<u64>,
53}
54
55impl GtpClient {
56 /// Creates an empty client.
57 pub fn new() -> Self {
58 Self { seen: BoundedSeen::new(GTP_SEEN_CAP), current_epoch: None }
59 }
60
61 /// Sends a text message via the given GBP node and AEAD sealer.
62 ///
63 /// Returns a wire-ready [`OutboundFrame`] that the caller MUST hand to the
64 /// transport. Uses the `O | R | A` profile from GTP §5.
65 pub fn send<S: Sealer>(
66 &mut self,
67 node: &mut GroupNode,
68 seal: &mut S,
69 target: MemberId,
70 message_id: u64,
71 text: &str,
72 ) -> Result<OutboundFrame, GtpError> {
73 self.sync_epoch(node.current_epoch);
74 let msg = GtpMessage::plain(node.member_id, message_id, text);
75 let stream_id = node.member_stream_id(1);
76 let of = node.send_payload(
77 seal,
78 target,
79 StreamType::Text,
80 stream_id,
81 GbpFlags::ordered_reliable_ack(),
82 &msg.to_cbor(),
83 )?;
84 Ok(of)
85 }
86
87 /// Accepts a plaintext payload delivered by the GBP layer
88 /// (`Event::PayloadReceived`).
89 ///
90 /// `current_epoch` is the receiver node's current epoch — passing it lets
91 /// the client auto-reset its idempotency set when the epoch advances.
92 /// Returns either [`GtpAccept::New`] or [`GtpAccept::Duplicate`], or
93 /// [`GtpError::Decode`] if the plaintext is not a valid GTP message.
94 pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GtpAccept, GtpError> {
95 self.sync_epoch(current_epoch);
96 let m = GtpMessage::from_cbor(plaintext)?;
97 let key = (m.sender_id, m.message_id);
98 if !self.seen.insert(key) {
99 return Ok(GtpAccept::Duplicate(m));
100 }
101 Ok(GtpAccept::New(m))
102 }
103
104 /// Synchronises the client's view of the group epoch and resets
105 /// idempotency state when the epoch has advanced. Called automatically
106 /// by [`GtpClient::send`] and [`GtpClient::accept`].
107 pub fn sync_epoch(&mut self, epoch: u64) {
108 if Some(epoch) != self.current_epoch {
109 self.seen.clear();
110 self.current_epoch = Some(epoch);
111 }
112 }
113
114 /// Clears the idempotency set unconditionally.
115 pub fn reset(&mut self) {
116 self.seen.clear();
117 self.current_epoch = None;
118 }
119}