lightning_liquidity/
message_queue.rs1use crate::lsps0::ser::LSPSMessage;
4use crate::prelude::{Box, Vec, VecDeque};
5use crate::sync::{Mutex, RwLock};
6
7use bitcoin::secp256k1::PublicKey;
8
9pub struct MessageQueue {
13 queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
14 process_msgs_callback: RwLock<Option<Box<dyn ProcessMessagesCallback>>>,
15}
16
17impl MessageQueue {
18 pub(crate) fn new() -> Self {
19 let queue = Mutex::new(VecDeque::new());
20 let process_msgs_callback = RwLock::new(None);
21 Self { queue, process_msgs_callback }
22 }
23
24 pub(crate) fn set_process_msgs_callback(&self, callback: Box<dyn ProcessMessagesCallback>) {
25 *self.process_msgs_callback.write().unwrap() = Some(callback);
26 }
27
28 pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
29 self.queue.lock().unwrap().drain(..).collect()
30 }
31
32 pub(crate) fn enqueue(&self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
33 {
34 let mut queue = self.queue.lock().unwrap();
35 queue.push_back((*counterparty_node_id, msg));
36 }
37
38 if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() {
39 process_msgs_callback.call()
40 }
41 }
42}
43
// Generates the `ProcessMessagesCallback` trait together with a blanket implementation for
// closures. The comma-separated paths passed to the macro become supertraits of the trait and
// additional bounds on the blanket implementation.
macro_rules! define_callback {
    ($($bounds: path),*) => {
        /// A callback that is invoked to signal that messages should be processed.
        ///
        /// Implemented automatically for every `Fn()` that also satisfies the bounds the
        /// trait was generated with.
        pub trait ProcessMessagesCallback : $($bounds +)* {
            /// Invokes the callback.
            fn call(&self);
        }

        impl<F> ProcessMessagesCallback for F
        where
            F: Fn() $(+ $bounds)*,
        {
            fn call(&self) {
                (self)();
            }
        }
    };
}
59
60#[cfg(feature = "std")]
61define_callback!(Send, Sync);
62#[cfg(not(feature = "std"))]
63define_callback!();