ant_libp2p_plaintext/
lib.rs1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
24
25use ant_libp2p_core as libp2p_core;
26
27use std::{
28 io, iter,
29 pin::Pin,
30 task::{Context, Poll},
31};
32
33use bytes::Bytes;
34use futures::{future::BoxFuture, prelude::*};
35use libp2p_core::{
36 upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade},
37 UpgradeInfo,
38};
39use libp2p_identity as identity;
40use libp2p_identity::{PeerId, PublicKey};
41
42use crate::error::Error;
43
44mod error;
45mod handshake;
46mod proto {
47 #![allow(unreachable_pub)]
48 include!("generated/mod.rs");
49 pub(crate) use self::structs::Exchange;
50}
51
52#[derive(Clone)]
54pub struct Config {
55 local_public_key: identity::PublicKey,
56}
57
58impl Config {
59 pub fn new(identity: &identity::Keypair) -> Self {
60 Self {
61 local_public_key: identity.public(),
62 }
63 }
64}
65
66impl UpgradeInfo for Config {
67 type Info = &'static str;
68 type InfoIter = iter::Once<Self::Info>;
69
70 fn protocol_info(&self) -> Self::InfoIter {
71 iter::once("/plaintext/2.0.0")
72 }
73}
74
75impl<C> InboundConnectionUpgrade<C> for Config
76where
77 C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
78{
79 type Output = (PeerId, Output<C>);
80 type Error = Error;
81 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
82
83 fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
84 Box::pin(self.handshake(socket))
85 }
86}
87
88impl<C> OutboundConnectionUpgrade<C> for Config
89where
90 C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
91{
92 type Output = (PeerId, Output<C>);
93 type Error = Error;
94 type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
95
96 fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
97 Box::pin(self.handshake(socket))
98 }
99}
100
101impl Config {
102 async fn handshake<T>(self, socket: T) -> Result<(PeerId, Output<T>), Error>
103 where
104 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
105 {
106 tracing::debug!("Starting plaintext handshake.");
107 let (socket, remote_key, read_buffer) = handshake::handshake(socket, self).await?;
108 tracing::debug!("Finished plaintext handshake.");
109
110 Ok((
111 remote_key.to_peer_id(),
112 Output {
113 socket,
114 remote_key,
115 read_buffer,
116 },
117 ))
118 }
119}
120
121pub struct Output<S>
123where
124 S: AsyncRead + AsyncWrite + Unpin,
125{
126 pub socket: S,
128 pub remote_key: PublicKey,
130 read_buffer: Bytes,
134}
135
136impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for Output<S> {
137 fn poll_read(
138 mut self: Pin<&mut Self>,
139 cx: &mut Context<'_>,
140 buf: &mut [u8],
141 ) -> Poll<Result<usize, io::Error>> {
142 if !self.read_buffer.is_empty() {
143 let n = std::cmp::min(buf.len(), self.read_buffer.len());
144 let b = self.read_buffer.split_to(n);
145 buf[..n].copy_from_slice(&b[..]);
146 return Poll::Ready(Ok(n));
147 }
148 AsyncRead::poll_read(Pin::new(&mut self.socket), cx, buf)
149 }
150}
151
152impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Output<S> {
153 fn poll_write(
154 mut self: Pin<&mut Self>,
155 cx: &mut Context<'_>,
156 buf: &[u8],
157 ) -> Poll<Result<usize, io::Error>> {
158 AsyncWrite::poll_write(Pin::new(&mut self.socket), cx, buf)
159 }
160
161 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
162 AsyncWrite::poll_flush(Pin::new(&mut self.socket), cx)
163 }
164
165 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
166 AsyncWrite::poll_close(Pin::new(&mut self.socket), cx)
167 }
168}