bitcoincore_zmq/
message.rs

1use crate::{
2    error::{Error, Result},
3    sequence_message::SequenceMessage,
4};
5use bitcoin::{
6    consensus::{deserialize, serialize},
7    hashes::Hash,
8    Block, BlockHash, Transaction, Txid, Weight,
9};
10use core::{cmp::min, fmt};
11
12pub const TOPIC_MAX_LEN: usize = 9;
13pub const DATA_MAX_LEN: usize = Weight::MAX_BLOCK.to_wu() as usize;
14pub const SEQUENCE_LEN: usize = 4;
15
16#[derive(Debug, PartialEq, Eq, Clone)]
17pub enum Message {
18    HashBlock(BlockHash, u32),
19    HashTx(Txid, u32),
20    Block(Block, u32),
21    Tx(Transaction, u32),
22    Sequence(SequenceMessage, u32),
23}
24
25impl Message {
26    /// Returns the topic of this [`Message`] as a byte slice.
27    #[inline]
28    pub fn topic(&self) -> &'static [u8] {
29        self.topic_str().as_bytes()
30    }
31
32    /// Returns the topic of this [`Message`] as a string slice.
33    #[inline]
34    pub fn topic_str(&self) -> &'static str {
35        let topic = match self {
36            Self::HashBlock(..) => "hashblock",
37            Self::HashTx(..) => "hashtx",
38            Self::Block(..) => "rawblock",
39            Self::Tx(..) => "rawtx",
40            Self::Sequence(..) => "sequence",
41        };
42
43        debug_assert!(topic.len() <= TOPIC_MAX_LEN);
44
45        topic
46    }
47
48    /// Serializes the middle part of this [`Message`] (no topic and sequence).
49    #[inline]
50    pub fn serialize_data_to_vec(&self) -> Vec<u8> {
51        match self {
52            Self::HashBlock(blockhash, _) => {
53                let mut arr = blockhash.to_byte_array();
54                arr.reverse();
55                arr.to_vec()
56            }
57            Self::HashTx(txid, _) => {
58                let mut arr = txid.to_byte_array();
59                arr.reverse();
60                arr.to_vec()
61            }
62            Self::Block(block, _) => serialize(&block),
63            Self::Tx(tx, _) => serialize(&tx),
64            Self::Sequence(sm, _) => sm.serialize_to_vec(),
65        }
66    }
67
68    /// Serializes this [`Message`] to 3 [`Vec<u8>`]s.
69    #[inline]
70    pub fn serialize_to_vecs(&self) -> [Vec<u8>; 3] {
71        [
72            self.topic().to_vec(),
73            self.serialize_data_to_vec(),
74            self.sequence().to_le_bytes().to_vec(),
75        ]
76    }
77
78    /// Returns the sequence of this [`Message`], a number that starts at 0 and goes up every time
79    /// Bitcoin Core sends a ZMQ message per publisher.
80    #[inline]
81    pub const fn sequence(&self) -> u32 {
82        match self {
83            Self::HashBlock(_, sequence)
84            | Self::HashTx(_, sequence)
85            | Self::Block(_, sequence)
86            | Self::Tx(_, sequence)
87            | Self::Sequence(_, sequence) => *sequence,
88        }
89    }
90
91    /// Attempts to deserialize a multipart (multiple byte slices) to a [`Message`].
92    #[inline]
93    pub fn from_multipart<T: AsRef<[u8]>>(mp: &[T]) -> Result<Self> {
94        Self::from_fixed_size_multipart(
95            mp.try_into()
96                .map_err(|_| Error::InvalidMutlipartLength(mp.len()))?,
97        )
98    }
99
100    #[inline]
101    pub fn from_fixed_size_multipart<T: AsRef<[u8]>>(mp: &[T; 3]) -> Result<Self> {
102        let [topic, data, sequence] = mp;
103
104        let topic = topic.as_ref();
105        let data = data.as_ref();
106        let sequence = sequence.as_ref();
107
108        let sequence = sequence
109            .try_into()
110            .map_err(|_| Error::InvalidSequenceLength(sequence.len()))?;
111
112        Self::from_parts(topic, data, sequence)
113    }
114
115    #[inline]
116    pub fn from_parts(topic: &[u8], data: &[u8], sequence: [u8; 4]) -> Result<Self> {
117        let sequence = u32::from_le_bytes(sequence);
118
119        Ok(match topic {
120            b"hashblock" | b"hashtx" => {
121                let mut data: [u8; 32] = data
122                    .try_into()
123                    .map_err(|_| Error::Invalid256BitHashLength(data.len()))?;
124                data.reverse();
125
126                match topic {
127                    b"hashblock" => Self::HashBlock(BlockHash::from_byte_array(data), sequence),
128                    _ /* b"hashtx" */ => Self::HashTx(Txid::from_byte_array(data), sequence),
129                }
130            }
131            b"rawblock" => Self::Block(deserialize(data)?, sequence),
132            b"rawtx" => Self::Tx(deserialize(data)?, sequence),
133            b"sequence" => Self::Sequence(SequenceMessage::from_byte_slice(data)?, sequence),
134            _ => {
135                let mut buf = [0; TOPIC_MAX_LEN];
136
137                buf[..min(TOPIC_MAX_LEN, topic.len())]
138                    .copy_from_slice(&topic[..min(TOPIC_MAX_LEN, topic.len())]);
139
140                return Err(Error::InvalidTopic(topic.len(), buf));
141            }
142        })
143    }
144}
145
146impl<T: AsRef<[u8]>> TryFrom<&[T]> for Message {
147    type Error = Error;
148
149    #[inline]
150    fn try_from(value: &[T]) -> Result<Self> {
151        Self::from_multipart(value)
152    }
153}
154
155impl<T: AsRef<[u8]>> TryFrom<[T; 3]> for Message {
156    type Error = Error;
157
158    #[inline]
159    fn try_from(value: [T; 3]) -> Result<Self> {
160        Self::from_fixed_size_multipart(&value)
161    }
162}
163
164impl From<Message> for [Vec<u8>; 3] {
165    #[inline]
166    fn from(msg: Message) -> Self {
167        msg.serialize_to_vecs()
168    }
169}
170
171impl From<Message> for Vec<Vec<u8>> {
172    #[inline]
173    fn from(msg: Message) -> Self {
174        msg.serialize_to_vecs().to_vec()
175    }
176}
177
178impl fmt::Display for Message {
179    #[inline]
180    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181        match self {
182            Self::HashBlock(blockhash, sequence) => {
183                write!(f, "HashBlock({blockhash}, sequence={sequence})")
184            }
185            Self::HashTx(txid, sequence) => write!(f, "HashTx({txid}, sequence={sequence})"),
186            Self::Block(block, sequence) => {
187                write!(f, "Block({}, sequence={sequence})", block.block_hash())
188            }
189            Self::Tx(tx, sequence) => write!(f, "Tx({}, sequence={sequence})", tx.compute_txid()),
190            Self::Sequence(sm, sequence) => write!(f, "Sequence({sm}, sequence={sequence})"),
191        }
192    }
193}
194
#[cfg(test)]
mod tests {
    use crate::{Error, Message};
    use bitcoin::{consensus::serialize, constants::genesis_block, hashes::Hash, Network};

