request_response/handler/
protocol.rs1use crate::RequestId;
27use crate::codec::RequestResponseCodec;
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*};
30use tetsy_libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
31use tetsy_libp2p_swarm::NegotiatedSubstream;
32use smallvec::SmallVec;
33use std::io;
34
35#[derive(Debug, Clone)]
37pub enum ProtocolSupport {
38 Inbound,
40 Outbound,
42 Full
44}
45
46impl ProtocolSupport {
47 pub fn inbound(&self) -> bool {
49 match self {
50 ProtocolSupport::Inbound | ProtocolSupport::Full => true,
51 ProtocolSupport::Outbound => false,
52 }
53 }
54
55 pub fn outbound(&self) -> bool {
57 match self {
58 ProtocolSupport::Outbound | ProtocolSupport::Full => true,
59 ProtocolSupport::Inbound => false,
60 }
61 }
62}
63
64#[derive(Debug)]
68pub struct ResponseProtocol<TCodec>
69where
70 TCodec: RequestResponseCodec
71{
72 pub(crate) codec: TCodec,
73 pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
74 pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>,
75 pub(crate) response_receiver: oneshot::Receiver<TCodec::Response>,
76 pub(crate) request_id: RequestId
77
78}
79
80impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
81where
82 TCodec: RequestResponseCodec
83{
84 type Info = TCodec::Protocol;
85 type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
86
87 fn protocol_info(&self) -> Self::InfoIter {
88 self.protocols.clone().into_iter()
89 }
90}
91
92impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
93where
94 TCodec: RequestResponseCodec + Send + 'static,
95{
96 type Output = bool;
97 type Error = io::Error;
98 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
99
100 fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
101 async move {
102 let read = self.codec.read_request(&protocol, &mut io);
103 let request = read.await?;
104 if let Ok(()) = self.request_sender.send((self.request_id, request)) {
105 if let Ok(response) = self.response_receiver.await {
106 let write = self.codec.write_response(&protocol, &mut io, response);
107 write.await?;
108 } else {
109 io.close().await?;
110 return Ok(false)
111 }
112 }
113 io.close().await?;
114 Ok(true)
115 }.boxed()
116 }
117}
118
119#[derive(Debug)]
123pub struct RequestProtocol<TCodec>
124where
125 TCodec: RequestResponseCodec
126{
127 pub(crate) codec: TCodec,
128 pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
129 pub(crate) request_id: RequestId,
130 pub(crate) request: TCodec::Request,
131}
132
133impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
134where
135 TCodec: RequestResponseCodec
136{
137 type Info = TCodec::Protocol;
138 type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
139
140 fn protocol_info(&self) -> Self::InfoIter {
141 self.protocols.clone().into_iter()
142 }
143}
144
145impl<TCodec> OutboundUpgrade<NegotiatedSubstream> for RequestProtocol<TCodec>
146where
147 TCodec: RequestResponseCodec + Send + 'static,
148{
149 type Output = TCodec::Response;
150 type Error = io::Error;
151 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
152
153 fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
154 async move {
155 let write = self.codec.write_request(&protocol, &mut io, self.request);
156 write.await?;
157 io.close().await?;
158 let read = self.codec.read_response(&protocol, &mut io);
159 let response = read.await?;
160 Ok(response)
161 }.boxed()
162 }
163}