actix_amqp/server/
message.rs

1use std::fmt;
2
3use amqp_codec::protocol::{Accepted, DeliveryState, Error, Rejected, Transfer, TransferBody};
4use amqp_codec::Decode;
5use bytes::Bytes;
6
7use crate::rcvlink::ReceiverLink;
8use crate::session::Session;
9
10use super::errors::AmqpError;
11use super::State;
12
/// A transfer frame bundled with a state handle of type `S` and the
/// receiver link it is associated with.
///
/// The fields are private; read access goes through the accessor methods
/// on `Message<S>`.
pub struct Message<S> {
    /// Handle wrapping the state `S`; see `state()` and `state_mut()`.
    state: State<S>,
    /// The AMQP transfer frame; its body is what `body()` and `load_message()` inspect.
    frame: Transfer,
    /// Receiver link stored with the frame; `session()` / `session_mut()` delegate to it.
    link: ReceiverLink,
}
18
/// Outcome that is translated into an AMQP `DeliveryState` by
/// `Outcome::into_delivery_state`.
#[derive(Debug)]
pub enum Outcome {
    /// Translates to `DeliveryState::Accepted`.
    Accept,
    /// Translates to `DeliveryState::Rejected` with no error attached.
    Reject,
    /// Translates to `DeliveryState::Rejected` carrying the wrapped error.
    Error(Error),
}
25
26impl<T> From<T> for Outcome
27where
28    T: Into<Error>,
29{
30    fn from(err: T) -> Self {
31        Outcome::Error(err.into())
32    }
33}
34
35impl Outcome {
36    pub(crate) fn into_delivery_state(self) -> DeliveryState {
37        match self {
38            Outcome::Accept => DeliveryState::Accepted(Accepted {}),
39            Outcome::Reject => DeliveryState::Rejected(Rejected { error: None }),
40            Outcome::Error(e) => DeliveryState::Rejected(Rejected { error: Some(e) }),
41        }
42    }
43}
44
45impl<S> Message<S> {
46    pub(crate) fn new(state: State<S>, frame: Transfer, link: ReceiverLink) -> Self {
47        Message { state, frame, link }
48    }
49
50    pub fn state(&self) -> &S {
51        self.state.get_ref()
52    }
53
54    pub fn state_mut(&mut self) -> &mut S {
55        self.state.get_mut()
56    }
57
58    pub fn session(&self) -> &Session {
59        self.link.session()
60    }
61
62    pub fn session_mut(&mut self) -> &mut Session {
63        self.link.session_mut()
64    }
65
66    pub fn frame(&self) -> &Transfer {
67        &self.frame
68    }
69
70    pub fn body(&self) -> Option<&Bytes> {
71        match self.frame.body {
72            Some(TransferBody::Data(ref b)) => Some(b),
73            _ => None,
74        }
75    }
76
77    pub fn load_message<T: Decode>(&self) -> Result<T, AmqpError> {
78        if let Some(TransferBody::Data(ref b)) = self.frame.body {
79            if let Ok((_, msg)) = T::decode(b) {
80                Ok(msg)
81            } else {
82                Err(AmqpError::decode_error().description("Can not decode message"))
83            }
84        } else {
85            Err(AmqpError::invalid_field().description("Unknown body"))
86        }
87    }
88}
89
90impl<S> fmt::Debug for Message<S> {
91    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
92        fmt.debug_struct("Message<S>")
93            .field("frame", &self.frame)
94            .finish()
95    }
96}