    /// Asserts that every prefix of `parts` whose length is not 3 is rejected with
    /// `Error::InvalidMutlipartLength` carrying that prefix length.
    fn assert_rejects_wrong_part_counts(parts: &[&[u8]; 4]) {
        for len in [0, 1, 2, 4] {
            assert!(
                matches!(
                    Message::from_multipart(&parts[..len]),
                    Err(Error::InvalidMutlipartLength(actual)) if actual == len
                ),
                "expected InvalidMutlipartLength({len})"
            );
        }
    }

    #[test]
    fn test_deserialize_rawtx() {
        let genesis_block = genesis_block(Network::Bitcoin);

        let tx = &genesis_block.txdata[0];
        let tx_bytes = serialize(tx);

        let to_deserialize = [
            b"rawtx" as &[u8],
            &tx_bytes,
            &[0x03, 0x00, 0x00, 0x00],
            b"garbage",
        ];

        let msg = Message::from_multipart(&to_deserialize[..3]).unwrap();

        assert_eq!(msg, Message::Tx(tx.clone(), 3));

        assert_eq!(msg.topic_str(), "rawtx");
        assert_eq!(msg.serialize_data_to_vec(), tx_bytes);
        assert_eq!(msg.sequence(), 3);

        assert_eq!(msg.serialize_to_vecs(), to_deserialize[0..3]);

        assert_rejects_wrong_part_counts(&to_deserialize);
    }

