actix_amqp/server/
message.rs1use std::fmt;
2
3use amqp_codec::protocol::{Accepted, DeliveryState, Error, Rejected, Transfer, TransferBody};
4use amqp_codec::Decode;
5use bytes::Bytes;
6
7use crate::rcvlink::ReceiverLink;
8use crate::session::Session;
9
10use super::errors::AmqpError;
11use super::State;
12
13pub struct Message<S> {
14 state: State<S>,
15 frame: Transfer,
16 link: ReceiverLink,
17}
18
19#[derive(Debug)]
20pub enum Outcome {
21 Accept,
22 Reject,
23 Error(Error),
24}
25
26impl<T> From<T> for Outcome
27where
28 T: Into<Error>,
29{
30 fn from(err: T) -> Self {
31 Outcome::Error(err.into())
32 }
33}
34
35impl Outcome {
36 pub(crate) fn into_delivery_state(self) -> DeliveryState {
37 match self {
38 Outcome::Accept => DeliveryState::Accepted(Accepted {}),
39 Outcome::Reject => DeliveryState::Rejected(Rejected { error: None }),
40 Outcome::Error(e) => DeliveryState::Rejected(Rejected { error: Some(e) }),
41 }
42 }
43}
44
45impl<S> Message<S> {
46 pub(crate) fn new(state: State<S>, frame: Transfer, link: ReceiverLink) -> Self {
47 Message { state, frame, link }
48 }
49
50 pub fn state(&self) -> &S {
51 self.state.get_ref()
52 }
53
54 pub fn state_mut(&mut self) -> &mut S {
55 self.state.get_mut()
56 }
57
58 pub fn session(&self) -> &Session {
59 self.link.session()
60 }
61
62 pub fn session_mut(&mut self) -> &mut Session {
63 self.link.session_mut()
64 }
65
66 pub fn frame(&self) -> &Transfer {
67 &self.frame
68 }
69
70 pub fn body(&self) -> Option<&Bytes> {
71 match self.frame.body {
72 Some(TransferBody::Data(ref b)) => Some(b),
73 _ => None,
74 }
75 }
76
77 pub fn load_message<T: Decode>(&self) -> Result<T, AmqpError> {
78 if let Some(TransferBody::Data(ref b)) = self.frame.body {
79 if let Ok((_, msg)) = T::decode(b) {
80 Ok(msg)
81 } else {
82 Err(AmqpError::decode_error().description("Can not decode message"))
83 }
84 } else {
85 Err(AmqpError::invalid_field().description("Unknown body"))
86 }
87 }
88}
89
90impl<S> fmt::Debug for Message<S> {
91 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
92 fmt.debug_struct("Message<S>")
93 .field("frame", &self.frame)
94 .finish()
95 }
96}