1use crate::GtpMessage;
4use gbp::CodecError;
5use gbp_core::{BoundedSeen, GbpFlags, MemberId, StreamType};
6use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
7
8#[derive(Debug, thiserror::Error)]
10pub enum GtpError {
11 #[error("decode: {0}")]
13 Decode(#[from] CodecError),
14 #[error("duplicate (sender={sender_id}, mid=0x{message_id:X})")]
16 Duplicate {
17 sender_id: MemberId,
19 message_id: u64,
21 },
22 #[error("node: {0}")]
24 Node(#[from] NodeError),
25}
26
27#[derive(Debug)]
29pub enum GtpAccept {
30 New(GtpMessage),
32 Duplicate(GtpMessage),
34}
35
36const GTP_SEEN_CAP: usize = 10_000;
38
39pub struct GtpClient {
51 seen: BoundedSeen<(MemberId, u64)>,
52 current_epoch: Option<u64>,
53}
54
55impl GtpClient {
56 pub fn new() -> Self {
58 Self {
59 seen: BoundedSeen::new(GTP_SEEN_CAP),
60 current_epoch: None,
61 }
62 }
63
64 pub fn send<S: Sealer>(
69 &mut self,
70 node: &mut GroupNode,
71 seal: &mut S,
72 target: MemberId,
73 message_id: u64,
74 text: &str,
75 ) -> Result<OutboundFrame, GtpError> {
76 self.sync_epoch(node.current_epoch);
77 let msg = GtpMessage::plain(node.member_id, message_id, text);
78 let stream_id = node.member_stream_id(1);
79 let of = node.send_payload(
80 seal,
81 target,
82 StreamType::Text,
83 stream_id,
84 GbpFlags::ordered_reliable_ack(),
85 &msg.to_cbor(),
86 )?;
87 Ok(of)
88 }
89
90 pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GtpAccept, GtpError> {
98 self.sync_epoch(current_epoch);
99 let m = GtpMessage::from_cbor(plaintext)?;
100 let key = (m.sender_id, m.message_id);
101 if !self.seen.insert(key) {
102 return Ok(GtpAccept::Duplicate(m));
103 }
104 Ok(GtpAccept::New(m))
105 }
106
107 pub fn sync_epoch(&mut self, epoch: u64) {
111 if Some(epoch) != self.current_epoch {
112 self.seen.clear();
113 self.current_epoch = Some(epoch);
114 }
115 }
116
117 pub fn reset(&mut self) {
119 self.seen.clear();
120 self.current_epoch = None;
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use crate::GtpMessage;
128
129 fn encode_msg(sender_id: u32, message_id: u64) -> Vec<u8> {
130 GtpMessage::plain(sender_id, message_id, "hello").to_cbor()
131 }
132
133 #[test]
134 fn accept_new_message_returns_new() {
135 let mut client = GtpClient::new();
136 let payload = encode_msg(1, 100);
137 assert!(matches!(
138 client.accept(&payload, 0).unwrap(),
139 GtpAccept::New(_)
140 ));
141 }
142
143 #[test]
144 fn accept_duplicate_returns_duplicate() {
145 let mut client = GtpClient::new();
146 let payload = encode_msg(1, 100);
147 client.accept(&payload, 0).unwrap();
148 let result = client.accept(&payload, 0).unwrap();
149 assert!(matches!(result, GtpAccept::Duplicate(_)));
150 }
151
152 #[test]
153 fn different_message_ids_both_new() {
154 let mut client = GtpClient::new();
155 let p1 = encode_msg(1, 1);
156 let p2 = encode_msg(1, 2);
157 assert!(matches!(client.accept(&p1, 0).unwrap(), GtpAccept::New(_)));
158 assert!(matches!(client.accept(&p2, 0).unwrap(), GtpAccept::New(_)));
159 }
160
161 #[test]
162 fn different_senders_same_message_id_both_new() {
163 let mut client = GtpClient::new();
164 let p1 = encode_msg(1, 42);
165 let p2 = encode_msg(2, 42);
166 assert!(matches!(client.accept(&p1, 0).unwrap(), GtpAccept::New(_)));
167 assert!(matches!(client.accept(&p2, 0).unwrap(), GtpAccept::New(_)));
168 }
169
170 #[test]
171 fn epoch_advance_clears_seen_set() {
172 let mut client = GtpClient::new();
173 let payload = encode_msg(1, 100);
174 client.accept(&payload, 0).unwrap();
175 let result = client.accept(&payload, 1).unwrap();
177 assert!(matches!(result, GtpAccept::New(_)));
178 }
179
180 #[test]
181 fn reset_clears_idempotency_state() {
182 let mut client = GtpClient::new();
183 let payload = encode_msg(7, 999);
184 client.accept(&payload, 5).unwrap();
185 client.reset();
186 let result = client.accept(&payload, 5).unwrap();
187 assert!(matches!(result, GtpAccept::New(_)));
188 }
189
190 #[test]
191 fn sync_epoch_same_value_keeps_state() {
192 let mut client = GtpClient::new();
193 let payload = encode_msg(1, 1);
194 client.accept(&payload, 3).unwrap();
195 client.sync_epoch(3); let result = client.accept(&payload, 3).unwrap();
197 assert!(matches!(result, GtpAccept::Duplicate(_)));
198 }
199
200 #[test]
201 fn invalid_cbor_returns_decode_error() {
202 let mut client = GtpClient::new();
203 let result = client.accept(b"\xFF\xFF", 0);
204 assert!(matches!(result, Err(GtpError::Decode(_))));
205 }
206}