1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use futures::StreamExt;
use ntex::codec::{AsyncRead, AsyncWrite, Framed};
use ntex_amqp_codec::protocol::{Frame, Open};
use ntex_amqp_codec::{AmqpCodec, AmqpFrame, ProtocolIdCodec};

use super::errors::ServerError;
use crate::connection::ConnectionController;

/// Open new connection
pub struct Connect<Io> {
    conn: Framed<Io, ProtocolIdCodec>,
    controller: ConnectionController,
}

impl<Io> Connect<Io> {
    pub(crate) fn new(conn: Framed<Io, ProtocolIdCodec>, controller: ConnectionController) -> Self {
        Self { conn, controller }
    }

    /// Returns reference to io object
    pub fn get_ref(&self) -> &Io {
        self.conn.get_ref()
    }

    /// Returns mutable reference to io object
    pub fn get_mut(&mut self) -> &mut Io {
        self.conn.get_mut()
    }
}

impl<Io: AsyncRead + AsyncWrite + Unpin> Connect<Io> {
    /// Wait for connection open frame
    pub async fn open(self) -> Result<ConnectOpened<Io>, ServerError<()>> {
        let mut framed = self.conn.into_framed(AmqpCodec::<AmqpFrame>::new());
        let mut controller = self.controller;

        let frame = framed
            .next()
            .await
            .ok_or(ServerError::Disconnected)?
            .map_err(ServerError::from)?;

        let frame = frame.into_parts().1;
        match frame {
            Frame::Open(frame) => {
                trace!("Got open frame: {:?}", frame);
                controller.set_remote((&frame).into());
                Ok(ConnectOpened {
                    frame,
                    framed,
                    controller,
                })
            }
            frame => Err(ServerError::Unexpected(Box::new(frame))),
        }
    }
}

/// Connection is opened
pub struct ConnectOpened<Io> {
    frame: Open,
    framed: Framed<Io, AmqpCodec<AmqpFrame>>,
    controller: ConnectionController,
}

impl<Io> ConnectOpened<Io> {
    pub(crate) fn new(
        frame: Open,
        framed: Framed<Io, AmqpCodec<AmqpFrame>>,
        controller: ConnectionController,
    ) -> Self {
        ConnectOpened {
            frame,
            framed,
            controller,
        }
    }

    /// Get reference to remote `Open` frame
    pub fn frame(&self) -> &Open {
        &self.frame
    }

    /// Returns reference to io object
    pub fn get_ref(&self) -> &Io {
        self.framed.get_ref()
    }

    /// Returns mutable reference to io object
    pub fn get_mut(&mut self) -> &mut Io {
        self.framed.get_mut()
    }

    /// Connection controller
    pub fn connection(&self) -> &ConnectionController {
        &self.controller
    }

    /// Ack connect message and set state
    pub fn ack<St>(self, state: St) -> ConnectAck<Io, St> {
        ConnectAck {
            state,
            framed: self.framed,
            controller: self.controller,
        }
    }
}

/// Ack connect message
pub struct ConnectAck<Io, St> {
    state: St,
    framed: Framed<Io, AmqpCodec<AmqpFrame>>,
    controller: ConnectionController,
}

impl<Io, St> ConnectAck<Io, St> {
    pub(crate) fn into_inner(self) -> (St, Framed<Io, AmqpCodec<AmqpFrame>>, ConnectionController) {
        (self.state, self.framed, self.controller)
    }
}