lightning_liquidity/
message_queue.rs1use alloc::collections::VecDeque;
13use alloc::vec::Vec;
14
15use crate::lsps0::ser::LSPSMessage;
16use crate::sync::{Arc, Mutex};
17
18use lightning::util::wakers::Notifier;
19
20use bitcoin::secp256k1::PublicKey;
21
22pub struct MessageQueue {
26 queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
27 pending_msgs_notifier: Arc<Notifier>,
28}
29
30impl MessageQueue {
31 pub(crate) fn new(pending_msgs_notifier: Arc<Notifier>) -> Self {
32 let queue = Mutex::new(VecDeque::new());
33 Self { queue, pending_msgs_notifier }
34 }
35
36 pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
37 self.queue.lock().unwrap().drain(..).collect()
38 }
39
40 pub(crate) fn notifier(&self) -> MessageQueueNotifierGuard<'_> {
41 MessageQueueNotifierGuard { msg_queue: self, buffer: VecDeque::new() }
42 }
43}
44
45#[must_use]
47pub(crate) struct MessageQueueNotifierGuard<'a> {
48 msg_queue: &'a MessageQueue,
49 buffer: VecDeque<(PublicKey, LSPSMessage)>,
50}
51
52impl<'a> MessageQueueNotifierGuard<'a> {
53 pub fn enqueue(&mut self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
54 self.buffer.push_back((*counterparty_node_id, msg));
55 }
56}
57
58impl<'a> Drop for MessageQueueNotifierGuard<'a> {
59 fn drop(&mut self) {
60 if !self.buffer.is_empty() {
61 self.msg_queue.queue.lock().unwrap().append(&mut self.buffer);
62 self.msg_queue.pending_msgs_notifier.notify();
63 }
64 }
65}