Skip to main content

lapin/
basic_get_delivery.rs

1use crate::{BasicProperties, PromiseResolver, message::BasicGetMessage, types::PayloadSize};
2use std::{
3    fmt,
4    sync::{Arc, Mutex, MutexGuard},
5};
6
7#[derive(Clone, Default)]
8pub(crate) struct BasicGetDelivery(Arc<Mutex<Inner>>);
9
10impl BasicGetDelivery {
11    pub(crate) fn start_new_delivery(
12        &self,
13        message: BasicGetMessage,
14        resolver: PromiseResolver<Option<BasicGetMessage>>,
15    ) {
16        self.lock_inner().start_new_delivery(message, resolver);
17    }
18
19    pub(crate) fn handle_content_header_frame(
20        &self,
21        size: PayloadSize,
22        properties: BasicProperties,
23    ) {
24        self.lock_inner()
25            .handle_content_header_frame(size, properties);
26    }
27
28    pub(crate) fn handle_body_frame(&self, remaining_size: PayloadSize, payload: Vec<u8>) {
29        self.lock_inner().handle_body_frame(remaining_size, payload);
30    }
31
32    fn lock_inner(&self) -> MutexGuard<'_, Inner> {
33        self.0.lock().unwrap_or_else(|e| e.into_inner())
34    }
35}
36
37impl fmt::Debug for BasicGetDelivery {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_tuple("BasicGetDelivery").finish()
40    }
41}
42
43#[derive(Default)]
44struct Inner(Option<InnerData>);
45
46impl Inner {
47    fn start_new_delivery(
48        &mut self,
49        message: BasicGetMessage,
50        resolver: PromiseResolver<Option<BasicGetMessage>>,
51    ) {
52        self.0 = Some(InnerData { message, resolver });
53    }
54
55    fn handle_content_header_frame(&mut self, size: PayloadSize, properties: BasicProperties) {
56        if let Some(inner) = self.0.as_mut() {
57            inner.message.properties = properties;
58        }
59        if size == 0 {
60            self.new_delivery_complete();
61        }
62    }
63
64    fn handle_body_frame(&mut self, remaining_size: PayloadSize, payload: Vec<u8>) {
65        if let Some(inner) = self.0.as_mut() {
66            inner.message.receive_content(payload);
67        }
68        if remaining_size == 0 {
69            self.new_delivery_complete();
70        }
71    }
72
73    fn new_delivery_complete(&mut self) {
74        if let Some(inner) = self.0.take() {
75            inner.resolver.resolve(Some(inner.message));
76        }
77    }
78}
79
80struct InnerData {
81    message: BasicGetMessage,
82    resolver: PromiseResolver<Option<BasicGetMessage>>,
83}