use crate::GtpMessage;
use gbp::CodecError;
use gbp_core::{BoundedSeen, GbpFlags, MemberId, StreamType};
use gbp_node::{GroupNode, NodeError, OutboundFrame, Sealer};
/// Errors reported by the GTP client layer.
#[derive(Debug, thiserror::Error)]
pub enum GtpError {
    /// A payload could not be decoded; wraps the underlying [`CodecError`]
    /// (converted automatically through `From`).
    #[error("decode: {0}")]
    Decode(#[from] CodecError),

    /// Identifies a repeated message by its sender and message id.
    /// The message id is rendered in upper-case hexadecimal.
    #[error("duplicate (sender={sender_id}, mid=0x{message_id:X})")]
    Duplicate { sender_id: MemberId, message_id: u64 },

    /// A failure reported by the group node; wraps the underlying
    /// [`NodeError`] (converted automatically through `From`).
    #[error("node: {0}")]
    Node(#[from] NodeError),
}
/// Outcome of handing a decoded message to [`GtpClient::accept`].
#[derive(Debug)]
pub enum GtpAccept {
    /// The `(sender_id, message_id)` pair was not in the seen set.
    New(GtpMessage),

    /// The `(sender_id, message_id)` pair was already in the seen set; the
    /// decoded message is still handed back to the caller.
    Duplicate(GtpMessage),
}
// Capacity handed to `BoundedSeen::new` for the client's seen set.
// NOTE(review): presumably the maximum number of remembered
// (sender, message id) pairs — confirm against `BoundedSeen`.
const GTP_SEEN_CAP: usize = 10_000;
/// Client-side state used to recognise repeated messages within one epoch.
pub struct GtpClient {
    /// `(sender_id, message_id)` pairs already accepted in the current epoch.
    seen: BoundedSeen<(MemberId, u64)>,

    /// Epoch most recently passed to `sync_epoch`; `None` until the first
    /// call and again after `reset`.
    current_epoch: Option<u64>,
}
impl GtpClient {
    /// Creates a client with an empty seen set (capacity `GTP_SEEN_CAP`) and
    /// no epoch recorded yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            seen: BoundedSeen::new(GTP_SEEN_CAP),
            current_epoch: None,
        }
    }

    /// Builds a plain text message from `node.member_id`, `message_id` and
    /// `text`, then sends its CBOR encoding to `target` through `node`.
    ///
    /// The client is first synchronised with `node.current_epoch`, which
    /// clears the seen set if the epoch differs from the recorded one.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `GroupNode::send_payload`, converted
    /// into [`GtpError`].
    pub fn send<S: Sealer>(
        &mut self,
        node: &mut GroupNode,
        seal: &mut S,
        target: MemberId,
        message_id: u64,
        text: &str,
    ) -> Result<OutboundFrame, GtpError> {
        self.sync_epoch(node.current_epoch);

        let msg = GtpMessage::plain(node.member_id, message_id, text);
        // Resolved before `send_payload` so the two uses of `node` do not overlap.
        let stream_id = node.member_stream_id(1);
        let frame = node.send_payload(
            seal,
            target,
            StreamType::Text,
            stream_id,
            GbpFlags::ordered_reliable_ack(),
            &msg.to_cbor(),
        )?;
        Ok(frame)
    }

    /// Decodes `plaintext` as a CBOR [`GtpMessage`] and classifies it as new
    /// or duplicate based on its `(sender_id, message_id)` pair.
    ///
    /// The client is synchronised with `current_epoch` before decoding, so an
    /// epoch change clears the seen set even if decoding then fails.
    ///
    /// # Errors
    ///
    /// Returns [`GtpError::Decode`] when `plaintext` cannot be decoded.
    pub fn accept(&mut self, plaintext: &[u8], current_epoch: u64) -> Result<GtpAccept, GtpError> {
        self.sync_epoch(current_epoch);

        let message = GtpMessage::from_cbor(plaintext)?;
        // `insert` returning `false` is treated as "already seen".
        let outcome = if self.seen.insert((message.sender_id, message.message_id)) {
            GtpAccept::New(message)
        } else {
            GtpAccept::Duplicate(message)
        };
        Ok(outcome)
    }

