ant_libp2p_plaintext/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the [plaintext](https://github.com/libp2p/specs/blob/master/plaintext/README.md) protocol.
22
23#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
24
25use ant_libp2p_core as libp2p_core;
26
27use std::{
28    io, iter,
29    pin::Pin,
30    task::{Context, Poll},
31};
32
33use bytes::Bytes;
34use futures::{future::BoxFuture, prelude::*};
35use libp2p_core::{
36    upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade},
37    UpgradeInfo,
38};
39use libp2p_identity as identity;
40use libp2p_identity::{PeerId, PublicKey};
41
42use crate::error::Error;
43
44mod error;
45mod handshake;
46mod proto {
47    #![allow(unreachable_pub)]
48    include!("generated/mod.rs");
49    pub(crate) use self::structs::Exchange;
50}
51
52/// [`Config`] is an insecure connection handshake for testing purposes only.
53#[derive(Clone)]
54pub struct Config {
55    local_public_key: identity::PublicKey,
56}
57
58impl Config {
59    pub fn new(identity: &identity::Keypair) -> Self {
60        Self {
61            local_public_key: identity.public(),
62        }
63    }
64}
65
66impl UpgradeInfo for Config {
67    type Info = &'static str;
68    type InfoIter = iter::Once<Self::Info>;
69
70    fn protocol_info(&self) -> Self::InfoIter {
71        iter::once("/plaintext/2.0.0")
72    }
73}
74
75impl<C> InboundConnectionUpgrade<C> for Config
76where
77    C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
78{
79    type Output = (PeerId, Output<C>);
80    type Error = Error;
81    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
82
83    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
84        Box::pin(self.handshake(socket))
85    }
86}
87
88impl<C> OutboundConnectionUpgrade<C> for Config
89where
90    C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
91{
92    type Output = (PeerId, Output<C>);
93    type Error = Error;
94    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
95
96    fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
97        Box::pin(self.handshake(socket))
98    }
99}
100
101impl Config {
102    async fn handshake<T>(self, socket: T) -> Result<(PeerId, Output<T>), Error>
103    where
104        T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
105    {
106        tracing::debug!("Starting plaintext handshake.");
107        let (socket, remote_key, read_buffer) = handshake::handshake(socket, self).await?;
108        tracing::debug!("Finished plaintext handshake.");
109
110        Ok((
111            remote_key.to_peer_id(),
112            Output {
113                socket,
114                remote_key,
115                read_buffer,
116            },
117        ))
118    }
119}
120
121/// Output of the plaintext protocol.
122pub struct Output<S>
123where
124    S: AsyncRead + AsyncWrite + Unpin,
125{
126    /// The plaintext stream.
127    pub socket: S,
128    /// The public key of the remote.
129    pub remote_key: PublicKey,
130    /// Remaining bytes that have been already buffered
131    /// during the handshake but are not part of the
132    /// handshake. These must be consumed first by `poll_read`.
133    read_buffer: Bytes,
134}
135
136impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for Output<S> {
137    fn poll_read(
138        mut self: Pin<&mut Self>,
139        cx: &mut Context<'_>,
140        buf: &mut [u8],
141    ) -> Poll<Result<usize, io::Error>> {
142        if !self.read_buffer.is_empty() {
143            let n = std::cmp::min(buf.len(), self.read_buffer.len());
144            let b = self.read_buffer.split_to(n);
145            buf[..n].copy_from_slice(&b[..]);
146            return Poll::Ready(Ok(n));
147        }
148        AsyncRead::poll_read(Pin::new(&mut self.socket), cx, buf)
149    }
150}
151
152impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Output<S> {
153    fn poll_write(
154        mut self: Pin<&mut Self>,
155        cx: &mut Context<'_>,
156        buf: &[u8],
157    ) -> Poll<Result<usize, io::Error>> {
158        AsyncWrite::poll_write(Pin::new(&mut self.socket), cx, buf)
159    }
160
161    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
162        AsyncWrite::poll_flush(Pin::new(&mut self.socket), cx)
163    }
164
165    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
166        AsyncWrite::poll_close(Pin::new(&mut self.socket), cx)
167    }
168}