lapin/
returned_messages.rs

1use crate::{
2    BasicProperties, Promise, message::BasicReturnMessage, publisher_confirm::Confirmation,
3    types::PayloadSize,
4};
5use std::{
6    collections::VecDeque,
7    fmt,
8    sync::{Arc, Mutex, MutexGuard},
9};
10use tracing::{trace, warn};
11
12#[derive(Clone, Default)]
13pub(crate) struct ReturnedMessages(Arc<Mutex<Inner>>);
14
15impl ReturnedMessages {
16    pub(crate) fn start_new_delivery(&self, message: BasicReturnMessage) {
17        self.lock_inner().current_message = Some(message);
18    }
19
20    pub(crate) fn handle_content_header_frame(
21        &self,
22        size: PayloadSize,
23        properties: BasicProperties,
24        confirm_mode: bool,
25    ) {
26        self.lock_inner()
27            .handle_content_header_frame(size, properties, confirm_mode);
28    }
29
30    pub(crate) fn handle_body_frame(
31        &self,
32        remaining_size: PayloadSize,
33        payload: Vec<u8>,
34        confirm_mode: bool,
35    ) {
36        self.lock_inner()
37            .handle_body_frame(remaining_size, payload, confirm_mode);
38    }
39
40    pub(crate) fn drain(&self) -> Vec<BasicReturnMessage> {
41        self.lock_inner().drain()
42    }
43
44    pub(crate) fn register_dropped_confirm(&self, promise: Promise<Confirmation>) {
45        self.lock_inner().register_dropped_confirm(promise);
46    }
47
48    pub(crate) fn get_waiting_message(&self) -> Option<BasicReturnMessage> {
49        self.lock_inner().waiting_messages.pop_front()
50    }
51
52    fn lock_inner(&self) -> MutexGuard<'_, Inner> {
53        self.0.lock().unwrap_or_else(|e| e.into_inner())
54    }
55}
56
57impl fmt::Debug for ReturnedMessages {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        let mut debug = f.debug_struct("ReturnedMessages");
60        if let Ok(inner) = self.0.try_lock() {
61            debug
62                .field("waiting_messages", &inner.waiting_messages)
63                .field("messages", &inner.messages)
64                .field("non_confirm_messages", &inner.non_confirm_messages);
65        }
66        debug.finish()
67    }
68}
69
70#[derive(Default)]
71pub struct Inner {
72    current_message: Option<BasicReturnMessage>,
73    non_confirm_messages: Vec<BasicReturnMessage>,
74    waiting_messages: VecDeque<BasicReturnMessage>,
75    messages: Vec<BasicReturnMessage>,
76    dropped_confirms: Vec<Promise<Confirmation>>,
77}
78
79impl Inner {
80    fn handle_content_header_frame(
81        &mut self,
82        size: PayloadSize,
83        properties: BasicProperties,
84        confirm_mode: bool,
85    ) {
86        if let Some(message) = self.current_message.as_mut() {
87            message.properties = properties;
88        }
89        if size == 0 {
90            self.new_delivery_complete(confirm_mode);
91        }
92    }
93
94    fn handle_body_frame(
95        &mut self,
96        remaining_size: PayloadSize,
97        payload: Vec<u8>,
98        confirm_mode: bool,
99    ) {
100        if let Some(message) = self.current_message.as_mut() {
101            message.receive_content(payload);
102        }
103        if remaining_size == 0 {
104            self.new_delivery_complete(confirm_mode);
105        }
106    }
107
108    fn new_delivery_complete(&mut self, confirm_mode: bool) {
109        if let Some(message) = self.current_message.take() {
110            warn!(?message, "Server returned us a message");
111            if confirm_mode {
112                self.waiting_messages.push_back(message);
113            } else {
114                self.non_confirm_messages.push(message);
115            }
116        }
117    }
118
119    fn process_dropped_confirm(
120        &mut self,
121        promise: Promise<Confirmation>,
122        messages: Option<&mut Vec<BasicReturnMessage>>,
123    ) {
124        let messages = messages.unwrap_or(&mut self.messages);
125
126        if let Some(confirmation) = promise.try_wait() {
127            if let Ok(Confirmation::Nack(Some(message))) | Ok(Confirmation::Ack(Some(message))) =
128                confirmation
129            {
130                trace!("PublisherConfirm was carrying a message, storing it");
131                messages.push(*message);
132            } else {
133                trace!("PublisherConfirm was ready but didn't carry a message, discarding");
134            }
135        } else {
136            trace!("PublisherConfirm wasn't ready yet, storing it for further use");
137            self.dropped_confirms.push(promise);
138        }
139    }
140
141    fn register_dropped_confirm(&mut self, promise: Promise<Confirmation>) {
142        trace!("Registering new dropped PublisherConfirm");
143        self.process_dropped_confirm(promise, None)
144    }
145
146    fn drain(&mut self) -> Vec<BasicReturnMessage> {
147        let mut messages = std::mem::take(&mut self.messages);
148        if !self.non_confirm_messages.is_empty() {
149            let mut non_confirm_messages = std::mem::take(&mut self.non_confirm_messages);
150            non_confirm_messages.append(&mut messages);
151            messages = non_confirm_messages;
152        }
153        let before = self.dropped_confirms.len();
154        if before != 0 {
155            for promise in std::mem::take(&mut self.dropped_confirms) {
156                self.process_dropped_confirm(promise, Some(&mut messages))
157            }
158            trace!(
159                %before,
160                after=%self.dropped_confirms.len(),
161                "PublisherConfirms processed"
162            );
163        }
164        messages
165    }
166}