actix_amqp/server/
connect.rs

1use actix_codec::{AsyncRead, AsyncWrite, Framed};
2use amqp_codec::protocol::{Frame, Open};
3use amqp_codec::{AmqpCodec, AmqpFrame, ProtocolIdCodec};
4use futures::{Future, StreamExt};
5
6use super::errors::ServerError;
7use crate::connection::ConnectionController;
8
9/// Open new connection
10pub struct Connect<Io> {
11    conn: Framed<Io, ProtocolIdCodec>,
12    controller: ConnectionController,
13}
14
15impl<Io> Connect<Io> {
16    pub(crate) fn new(conn: Framed<Io, ProtocolIdCodec>, controller: ConnectionController) -> Self {
17        Self { conn, controller }
18    }
19
20    /// Returns reference to io object
21    pub fn get_ref(&self) -> &Io {
22        self.conn.get_ref()
23    }
24
25    /// Returns mutable reference to io object
26    pub fn get_mut(&mut self) -> &mut Io {
27        self.conn.get_mut()
28    }
29}
30
31impl<Io: AsyncRead + AsyncWrite> Connect<Io> {
32    /// Wait for connection open frame
33    pub async fn open(self) -> Result<ConnectOpened<Io>, ServerError<()>> {
34        let mut framed = self.conn.into_framed(AmqpCodec::<AmqpFrame>::new());
35        let mut controller = self.controller;
36
37        let frame = framed
38            .next()
39            .await
40            .ok_or(ServerError::Disconnected)?
41            .map_err(ServerError::from)?;
42
43        let frame = frame.into_parts().1;
44        match frame {
45            Frame::Open(frame) => {
46                trace!("Got open frame: {:?}", frame);
47                controller.set_remote((&frame).into());
48                Ok(ConnectOpened {
49                    frame,
50                    framed,
51                    controller,
52                })
53            }
54            frame => Err(ServerError::Unexpected(frame)),
55        }
56    }
57}
58
59/// Connection is opened
60pub struct ConnectOpened<Io> {
61    frame: Open,
62    framed: Framed<Io, AmqpCodec<AmqpFrame>>,
63    controller: ConnectionController,
64}
65
66impl<Io> ConnectOpened<Io> {
67    pub(crate) fn new(
68        frame: Open,
69        framed: Framed<Io, AmqpCodec<AmqpFrame>>,
70        controller: ConnectionController,
71    ) -> Self {
72        ConnectOpened {
73            frame,
74            framed,
75            controller,
76        }
77    }
78
79    /// Get reference to remote `Open` frame
80    pub fn frame(&self) -> &Open {
81        &self.frame
82    }
83
84    /// Returns reference to io object
85    pub fn get_ref(&self) -> &Io {
86        self.framed.get_ref()
87    }
88
89    /// Returns mutable reference to io object
90    pub fn get_mut(&mut self) -> &mut Io {
91        self.framed.get_mut()
92    }
93
94    /// Connection controller
95    pub fn connection(&self) -> &ConnectionController {
96        &self.controller
97    }
98
99    /// Ack connect message and set state
100    pub fn ack<St>(self, state: St) -> ConnectAck<Io, St> {
101        ConnectAck {
102            state,
103            framed: self.framed,
104            controller: self.controller,
105        }
106    }
107}
108
109/// Ack connect message
110pub struct ConnectAck<Io, St> {
111    state: St,
112    framed: Framed<Io, AmqpCodec<AmqpFrame>>,
113    controller: ConnectionController,
114}
115
116impl<Io, St> ConnectAck<Io, St> {
117    pub(crate) fn into_inner(self) -> (St, Framed<Io, AmqpCodec<AmqpFrame>>, ConnectionController) {
118        (self.state, self.framed, self.controller)
119    }
120}