    /// Records `epoch` as the current epoch. If it differs from the recorded
    /// one (or none is recorded yet), the seen set is cleared first.
    pub fn sync_epoch(&mut self, epoch: u64) {
        if self.current_epoch != Some(epoch) {
            self.seen.clear();
            self.current_epoch = Some(epoch);
        }
    }

    /// Clears the seen set and forgets the recorded epoch, so the next
    /// `sync_epoch` call is treated as an epoch change.
    pub fn reset(&mut self) {
        self.seen.clear();
        self.current_epoch = None;
    }
}

impl Default for GtpClient {
    /// Equivalent to [`GtpClient::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::GtpMessage;

    /// CBOR bytes of a plain "hello" message with the given ids.
    fn encode_msg(sender_id: u32, message_id: u64) -> Vec<u8> {
        let message = GtpMessage::plain(sender_id, message_id, "hello");
        message.to_cbor()
    }

    /// Feeds `bytes` to `client` at `epoch`, panicking on any error.
    fn accept_ok(client: &mut GtpClient, bytes: &[u8], epoch: u64) -> GtpAccept {
        client.accept(bytes, epoch).unwrap()
    }

    fn is_new(outcome: &GtpAccept) -> bool {
        matches!(outcome, GtpAccept::New(_))
    }

    fn is_duplicate(outcome: &GtpAccept) -> bool {
        matches!(outcome, GtpAccept::Duplicate(_))
    }

    #[test]
    fn accept_new_message_returns_new() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);
        let outcome = accept_ok(&mut client, &bytes, 0);
        assert!(is_new(&outcome));
    }

    #[test]
    fn accept_duplicate_returns_duplicate() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);
        accept_ok(&mut client, &bytes, 0);
        let second = accept_ok(&mut client, &bytes, 0);
        assert!(is_duplicate(&second));
    }

    #[test]
    fn different_message_ids_both_new() {
        let mut client = GtpClient::new();
        let first = encode_msg(1, 1);
        let second = encode_msg(1, 2);
        assert!(is_new(&accept_ok(&mut client, &first, 0)));
        assert!(is_new(&accept_ok(&mut client, &second, 0)));
    }

    #[test]
    fn different_senders_same_message_id_both_new() {
        let mut client = GtpClient::new();
        let from_one = encode_msg(1, 42);
        let from_two = encode_msg(2, 42);
        assert!(is_new(&accept_ok(&mut client, &from_one, 0)));
        assert!(is_new(&accept_ok(&mut client, &from_two, 0)));
    }

    #[test]
    fn epoch_advance_clears_seen_set() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 100);
        accept_ok(&mut client, &bytes, 0);
        // Same message, next epoch: the seen set was cleared in between.
        let after_advance = accept_ok(&mut client, &bytes, 1);
        assert!(is_new(&after_advance));
    }

    #[test]
    fn reset_clears_idempotency_state() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(7, 999);
        accept_ok(&mut client, &bytes, 5);
        client.reset();
        let after_reset = accept_ok(&mut client, &bytes, 5);
        assert!(is_new(&after_reset));
    }

    #[test]
    fn sync_epoch_same_value_keeps_state() {
        let mut client = GtpClient::new();
        let bytes = encode_msg(1, 1);
        accept_ok(&mut client, &bytes, 3);
        client.sync_epoch(3);
        let repeated = accept_ok(&mut client, &bytes, 3);
        assert!(is_duplicate(&repeated));
    }

    #[test]
    fn invalid_cbor_returns_decode_error() {
        let mut client = GtpClient::new();
        let outcome = client.accept(b"\xFF\xFF", 0);
        assert!(matches!(outcome, Err(GtpError::Decode(_))));
    }
}