1use crate::GtpMessage;
4use gbp::CodecError;
5use gbp_core::{BoundedSeen, GbpFlags, MemberId, PayloadCodec, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7
8#[derive(Debug, thiserror::Error)]
10pub enum GtpError {
11 #[error("decode: {0}")]
13 Decode(#[from] CodecError),
14 #[error("duplicate (sender={sender_id}, mid=0x{message_id:X})")]
16 Duplicate {
17 sender_id: MemberId,
19 message_id: u64,
21 },
22 #[error("node: {0}")]
24 Node(#[from] NodeError),
25}
26
27#[derive(Debug)]
29pub enum GtpAccept {
30 New(GtpMessage),
32 Duplicate(GtpMessage),
34}
35
/// Capacity handed to `BoundedSeen::new` for the client's duplicate-tracking set.
///
/// NOTE(review): presumably the maximum number of `(sender, message id)` keys
/// remembered at once — confirm against `BoundedSeen`'s eviction behaviour.
const GTP_SEEN_CAP: usize = 10_000;
38
39pub struct GtpClient {
51 seen: BoundedSeen<(MemberId, u64)>,
52 current_epoch: Option<u64>,
53}
54
55impl GtpClient {
56 pub fn new() -> Self {
58 Self {
59 seen: BoundedSeen::new(GTP_SEEN_CAP),
60 current_epoch: None,
61 }
62 }
63
64 pub fn send<S: Sealer>(
71 &mut self,
72 node: &mut GroupNode,
73 seal: &mut S,
74 target: MemberId,
75 message_id: u64,
76 text: &str,
77 codec: PayloadCodec,
78 ) -> Result<OutboundFrame, GtpError> {
79 self.sync_epoch(node.current_epoch);
80 let msg = GtpMessage::plain(node.member_id, message_id, text);
81 let stream_id = node.member_stream_id(1);
82 let of = node.send_payload(
83 seal,
84 target,
85 StreamType::Text,
86 stream_id,
87 GbpFlags::ordered_reliable_ack(),
88 &msg.to_bytes(codec),
89 codec,
90 )?;
91 Ok(of)
92 }
93
94 pub fn accept(
103 &mut self,
104 plaintext: &[u8],
105 current_epoch: u64,
106 codec: PayloadCodec,
107 ) -> Result<GtpAccept, GtpError> {
108 self.sync_epoch(current_epoch);
109 let m = GtpMessage::from_bytes(plaintext, codec)?;
110 let key = (m.sender_id, m.message_id);
111 if !self.seen.insert(key) {
112 return Ok(GtpAccept::Duplicate(m));
113 }
114 Ok(GtpAccept::New(m))
115 }
116
117 pub fn sync_epoch(&mut self, epoch: u64) {
121 if Some(epoch) != self.current_epoch {
122 self.seen.clear();
123 self.current_epoch = Some(epoch);
124 }
125 }
126
127 pub fn reset(&mut self) {
129 self.seen.clear();
130 self.current_epoch = None;
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use crate::GtpMessage;

    /// CBOR-encodes a plain "hello" message for the given sender and id.
    fn encode_msg(sender_id: u32, message_id: u64) -> Vec<u8> {
        let message = GtpMessage::plain(sender_id, message_id, "hello");
        message.to_cbor()
    }

    /// Feeds `payload` to `client` as CBOR at `epoch`, panicking on any error.
    fn accept_cbor(client: &mut GtpClient, payload: &[u8], epoch: u64) -> GtpAccept {
        client.accept(payload, epoch, PayloadCodec::Cbor).unwrap()
    }

    #[test]
    fn accept_new_message_returns_new() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);

        let outcome = accept_cbor(&mut client, &bytes, 0);

        assert!(matches!(outcome, GtpAccept::New(_)));
    }

    #[test]
    fn accept_duplicate_returns_duplicate() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);

        // First delivery primes the seen set; the second must be flagged.
        accept_cbor(&mut client, &bytes, 0);
        let second = accept_cbor(&mut client, &bytes, 0);

        assert!(matches!(second, GtpAccept::Duplicate(_)));
    }

    #[test]
    fn different_message_ids_both_new() {
        let mut client = GtpClient::new();
        let first = encode_msg(1, 1);
        let second = encode_msg(1, 2);

        let first_outcome = accept_cbor(&mut client, &first, 0);
        assert!(matches!(first_outcome, GtpAccept::New(_)));

        let second_outcome = accept_cbor(&mut client, &second, 0);
        assert!(matches!(second_outcome, GtpAccept::New(_)));
    }

    #[test]
    fn different_senders_same_message_id_both_new() {
        let mut client = GtpClient::new();
        let from_one = encode_msg(1, 42);
        let from_two = encode_msg(2, 42);

        let first_outcome = accept_cbor(&mut client, &from_one, 0);
        assert!(matches!(first_outcome, GtpAccept::New(_)));

        let second_outcome = accept_cbor(&mut client, &from_two, 0);
        assert!(matches!(second_outcome, GtpAccept::New(_)));
    }

    #[test]
    fn epoch_advance_clears_seen_set() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);

        accept_cbor(&mut client, &bytes, 0);
        // Same payload, next epoch: the seen set was cleared, so it is new again.
        let after_advance = accept_cbor(&mut client, &bytes, 1);

        assert!(matches!(after_advance, GtpAccept::New(_)));
    }

    #[test]
    fn reset_clears_idempotency_state() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(7, 999);

        accept_cbor(&mut client, &bytes, 5);
        client.reset();
        let after_reset = accept_cbor(&mut client, &bytes, 5);

        assert!(matches!(after_reset, GtpAccept::New(_)));
    }

    #[test]
    fn sync_epoch_same_value_keeps_state() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 1);

        accept_cbor(&mut client, &bytes, 3);
        // Re-syncing to the epoch already recorded must not clear the seen set.
        client.sync_epoch(3);
        let repeat = accept_cbor(&mut client, &bytes, 3);

        assert!(matches!(repeat, GtpAccept::Duplicate(_)));
    }

    #[test]
    fn invalid_cbor_returns_decode_error() {
        let mut client = GtpClient::new();

        let outcome = client.accept(b"\xFF\xFF", 0, PayloadCodec::Cbor);

        assert!(matches!(outcome, Err(GtpError::Decode(_))));
    }
}