    #[test]
    fn test_deserialize_hashtx() {
        let genesis_block = genesis_block(Network::Bitcoin);

        let txid = genesis_block.txdata[0].compute_txid();
        let mut txid_bytes = txid.to_byte_array();
        txid_bytes.reverse();

        let to_deserialize = [b"hashtx" as &[u8], &txid_bytes, &[0x04, 0x00, 0x00, 0x00]];

        let msg = Message::from_multipart(&to_deserialize).unwrap();

        assert_eq!(msg, Message::HashTx(txid, 4));

        assert_eq!(msg.topic_str(), "hashtx");
        assert_eq!(msg.serialize_data_to_vec(), txid_bytes);
        assert_eq!(msg.sequence(), 4);

        assert_eq!(msg.serialize_to_vecs(), to_deserialize);
    }

    #[test]
    fn test_deserialize_hashblock() {
        let blockhash = genesis_block(Network::Bitcoin).block_hash();
        let mut blockhash_bytes = blockhash.to_byte_array();
        blockhash_bytes.reverse();

        let to_deserialize = [
            b"hashblock" as &[u8],
            &blockhash_bytes,
            &[0x0d, 0x00, 0x00, 0x00],
        ];

        let msg = Message::from_multipart(&to_deserialize).unwrap();

        assert_eq!(msg, Message::HashBlock(blockhash, 13));

        assert_eq!(msg.topic_str(), "hashblock");
        assert_eq!(msg.serialize_data_to_vec(), blockhash_bytes);
        assert_eq!(msg.sequence(), 13);

        assert_eq!(msg.serialize_to_vecs(), to_deserialize);
    }

    #[test]
    fn test_deserialize_rawblock() {
        let block = genesis_block(Network::Bitcoin);
        let block_bytes = serialize(&block);

        let to_deserialize = [
            b"rawblock" as &[u8],
            &block_bytes,
            &[0x0e, 0x00, 0x00, 0x00],
        ];

        let msg = Message::from_multipart(&to_deserialize).unwrap();

        assert_eq!(msg.topic_str(), "rawblock");
        assert_eq!(msg.serialize_data_to_vec(), block_bytes);
        assert_eq!(msg.sequence(), 14);

        assert_eq!(msg.serialize_to_vecs(), to_deserialize);

        assert_eq!(msg, Message::Block(block, 14));
    }

    #[test]
    fn test_deserialization_error_mp_len() {
        let to_deserialize = [
            b"sequence" as &[u8],
            &[],
            &[0x05, 0x00, 0x00, 0x00],
            b"garbage",
        ];

        assert_rejects_wrong_part_counts(&to_deserialize);
    }

    #[test]
    fn test_deserialization_error_topic() {
        assert_eq!(
            Message::from_multipart(&[b"" as &[u8], &[], &[0x06, 0x00, 0x00, 0x00]])
                .expect_err("expected invalid topic")
                .invalid_topic_data(),
            Some((b"" as &[u8], 0))
        );

        assert_eq!(
            Message::from_multipart(&[b"abc" as &[u8], &[], &[0x07, 0x00, 0x00, 0x00]])
                .expect_err("expected invalid topic")
                .invalid_topic_data(),
            Some((b"abc" as &[u8], 3))
        );

        // One byte longer than the longest valid topic: only the first 9 bytes are kept.
        assert_eq!(
            Message::from_multipart(&[b"hashblock!" as &[u8], &[], &[0x08, 0x00, 0x00, 0x00]])
                .expect_err("expected invalid topic")
                .invalid_topic_data(),
            Some((b"hashblock" as &[u8], 10))
        );

        assert_eq!(
            Message::from_multipart(&[
                b"too long so gets truncated" as &[u8],
                &[],
                &[0x09, 0x00, 0x00, 0x00]
            ])
            .expect_err("expected invalid topic")
            .invalid_topic_data(),
            Some((b"too long " as &[u8], 26))
        );
    }

    #[test]
    fn test_deserialization_error_element_len() {
        // The sequence length is validated before the topic is looked at.
        assert!(matches!(
            Message::from_multipart(&[b"something" as &[u8], &[], b"not 4 bytes"]),
            Err(Error::InvalidSequenceLength(11))
        ));

        assert!(matches!(
            Message::from_multipart(&[b"hashtx" as &[u8], &[], &[0x0a, 0x00, 0x00, 0x00]]),
            Err(Error::Invalid256BitHashLength(0))
        ));

        assert!(matches!(
            Message::from_multipart(&[b"hashblock" as &[u8], &[0; 20], &[0x0b, 0x00, 0x00, 0x00]]),
            Err(Error::Invalid256BitHashLength(20))
        ));

        assert!(matches!(
            Message::from_multipart(&[b"sequence" as &[u8], &[0; 32], &[0x0c, 0x00, 0x00, 0x00]]),
            Err(Error::InvalidSequenceMessageLength(32))
        ));
    }
}