lightning_liquidity/
message_queue.rs

1//! Holds types and traits used to implement message queues for [`LSPSMessage`]s.
2
3use crate::lsps0::ser::LSPSMessage;
4use crate::prelude::{Box, Vec, VecDeque};
5use crate::sync::{Mutex, RwLock};
6
7use bitcoin::secp256k1::PublicKey;
8
9/// The default [`MessageQueue`] Implementation used by [`LiquidityManager`].
10///
11/// [`LiquidityManager`]: crate::LiquidityManager
12pub struct MessageQueue {
13	queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
14	process_msgs_callback: RwLock<Option<Box<dyn ProcessMessagesCallback>>>,
15}
16
17impl MessageQueue {
18	pub(crate) fn new() -> Self {
19		let queue = Mutex::new(VecDeque::new());
20		let process_msgs_callback = RwLock::new(None);
21		Self { queue, process_msgs_callback }
22	}
23
24	pub(crate) fn set_process_msgs_callback(&self, callback: Box<dyn ProcessMessagesCallback>) {
25		*self.process_msgs_callback.write().unwrap() = Some(callback);
26	}
27
28	pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
29		self.queue.lock().unwrap().drain(..).collect()
30	}
31
32	pub(crate) fn enqueue(&self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
33		{
34			let mut queue = self.queue.lock().unwrap();
35			queue.push_back((*counterparty_node_id, msg));
36		}
37
38		if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() {
39			process_msgs_callback.call()
40		}
41	}
42}
43
44macro_rules! define_callback { ($($bounds: path),*) => {
45/// A callback which will be called to trigger network message processing.
46///
47/// Usually, this should call [`PeerManager::process_events`].
48///
49/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
50pub trait ProcessMessagesCallback : $($bounds +)* {
51	/// The method which is called.
52	fn call(&self);
53}
54
55impl<F: Fn() $(+ $bounds)*> ProcessMessagesCallback for F {
56	fn call(&self) { (self)(); }
57}
58} }
59
60#[cfg(feature = "std")]
61define_callback!(Send, Sync);
62#[cfg(not(feature = "std"))]
63define_callback!();