libp2p_request_response/handler/
protocol.rs1use crate::RequestId;
27use crate::codec::RequestResponseCodec;
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*};
30use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
31use libp2p_swarm::NegotiatedSubstream;
32use smallvec::SmallVec;
33use std::io;
34
/// The level of support for a particular protocol.
///
/// The type is a plain, fieldless enum, so it is cheap to copy and can be
/// compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolSupport {
    /// The protocol is only supported for inbound requests.
    Inbound,
    /// The protocol is only supported for outbound requests.
    Outbound,
    /// The protocol is supported for inbound and outbound requests.
    Full,
}
45
46impl ProtocolSupport {
47 pub fn inbound(&self) -> bool {
49 match self {
50 ProtocolSupport::Inbound | ProtocolSupport::Full => true,
51 ProtocolSupport::Outbound => false,
52 }
53 }
54
55 pub fn outbound(&self) -> bool {
57 match self {
58 ProtocolSupport::Outbound | ProtocolSupport::Full => true,
59 ProtocolSupport::Inbound => false,
60 }
61 }
62}
63
64#[derive(Debug)]
68pub struct ResponseProtocol<TCodec>
69where
70 TCodec: RequestResponseCodec
71{
72 pub(crate) codec: TCodec,
73 pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
74 pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>,
75 pub(crate) response_receiver: oneshot::Receiver<TCodec::Response>,
76 pub(crate) request_id: RequestId
77
78}
79
80impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
81where
82 TCodec: RequestResponseCodec
83{
84 type Info = TCodec::Protocol;
85 type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
86
87 fn protocol_info(&self) -> Self::InfoIter {
88 self.protocols.clone().into_iter()
89 }
90}
91
92impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
93where
94 TCodec: RequestResponseCodec + Send + 'static,
95{
96 type Output = bool;
97 type Error = io::Error;
98 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
99
100 fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
101 async move {
102 let read = self.codec.read_request(&protocol, &mut io);
103 let request = read.await?;
104 if let Ok(()) = self.request_sender.send((self.request_id, request)) {
105 if let Ok(response) = self.response_receiver.await {
106 let write = self.codec.write_response(&protocol, &mut io, response);
107 write.await?;
108 } else {
109 return Ok(false)
110 }
111 }
112 io.close().await?;
113 Ok(true)
114 }.boxed()
115 }
116}
117
118#[derive(Debug)]
122pub struct RequestProtocol<TCodec>
123where
124 TCodec: RequestResponseCodec
125{
126 pub(crate) codec: TCodec,
127 pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
128 pub(crate) request_id: RequestId,
129 pub(crate) request: TCodec::Request,
130}
131
132impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
133where
134 TCodec: RequestResponseCodec
135{
136 type Info = TCodec::Protocol;
137 type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
138
139 fn protocol_info(&self) -> Self::InfoIter {
140 self.protocols.clone().into_iter()
141 }
142}
143
144impl<TCodec> OutboundUpgrade<NegotiatedSubstream> for RequestProtocol<TCodec>
145where
146 TCodec: RequestResponseCodec + Send + 'static,
147{
148 type Output = TCodec::Response;
149 type Error = io::Error;
150 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
151
152 fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
153 async move {
154 let write = self.codec.write_request(&protocol, &mut io, self.request);
155 write.await?;
156 io.close().await?;
157 let read = self.codec.read_response(&protocol, &mut io);
158 let response = read.await?;
159 Ok(response)
160 }.boxed()
161 }
162}