lightning_liquidity/
message_queue.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Holds types and traits used to implement message queues for [`LSPSMessage`]s.
11
12use alloc::collections::VecDeque;
13use alloc::vec::Vec;
14
15use crate::lsps0::ser::LSPSMessage;
16use crate::sync::{Arc, Mutex};
17
18use lightning::util::wakers::Notifier;
19
20use bitcoin::secp256k1::PublicKey;
21
22/// The default [`MessageQueue`] Implementation used by [`LiquidityManager`].
23///
24/// [`LiquidityManager`]: crate::LiquidityManager
25pub struct MessageQueue {
26	queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
27	pending_msgs_notifier: Arc<Notifier>,
28}
29
30impl MessageQueue {
31	pub(crate) fn new(pending_msgs_notifier: Arc<Notifier>) -> Self {
32		let queue = Mutex::new(VecDeque::new());
33		Self { queue, pending_msgs_notifier }
34	}
35
36	pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
37		self.queue.lock().unwrap().drain(..).collect()
38	}
39
40	pub(crate) fn notifier(&self) -> MessageQueueNotifierGuard<'_> {
41		MessageQueueNotifierGuard { msg_queue: self, buffer: VecDeque::new() }
42	}
43}
44
45// A guard type that will process buffered messages and wake the background processor when dropped.
46#[must_use]
47pub(crate) struct MessageQueueNotifierGuard<'a> {
48	msg_queue: &'a MessageQueue,
49	buffer: VecDeque<(PublicKey, LSPSMessage)>,
50}
51
52impl<'a> MessageQueueNotifierGuard<'a> {
53	pub fn enqueue(&mut self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
54		self.buffer.push_back((*counterparty_node_id, msg));
55	}
56}
57
58impl<'a> Drop for MessageQueueNotifierGuard<'a> {
59	fn drop(&mut self) {
60		if !self.buffer.is_empty() {
61			self.msg_queue.queue.lock().unwrap().append(&mut self.buffer);
62			self.msg_queue.pending_msgs_notifier.notify();
63		}
64	}
65}