lapin/
basic_get_delivery.rs1use crate::{BasicProperties, PromiseResolver, message::BasicGetMessage, types::PayloadSize};
2use std::{
3 fmt,
4 sync::{Arc, Mutex, MutexGuard},
5};
6
7#[derive(Clone, Default)]
8pub(crate) struct BasicGetDelivery(Arc<Mutex<Inner>>);
9
10impl BasicGetDelivery {
11 pub(crate) fn start_new_delivery(
12 &self,
13 message: BasicGetMessage,
14 resolver: PromiseResolver<Option<BasicGetMessage>>,
15 ) {
16 self.lock_inner().start_new_delivery(message, resolver);
17 }
18
19 pub(crate) fn handle_content_header_frame(
20 &self,
21 size: PayloadSize,
22 properties: BasicProperties,
23 ) {
24 self.lock_inner()
25 .handle_content_header_frame(size, properties);
26 }
27
28 pub(crate) fn handle_body_frame(&self, remaining_size: PayloadSize, payload: Vec<u8>) {
29 self.lock_inner().handle_body_frame(remaining_size, payload);
30 }
31
32 fn lock_inner(&self) -> MutexGuard<'_, Inner> {
33 self.0.lock().unwrap_or_else(|e| e.into_inner())
34 }
35}
36
37impl fmt::Debug for BasicGetDelivery {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_tuple("BasicGetDelivery").finish()
40 }
41}
42
43#[derive(Default)]
44struct Inner(Option<InnerData>);
45
46impl Inner {
47 fn start_new_delivery(
48 &mut self,
49 message: BasicGetMessage,
50 resolver: PromiseResolver<Option<BasicGetMessage>>,
51 ) {
52 self.0 = Some(InnerData { message, resolver });
53 }
54
55 fn handle_content_header_frame(&mut self, size: PayloadSize, properties: BasicProperties) {
56 if let Some(inner) = self.0.as_mut() {
57 inner.message.properties = properties;
58 }
59 if size == 0 {
60 self.new_delivery_complete();
61 }
62 }
63
64 fn handle_body_frame(&mut self, remaining_size: PayloadSize, payload: Vec<u8>) {
65 if let Some(inner) = self.0.as_mut() {
66 inner.message.receive_content(payload);
67 }
68 if remaining_size == 0 {
69 self.new_delivery_complete();
70 }
71 }
72
73 fn new_delivery_complete(&mut self) {
74 if let Some(inner) = self.0.take() {
75 inner.resolver.resolve(Some(inner.message));
76 }
77 }
78}
79
80struct InnerData {
81 message: BasicGetMessage,
82 resolver: PromiseResolver<Option<BasicGetMessage>>,
83}