use ntex_io::IoBoxed;
use ntex_service::cfg::Cfg;
use ntex_util::time::Seconds;
use crate::codec::{AmqpCodec, AmqpFrame, protocol::Frame, protocol::Open};
use crate::{AmqpServiceConfig, RemoteServiceConfig, connection::Connection};
use super::{error::HandshakeError, sasl::Sasl};
/// A pending connection handshake, in one of two flavours.
///
/// Instances are built inside the crate with `Handshake::new_plain`
/// (yielding the `Amqp` variant) or `Handshake::new_sasl` (yielding the
/// `Sasl` variant).
#[derive(Debug)]
pub enum Handshake {
/// Plain AMQP handshake; see [`HandshakeAmqp`].
Amqp(HandshakeAmqp),
/// SASL handshake, handled by the `Sasl` type from the sibling `sasl` module.
Sasl(Sasl),
}
impl Handshake {
    /// Wraps the io object and local configuration into the plain AMQP
    /// variant of the handshake.
    pub(crate) fn new_plain(state: IoBoxed, local_config: Cfg<AmqpServiceConfig>) -> Self {
        let amqp = HandshakeAmqp {
            state,
            local_config,
        };
        Self::Amqp(amqp)
    }

    /// Wraps the io object and local configuration into the SASL variant of
    /// the handshake, delegating construction to `Sasl::new`.
    pub(crate) fn new_sasl(state: IoBoxed, local_config: Cfg<AmqpServiceConfig>) -> Self {
        let sasl = Sasl::new(state, local_config);
        Self::Sasl(sasl)
    }

    /// Returns a reference to the io object held by whichever variant is
    /// active.
    pub fn io(&self) -> &IoBoxed {
        match self {
            Self::Amqp(amqp) => amqp.io(),
            Self::Sasl(sasl) => sasl.io(),
        }
    }
}
/// Plain AMQP handshake that has not yet received the peer's `Open` frame.
///
/// Call `open` to wait for that frame and obtain a [`HandshakeAmqpOpened`].
#[derive(Debug)]
pub struct HandshakeAmqp {
// Io object the handshake frames are read from.
state: IoBoxed,
// Local service configuration; passed to `Connection::new` in `open`.
local_config: Cfg<AmqpServiceConfig>,
}
impl HandshakeAmqp {
pub fn io(&self) -> &IoBoxed {
&self.state
}
pub async fn open(self) -> Result<HandshakeAmqpOpened, HandshakeError> {
let state = self.state;
let local_config = self.local_config;
let codec = AmqpCodec::<AmqpFrame>::new();
let frame = state.recv(&codec).await?.ok_or_else(|| {
log::trace!(
"{}: Server amqp is disconnected during open frame",
state.tag()
);
HandshakeError::Disconnected(None)
})?;
let frame = frame.into_parts().1;
match frame {
Frame::Open(frame) => {
log::trace!("{}: Got open frame: {:?}", state.tag(), frame);
let remote_config = RemoteServiceConfig::new(&frame);
let sink = Connection::new(state.get_ref(), &local_config, &remote_config);
Ok(HandshakeAmqpOpened {
frame,
sink,
state,
local_config,
remote_config,
})
}
frame => Err(HandshakeError::Unexpected(frame)),
}
}
}
/// Handshake state after the peer's `Open` frame has been received.
///
/// Produced by `HandshakeAmqp::open` (or the crate-internal `new`); call
/// `ack` to turn it into a [`HandshakeAck`].
pub struct HandshakeAmqpOpened {
// The `Open` frame received from the peer.
frame: Open,
// Connection created for this handshake.
sink: Connection,
// Io object the handshake is running on.
state: IoBoxed,
// Local service configuration.
local_config: Cfg<AmqpServiceConfig>,
// Remote configuration; `HandshakeAmqp::open` derives it from the `Open` frame.
remote_config: RemoteServiceConfig,
}
impl HandshakeAmqpOpened {
pub(crate) fn new(
frame: Open,
sink: Connection,
state: IoBoxed,
local_config: Cfg<AmqpServiceConfig>,
remote_config: RemoteServiceConfig,
) -> Self {
Self {
frame,
sink,
state,
local_config,
remote_config,
}
}
pub fn io(&self) -> &IoBoxed {
&self.state
}
pub fn frame(&self) -> &Open {
&self.frame
}
pub fn local_config(&self) -> &Cfg<AmqpServiceConfig> {
&self.local_config
}
pub fn remote_config(&self) -> &RemoteServiceConfig {
&self.remote_config
}
pub fn sink(&self) -> &Connection {
&self.sink
}
pub fn ack<St>(self, st: St) -> HandshakeAck<St> {
HandshakeAck {
st,
sink: self.sink,
state: self.state,
idle_timeout: self.remote_config.timeout_remote_secs(),
}
}
}
/// Result of acknowledging an opened handshake, created by
/// `HandshakeAmqpOpened::ack`.
///
/// `St` is the type of the value the caller passed to `ack`.
pub struct HandshakeAck<St> {
// Caller-supplied value given to `ack`.
st: St,
// Connection taken over from the opened handshake.
sink: Connection,
// Io object taken over from the opened handshake.
state: IoBoxed,
// Value of `remote_config.timeout_remote_secs()` captured when `ack` ran.
idle_timeout: Seconds,
}
impl<St> HandshakeAck<St> {
    /// Breaks the acknowledgement into its parts, in the order
    /// `(st, sink, idle_timeout, state)`.
    pub(crate) fn into_inner(self) -> (St, Connection, Seconds, IoBoxed) {
        let Self {
            st,
            sink,
            state,
            idle_timeout,
        } = self;
        (st, sink, idle_timeout, state)
    }
}