1use crate::error::PlainTextError;
22
23use bytes::Bytes;
24use futures::future::{self, Ready};
25use futures::prelude::*;
26use futures::future::BoxFuture;
27use libp2p_core::{
28 identity,
29 InboundUpgrade,
30 OutboundUpgrade,
31 UpgradeInfo,
32 PeerId,
33 PublicKey,
34};
35use log::debug;
36use std::{io, iter, pin::Pin, task::{Context, Poll}};
37use void::Void;
38
39mod error;
40mod handshake;
// Private module whose body is the file `structs.rs` from the build's
// `OUT_DIR`, pulled in textually at compile time.
// NOTE(review): presumably the protobuf-generated handshake message types,
// emitted by a build script — confirm against `build.rs`.
mod structs_proto {
    include!(concat!(env!("OUT_DIR"), "/structs.rs"));
}
44
45
/// Configuration marker for the `/plaintext/1.0.0` upgrade.
///
/// The type carries no state. Its inbound and outbound upgrade
/// implementations in this file hand the socket back unchanged, so it adds
/// no encryption and no authentication to the connection.
#[derive(Clone, Copy, Debug)]
pub struct PlainText1Config;
74
75impl UpgradeInfo for PlainText1Config {
76 type Info = &'static [u8];
77 type InfoIter = iter::Once<Self::Info>;
78
79 fn protocol_info(&self) -> Self::InfoIter {
80 iter::once(b"/plaintext/1.0.0")
81 }
82}
83
84impl<C> InboundUpgrade<C> for PlainText1Config {
85 type Output = C;
86 type Error = Void;
87 type Future = Ready<Result<C, Self::Error>>;
88
89 fn upgrade_inbound(self, i: C, _: Self::Info) -> Self::Future {
90 future::ready(Ok(i))
91 }
92}
93
94impl<C> OutboundUpgrade<C> for PlainText1Config {
95 type Output = C;
96 type Error = Void;
97 type Future = Ready<Result<C, Self::Error>>;
98
99 fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future {
100 future::ready(Ok(i))
101 }
102}
103
/// Configuration for the `/plaintext/2.0.0` upgrade.
///
/// Unlike `PlainText1Config`, this upgrade runs a handshake (see the
/// `handshake` module) and resolves to the remote's `PeerId` together with a
/// `PlainTextOutput` wrapping the socket.
#[derive(Clone)]
pub struct PlainText2Config {
    /// Public key of the local node. The whole config, including this key, is
    /// passed to `handshake::handshake`.
    // NOTE(review): presumably this key is what gets announced to the remote
    // during the handshake — confirm in `handshake.rs`.
    pub local_public_key: identity::PublicKey,
}
110
111impl UpgradeInfo for PlainText2Config {
112 type Info = &'static [u8];
113 type InfoIter = iter::Once<Self::Info>;
114
115 fn protocol_info(&self) -> Self::InfoIter {
116 iter::once(b"/plaintext/2.0.0")
117 }
118}
119
120impl<C> InboundUpgrade<C> for PlainText2Config
121where
122 C: AsyncRead + AsyncWrite + Send + Unpin + 'static
123{
124 type Output = (PeerId, PlainTextOutput<C>);
125 type Error = PlainTextError;
126 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
127
128 fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
129 Box::pin(self.handshake(socket))
130 }
131}
132
133impl<C> OutboundUpgrade<C> for PlainText2Config
134where
135 C: AsyncRead + AsyncWrite + Send + Unpin + 'static
136{
137 type Output = (PeerId, PlainTextOutput<C>);
138 type Error = PlainTextError;
139 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
140
141 fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
142 Box::pin(self.handshake(socket))
143 }
144}
145
146impl PlainText2Config {
147 async fn handshake<T>(self, socket: T) -> Result<(PeerId, PlainTextOutput<T>), PlainTextError>
148 where
149 T: AsyncRead + AsyncWrite + Send + Unpin + 'static
150 {
151 debug!("Starting plaintext handshake.");
152 let (socket, remote, read_buffer) = handshake::handshake(socket, self).await?;
153 debug!("Finished plaintext handshake.");
154
155 Ok((
156 remote.peer_id,
157 PlainTextOutput {
158 socket,
159 remote_key: remote.public_key,
160 read_buffer,
161 }
162 ))
163 }
164}
165
166pub struct PlainTextOutput<S>
168where
169 S: AsyncRead + AsyncWrite + Unpin,
170{
171 pub socket: S,
173 pub remote_key: PublicKey,
175 read_buffer: Bytes,
179}
180
181impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for PlainTextOutput<S> {
182 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
183 -> Poll<Result<usize, io::Error>>
184 {
185 if !self.read_buffer.is_empty() {
186 let n = std::cmp::min(buf.len(), self.read_buffer.len());
187 let b = self.read_buffer.split_to(n);
188 buf[..n].copy_from_slice(&b[..]);
189 return Poll::Ready(Ok(n))
190 }
191 AsyncRead::poll_read(Pin::new(&mut self.socket), cx, buf)
192 }
193}
194
195impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for PlainTextOutput<S> {
196 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
197 -> Poll<Result<usize, io::Error>>
198 {
199 AsyncWrite::poll_write(Pin::new(&mut self.socket), cx, buf)
200 }
201
202 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
203 -> Poll<Result<(), io::Error>>
204 {
205 AsyncWrite::poll_flush(Pin::new(&mut self.socket), cx)
206 }
207
208 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
209 -> Poll<Result<(), io::Error>>
210 {
211 AsyncWrite::poll_close(Pin::new(&mut self.socket), cx)
212 }
213}