memberlist_plumtree/message/
id.rs

1//! Message identifier type for Plumtree protocol.
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use std::{
5    fmt::{self, Debug, Display},
6    hash::{Hash, Hasher},
7    sync::atomic::{AtomicU64, Ordering},
8    time::{SystemTime, UNIX_EPOCH},
9};
10
/// Process-wide sequence number handed out to each newly generated message ID.
///
/// Starts at zero; being a `static`, one instance is shared by the whole process.
static COUNTER: AtomicU64 = AtomicU64::new(0);
13
/// Identifier attached to every Plumtree message.
///
/// An ID is made of three 64-bit components:
/// - `timestamp`: milliseconds since the UNIX epoch
/// - `counter`: a node-local counter value
/// - `random`: a random value
///
/// Together they keep IDs distinct across nodes and over time while staying
/// cheap to create and compare. The derived ordering compares `timestamp`
/// first, then `counter`, then `random`, so the field order below matters.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct MessageId {
    timestamp: u64,
    counter: u64,
    random: u64,
}
29
30impl MessageId {
31    /// Size of the message ID in bytes when encoded.
32    pub const ENCODED_SIZE: usize = 24;
33
34    /// Create a new unique message ID.
35    pub fn new() -> Self {
36        let timestamp = SystemTime::now()
37            .duration_since(UNIX_EPOCH)
38            .map(|d| d.as_millis() as u64)
39            .unwrap_or(0);
40
41        let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
42        let random = rand::random::<u64>();
43
44        Self {
45            timestamp,
46            counter,
47            random,
48        }
49    }
50
51    /// Create a message ID from raw components (for testing/deserialization).
52    pub const fn from_parts(timestamp: u64, counter: u64, random: u64) -> Self {
53        Self {
54            timestamp,
55            counter,
56            random,
57        }
58    }
59
60    /// Get the timestamp component.
61    #[inline]
62    pub const fn timestamp(&self) -> u64 {
63        self.timestamp
64    }
65
66    /// Get the counter component.
67    #[inline]
68    pub const fn counter(&self) -> u64 {
69        self.counter
70    }
71
72    /// Get the random component.
73    #[inline]
74    pub const fn random(&self) -> u64 {
75        self.random
76    }
77
78    /// Encode the message ID into bytes.
79    pub fn encode(&self, buf: &mut impl BufMut) {
80        buf.put_u64(self.timestamp);
81        buf.put_u64(self.counter);
82        buf.put_u64(self.random);
83    }
84
85    /// Encode the message ID into a new Bytes buffer.
86    pub fn encode_to_bytes(&self) -> Bytes {
87        let mut buf = BytesMut::with_capacity(Self::ENCODED_SIZE);
88        self.encode(&mut buf);
89        buf.freeze()
90    }
91
92    /// Decode a message ID from bytes.
93    ///
94    /// Returns `None` if the buffer is too small.
95    pub fn decode(buf: &mut impl Buf) -> Option<Self> {
96        if buf.remaining() < Self::ENCODED_SIZE {
97            return None;
98        }
99
100        Some(Self {
101            timestamp: buf.get_u64(),
102            counter: buf.get_u64(),
103            random: buf.get_u64(),
104        })
105    }
106
107    /// Decode a message ID from a byte slice.
108    pub fn decode_from_slice(data: &[u8]) -> Option<Self> {
109        if data.len() < Self::ENCODED_SIZE {
110            return None;
111        }
112
113        let mut cursor = std::io::Cursor::new(data);
114        Self::decode(&mut cursor)
115    }
116}
117
118impl Default for MessageId {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl Hash for MessageId {
125    fn hash<H: Hasher>(&self, state: &mut H) {
126        // Use all components for hashing
127        state.write_u64(self.timestamp);
128        state.write_u64(self.counter);
129        state.write_u64(self.random);
130    }
131}
132
133impl Debug for MessageId {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        write!(
136            f,
137            "MessageId({:016x}-{:016x}-{:016x})",
138            self.timestamp, self.counter, self.random
139        )
140    }
141}
142
143impl Display for MessageId {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        // Shortened display format
146        write!(
147            f,
148            "{:08x}{:04x}{:04x}",
149            (self.timestamp & 0xFFFFFFFF) as u32,
150            (self.counter & 0xFFFF) as u16,
151            (self.random & 0xFFFF) as u16
152        )
153    }
154}
155
#[cfg(feature = "serde")]
impl serde::Serialize for MessageId {
    /// Serialize the ID without dropping any of its components.
    ///
    /// - Human-readable formats receive a 48-character lowercase hex string:
    ///   the timestamp, counter and random component, each zero-padded to
    ///   16 digits and concatenated in that order.
    /// - Other formats receive the bytes produced by `encode_to_bytes`.
    ///
    /// The shortened `Display` output is intentionally NOT used here: it keeps
    /// only the low bits of each component, so two different IDs could
    /// serialize to the same string and the original value could not be
    /// recovered from it.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        if serializer.is_human_readable() {
            // Full-width hex of all three components, in field order.
            let full_hex = format!(
                "{:016x}{:016x}{:016x}",
                self.timestamp, self.counter, self.random
            );
            serializer.serialize_str(&full_hex)
        } else {
            serializer.serialize_bytes(&self.encode_to_bytes())
        }
    }
}
170
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// A thousand IDs generated back to back must all be distinct.
    #[test]
    fn test_message_id_uniqueness() {
        let mut seen = HashSet::new();

        for _ in 0..1000 {
            let fresh = MessageId::new();
            assert!(seen.insert(fresh), "Duplicate MessageId generated");
        }
    }

    /// Encoding yields `ENCODED_SIZE` bytes and decoding them returns the same ID.
    #[test]
    fn test_message_id_encoding() {
        let original = MessageId::new();
        let wire = original.encode_to_bytes();

        assert_eq!(wire.len(), MessageId::ENCODED_SIZE);

        let round_tripped = MessageId::decode_from_slice(&wire).unwrap();
        assert_eq!(original, round_tripped);
    }

    /// An ID with a smaller timestamp sorts before one with a larger timestamp.
    #[test]
    fn test_message_id_ordering() {
        let earlier = MessageId::from_parts(100, 0, 0);
        let later = MessageId::from_parts(200, 0, 0);

        assert!(earlier < later);
    }
}