actix_amqp/server/
connect.rs1use actix_codec::{AsyncRead, AsyncWrite, Framed};
2use amqp_codec::protocol::{Frame, Open};
3use amqp_codec::{AmqpCodec, AmqpFrame, ProtocolIdCodec};
4use futures::{Future, StreamExt};
5
6use super::errors::ServerError;
7use crate::connection::ConnectionController;
8
9pub struct Connect<Io> {
11 conn: Framed<Io, ProtocolIdCodec>,
12 controller: ConnectionController,
13}
14
15impl<Io> Connect<Io> {
16 pub(crate) fn new(conn: Framed<Io, ProtocolIdCodec>, controller: ConnectionController) -> Self {
17 Self { conn, controller }
18 }
19
20 pub fn get_ref(&self) -> &Io {
22 self.conn.get_ref()
23 }
24
25 pub fn get_mut(&mut self) -> &mut Io {
27 self.conn.get_mut()
28 }
29}
30
31impl<Io: AsyncRead + AsyncWrite> Connect<Io> {
32 pub async fn open(self) -> Result<ConnectOpened<Io>, ServerError<()>> {
34 let mut framed = self.conn.into_framed(AmqpCodec::<AmqpFrame>::new());
35 let mut controller = self.controller;
36
37 let frame = framed
38 .next()
39 .await
40 .ok_or(ServerError::Disconnected)?
41 .map_err(ServerError::from)?;
42
43 let frame = frame.into_parts().1;
44 match frame {
45 Frame::Open(frame) => {
46 trace!("Got open frame: {:?}", frame);
47 controller.set_remote((&frame).into());
48 Ok(ConnectOpened {
49 frame,
50 framed,
51 controller,
52 })
53 }
54 frame => Err(ServerError::Unexpected(frame)),
55 }
56 }
57}
58
59pub struct ConnectOpened<Io> {
61 frame: Open,
62 framed: Framed<Io, AmqpCodec<AmqpFrame>>,
63 controller: ConnectionController,
64}
65
66impl<Io> ConnectOpened<Io> {
67 pub(crate) fn new(
68 frame: Open,
69 framed: Framed<Io, AmqpCodec<AmqpFrame>>,
70 controller: ConnectionController,
71 ) -> Self {
72 ConnectOpened {
73 frame,
74 framed,
75 controller,
76 }
77 }
78
79 pub fn frame(&self) -> &Open {
81 &self.frame
82 }
83
84 pub fn get_ref(&self) -> &Io {
86 self.framed.get_ref()
87 }
88
89 pub fn get_mut(&mut self) -> &mut Io {
91 self.framed.get_mut()
92 }
93
94 pub fn connection(&self) -> &ConnectionController {
96 &self.controller
97 }
98
99 pub fn ack<St>(self, state: St) -> ConnectAck<Io, St> {
101 ConnectAck {
102 state,
103 framed: self.framed,
104 controller: self.controller,
105 }
106 }
107}
108
109pub struct ConnectAck<Io, St> {
111 state: St,
112 framed: Framed<Io, AmqpCodec<AmqpFrame>>,
113 controller: ConnectionController,
114}
115
116impl<Io, St> ConnectAck<Io, St> {
117 pub(crate) fn into_inner(self) -> (St, Framed<Io, AmqpCodec<AmqpFrame>>, ConnectionController) {
118 (self.state, self.framed, self.controller)
119 }
120}