actix_amqp/client/
protocol.rs1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_codec::{AsyncRead, AsyncWrite, Framed};
6use actix_service::Service;
7use futures::{Future, SinkExt, StreamExt};
8
9use amqp_codec::protocol::ProtocolId;
10use amqp_codec::{ProtocolIdCodec, ProtocolIdError};
11
12pub struct ProtocolNegotiation<Io> {
13 proto: ProtocolId,
14 _r: PhantomData<Io>,
15}
16
17impl<Io> Clone for ProtocolNegotiation<Io> {
18 fn clone(&self) -> Self {
19 ProtocolNegotiation {
20 proto: self.proto.clone(),
21 _r: PhantomData,
22 }
23 }
24}
25
26impl<Io> ProtocolNegotiation<Io> {
27 pub fn new(proto: ProtocolId) -> Self {
28 ProtocolNegotiation {
29 proto,
30 _r: PhantomData,
31 }
32 }
33
34 pub fn framed(stream: Io) -> Framed<Io, ProtocolIdCodec>
35 where
36 Io: AsyncRead + AsyncWrite,
37 {
38 Framed::new(stream, ProtocolIdCodec)
39 }
40}
41
42impl<Io> Default for ProtocolNegotiation<Io> {
43 fn default() -> Self {
44 Self::new(ProtocolId::Amqp)
45 }
46}
47
48impl<Io> Service for ProtocolNegotiation<Io>
49where
50 Io: AsyncRead + AsyncWrite + 'static,
51{
52 type Request = Framed<Io, ProtocolIdCodec>;
53 type Response = Framed<Io, ProtocolIdCodec>;
54 type Error = ProtocolIdError;
55 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
56
57 fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
58 Poll::Ready(Ok(()))
59 }
60
61 fn call(&mut self, mut framed: Framed<Io, ProtocolIdCodec>) -> Self::Future {
62 let proto = self.proto;
63
64 Box::pin(async move {
65 framed.send(proto).await?;
66 let protocol = framed.next().await.ok_or(ProtocolIdError::Disconnected)??;
67
68 if proto == protocol {
69 Ok(framed)
70 } else {
71 Err(ProtocolIdError::Unexpected {
72 exp: proto,
73 got: protocol,
74 })
75 }
76 })
77 }
78}