bitcoincore_zmqsequence/
lib.rs1use crate::errors::ZMQSeqListenerError;
2use std::str;
3use std::sync::atomic::AtomicBool;
4use std::sync::atomic::Ordering;
5use std::sync::mpsc::channel;
6use std::sync::mpsc::Receiver;
7use std::sync::{Arc, Barrier};
8use std::thread;
9use std::thread::JoinHandle;
10
11use url::Url;
12
13pub mod errors;
14
15#[cfg(feature = "check_node")]
16pub mod check;
17
18type Msg = Vec<Vec<u8>>;
19
/// One event produced by the listener: either a decoded message from the
/// `sequence` topic, or a control/error event generated by the listener itself.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum MempoolSequence {
    /// Decoded from a body whose label byte (index 32) is `67` (ASCII `C`).
    /// `block_hash` is the hex encoding of the first 32 body bytes; `zmq_seq`
    /// is the little-endian `u32` read from the third message frame.
    BlockConnection { block_hash: String, zmq_seq: u32 },
    /// Decoded from a body whose label byte (index 32) is `68` (ASCII `D`).
    /// Fields have the same meaning as in `BlockConnection`.
    BlockDisconnection { block_hash: String, zmq_seq: u32 },
    /// Decoded from a body whose label byte (index 32) is `82` (ASCII `R`).
    TxRemoved {
        /// Hex encoding of the first 32 body bytes.
        txid: String,
        /// Little-endian `u64` read from body bytes 33..41.
        mp_seq_num: u64,
        /// Little-endian `u32` read from the third message frame.
        zmq_seq: u32,
    },
    /// Decoded from a body whose label byte (index 32) is `65` (ASCII `A`).
    TxAdded {
        /// Hex encoding of the first 32 body bytes.
        txid: String,
        /// Little-endian `u64` read from body bytes 33..41.
        mp_seq_num: u64,
        /// Little-endian `u32` read from the third message frame.
        zmq_seq: u32,
    },
    /// Emitted when receiving or decoding a message failed, or when the
    /// listener saw a `zmq_seq` that did not directly follow the previous one.
    SeqError { error: ZMQSeqListenerError },
    /// Emitted once by the listener, right before the first received event.
    /// `bitcoind_already_working` is `true` when that first event carried a
    /// non-zero `zmq_seq`.
    SeqStart { bitcoind_already_working: bool },
}
45
46impl TryFrom<Msg> for MempoolSequence {
47 type Error = ZMQSeqListenerError;
48
49 fn try_from(msg: Vec<Vec<u8>>) -> Result<Self, Self::Error> {
50 if msg.len() < 3 {
52 return Err(ZMQSeqListenerError::MsgError("msg.len() < 3".to_string()));
53 };
54 let topic = str::from_utf8(msg.get(0).unwrap())?;
55 let body = msg.get(1).unwrap();
56 let zmqseq = msg.get(2).unwrap();
57
58 if topic.ne("sequence") {
59 return Err(ZMQSeqListenerError::TopicError(topic.to_string()));
60 }
61 let zmq_seq = parse_zmq_seq_num(zmqseq)?;
62 let char = *body.get(32).ok_or(ZMQSeqListenerError::MsgError(
63 "Not enough size, expected [u8;33]".to_string(),
64 ))?;
65 match char {
66 67 => Ok(MempoolSequence::BlockConnection {
68 block_hash: hex::encode(&body[..32]),
69 zmq_seq,
70 }),
71 68 => Ok(MempoolSequence::BlockDisconnection {
73 block_hash: hex::encode(&body[..32]),
74 zmq_seq,
75 }),
76 82 => Ok(MempoolSequence::TxRemoved {
78 txid: hex::encode(&body[..32]),
79 mp_seq_num: parse_mp_seq_num(body)?,
80 zmq_seq,
81 }),
82 65 => Ok(MempoolSequence::TxAdded {
84 txid: hex::encode(&body[..32]),
85 mp_seq_num: parse_mp_seq_num(body)?,
86 zmq_seq,
87 }),
88 ch => Err(ZMQSeqListenerError::CharCodeError(ch.to_string())),
89 }
90 }
91}
92
93fn parse_mp_seq_num(value: &[u8]) -> Result<u64, ZMQSeqListenerError> {
94 let ch: [u8; 8] = match value[33..41].try_into() {
95 Ok(val) => Ok(val),
96 Err(_) => Err(ZMQSeqListenerError::MsgError(
97 "Not enough size, expected [u8;41]".to_string(),
98 )),
99 }?;
100 Ok(u64::from_le_bytes(ch))
101}
102
103fn parse_zmq_seq_num(value: &[u8]) -> Result<u32, ZMQSeqListenerError> {
104 let ch: [u8; 4] = match value[..4].try_into() {
105 Ok(val) => Ok(val),
106 Err(_) => Err(ZMQSeqListenerError::MsgError(
107 "Not enough size, expected [u8;4]".to_string(),
108 )),
109 }?;
110 Ok(u32::from_le_bytes(ch))
111}
112
/// Handle to a running listener created by `ZmqSeqListener::start`.
pub struct ZmqSeqListener {
    /// Receiving end of the channel on which the listener thread sends every
    /// `MempoolSequence` event.
    pub rx: Receiver<MempoolSequence>,
    /// Stop flag shared with the listener thread. The thread reads it at the
    /// top of each loop iteration and leaves the loop once it is `true`.
    pub stop: Arc<AtomicBool>,
    /// Join handle of the spawned listener thread.
    pub thread: JoinHandle<()>,
}
148
149impl ZmqSeqListener {
150 pub fn start(zmq_address: &Url) -> Result<Self, ZMQSeqListenerError> {
152 let context = zmq::Context::new();
153 let subscriber = context.socket(zmq::SUB)?;
154 subscriber.connect(zmq_address.as_str())?;
155 subscriber.set_subscribe(b"sequence")?;
156 let stop_th = Arc::new(AtomicBool::new(false));
157 let stop = stop_th.clone();
158 let (tx, rx) = channel();
159 let barrier = Arc::new(Barrier::new(2));
161 let barrierc = barrier.clone();
162 let thread = thread::spawn(move || {
163 let mut is_starting = true;
164 let mut last_zmq_seq = 0;
165 while !stop_th.load(Ordering::SeqCst) {
166 let mpsq = match receive_mpsq(&subscriber) {
167 Ok(mpsq) => mpsq,
168 Err(e) => MempoolSequence::SeqError { error: e },
169 };
170 if is_starting {
171 barrier.wait();
172 tx.send(MempoolSequence::SeqStart {
173 bitcoind_already_working: check_bitcoind_already_working(&mpsq),
174 })
175 .unwrap();
176 is_starting = false;
177 } else {
178 let zmq_seq = zmq_seq_from(&mpsq);
179 if zmq_seq.is_some() {
180 if zmq_seq.unwrap() != last_zmq_seq + 1 {
181 tx.send(MempoolSequence::SeqError {
182 error: ZMQSeqListenerError::InvalidSeqNumber(
183 last_zmq_seq + 1,
184 zmq_seq.unwrap(),
185 ),
186 })
187 .unwrap();
188 }
189 }
190 }
191 last_zmq_seq = zmq_seq_from(&mpsq).unwrap_or(last_zmq_seq);
192 tx.send(mpsq).unwrap();
193 }
194 });
195 barrierc.wait();
196 Ok(ZmqSeqListener { rx, stop, thread })
197 }
198}
199
200fn check_bitcoind_already_working(mpsq: &MempoolSequence) -> bool {
201 matches!(
202 mpsq,
203 MempoolSequence::BlockConnection { zmq_seq, .. }
204 | MempoolSequence::BlockDisconnection { zmq_seq, .. }
205 | MempoolSequence::TxRemoved { zmq_seq, .. }
206 | MempoolSequence::TxAdded { zmq_seq, .. }
207 if *zmq_seq != 0
208 )
209}
210
211fn zmq_seq_from(mpsq: &MempoolSequence) -> Option<u32> {
212 match mpsq {
213 MempoolSequence::BlockConnection { zmq_seq, .. }
214 | MempoolSequence::BlockDisconnection { zmq_seq, .. }
215 | MempoolSequence::TxRemoved { zmq_seq, .. }
216 | MempoolSequence::TxAdded { zmq_seq, .. } => Some(*zmq_seq),
217 _ => None,
218 }
219}
220
221fn receive_mpsq(subscriber: &zmq::Socket) -> Result<MempoolSequence, ZMQSeqListenerError> {
222 let res = {
223 let msg = subscriber.recv_multipart(0)?;
224 let mpsq = MempoolSequence::try_from(msg)?;
225 Ok(mpsq)
226 };
227 res
228}