actix_amqp/client/
protocol.rs

1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_codec::{AsyncRead, AsyncWrite, Framed};
6use actix_service::Service;
7use futures::{Future, SinkExt, StreamExt};
8
9use amqp_codec::protocol::ProtocolId;
10use amqp_codec::{ProtocolIdCodec, ProtocolIdError};
11
12pub struct ProtocolNegotiation<Io> {
13    proto: ProtocolId,
14    _r: PhantomData<Io>,
15}
16
17impl<Io> Clone for ProtocolNegotiation<Io> {
18    fn clone(&self) -> Self {
19        ProtocolNegotiation {
20            proto: self.proto.clone(),
21            _r: PhantomData,
22        }
23    }
24}
25
26impl<Io> ProtocolNegotiation<Io> {
27    pub fn new(proto: ProtocolId) -> Self {
28        ProtocolNegotiation {
29            proto,
30            _r: PhantomData,
31        }
32    }
33
34    pub fn framed(stream: Io) -> Framed<Io, ProtocolIdCodec>
35    where
36        Io: AsyncRead + AsyncWrite,
37    {
38        Framed::new(stream, ProtocolIdCodec)
39    }
40}
41
42impl<Io> Default for ProtocolNegotiation<Io> {
43    fn default() -> Self {
44        Self::new(ProtocolId::Amqp)
45    }
46}
47
48impl<Io> Service for ProtocolNegotiation<Io>
49where
50    Io: AsyncRead + AsyncWrite + 'static,
51{
52    type Request = Framed<Io, ProtocolIdCodec>;
53    type Response = Framed<Io, ProtocolIdCodec>;
54    type Error = ProtocolIdError;
55    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
56
57    fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
58        Poll::Ready(Ok(()))
59    }
60
61    fn call(&mut self, mut framed: Framed<Io, ProtocolIdCodec>) -> Self::Future {
62        let proto = self.proto;
63
64        Box::pin(async move {
65            framed.send(proto).await?;
66            let protocol = framed.next().await.ok_or(ProtocolIdError::Disconnected)??;
67
68            if proto == protocol {
69                Ok(framed)
70            } else {
71                Err(ProtocolIdError::Unexpected {
72                    exp: proto,
73                    got: protocol,
74                })
75            }
76        })
77    }
78}