keynesis_network/
handle.rs

1use crate::{
2    codec::{NoiseEncryptedDecoder, NoiseEncryptedEncoder},
3    opening::Opening,
4    Accepting, SessionId,
5};
6use anyhow::{Context as _, Result};
7use bytes::{Bytes, BytesMut};
8use futures::{prelude::*, stream::FusedStream as _};
9use keynesis_core::{
10    hash::Blake2b,
11    key::{ed25519::PublicKey, Dh},
12    noise::{TransportReceiveHalf, TransportSendHalf, TransportState},
13};
14use rand_core::{CryptoRng, RngCore};
15use std::{
16    pin::Pin,
17    task::{Context, Poll},
18};
19use tokio::io::{AsyncRead, AsyncWrite};
20use tokio_util::codec::{FramedRead, FramedWrite};
21
22/// bidirectional handle of an encrypted connection
23///
24/// The [`Handle`] is composed of 2 halves that can be split for more convenient
25/// management of the ins and outs of the connections.
26///
27/// see [`Handle::split`] for more information
28pub struct Handle<I, O> {
29    stream: HandleReadHalf<I>,
30    sink: HandleWriteHalf<O>,
31}
32
33/// the reading half of the encrypted connection
34///
35/// see [`Handle::split`] for more information
36pub struct HandleReadHalf<I> {
37    none: bool,
38    stream: FramedRead<I, NoiseEncryptedDecoder>,
39}
40
41/// the writing half of the encrypted connection
42///
43/// see [`Handle::split`] for more information
44pub struct HandleWriteHalf<O> {
45    sink: FramedWrite<O, NoiseEncryptedEncoder>,
46}
47
48impl<I> HandleReadHalf<I>
49where
50    I: AsyncRead,
51{
52    fn new(stream: I, state: TransportReceiveHalf<Blake2b>) -> Self {
53        let stream = FramedRead::new(stream, NoiseEncryptedDecoder::new(state));
54        let none = false;
55
56        Self { stream, none }
57    }
58
59    /// retrieve the public identity of the peer
60    ///
61    pub fn remote_public_identity(&self) -> &PublicKey {
62        self.stream.decoder().remote_public_identity()
63    }
64
65    /// retrieve the unique identifier of the established session
66    ///
67    /// this is derived from the NOISE handshake and is the same
68    /// on both sides of the stream (here and for the remote).
69    pub fn session_id(&self) -> &SessionId {
70        self.stream.decoder().session_id()
71    }
72}
73
74impl<O> HandleWriteHalf<O>
75where
76    O: AsyncWrite,
77{
78    fn new(stream: O, state: TransportSendHalf<Blake2b>) -> Self {
79        let sink = FramedWrite::new(stream, NoiseEncryptedEncoder::new(state));
80
81        Self { sink }
82    }
83
84    /// retrieve the public identity of the peer
85    ///
86    pub fn remote_public_identity(&self) -> &PublicKey {
87        self.sink.encoder().remote_public_identity()
88    }
89
90    /// retrieve the unique identifier of the established session
91    ///
92    /// this is derived from the NOISE handshake and is the same
93    /// on both sides of the stream (here and for the remote).
94    pub fn session_id(&self) -> &SessionId {
95        self.sink.encoder().session_id()
96    }
97}
98
99impl<I, O> Handle<I, O>
100where
101    I: AsyncRead + Unpin,
102    O: AsyncWrite + Unpin,
103{
104    pub(crate) fn new(stream: I, sink: O, state: TransportState<Blake2b>) -> Self {
105        let (tsh, trh) = state.split();
106
107        let stream = HandleReadHalf::new(stream, trh);
108        let sink = HandleWriteHalf::new(sink, tsh);
109
110        Self { stream, sink }
111    }
112
113    /// split the handle into 2 parts into 2 separate half
114    ///
115    /// One will contains the writing half and the other one the reading half. This is
116    /// because the connection is bidirectional/duplex so it is more convenient to handle
117    /// the protocol if the 2 halves are split. However, if you are only using the
118    /// synchronous you can keep the [`Handle`] as it is.
119    pub fn split(self) -> (HandleReadHalf<I>, HandleWriteHalf<O>) {
120        (self.stream, self.sink)
121    }
122
123    /// prepare accepting the new request from the given stream
124    ///
125    pub fn accept<K, RNG>(rng: RNG, reader: I, writer: O) -> Accepting<I, O, RNG, K>
126    where
127        K: Dh,
128        RNG: RngCore + CryptoRng,
129    {
130        Accepting::new(rng, reader, writer)
131    }
132
133    /// open a new stream with the remote peer connected to the `stream` and
134    /// expecting the remote's public identity `rs`.
135    ///
136    /// In order to open the connection, the remote peer will need to
137    /// verify our identity, so we needs the private key associated
138    /// to our identity (`K`).
139    ///
140    /// We will generate an ephemeral key, the `rng` will do that at the appropriate
141    /// time.
142    ///
143    pub async fn open<K, RNG>(rng: RNG, k: &K, rs: PublicKey, reader: I, writer: O) -> Result<Self>
144    where
145        K: Dh,
146        RNG: RngCore + CryptoRng,
147    {
148        let opening = Opening::new(rng, k, rs, reader, writer).await?;
149        opening.wait(k).await
150    }
151
152    /// retrieve the public identity of the peer
153    ///
154    #[allow(dead_code)]
155    pub fn remote_public_identity(&self) -> &PublicKey {
156        self.stream.remote_public_identity()
157    }
158
159    /// retrieve the unique identifier of the established session
160    ///
161    /// this is derived from the NOISE handshake and is the same
162    /// on both sides of the stream (here and for the remote).
163    pub fn session_id(&self) -> &SessionId {
164        self.stream.session_id()
165    }
166}
167
168impl<I, O> Stream for Handle<I, O>
169where
170    I: AsyncRead + Unpin,
171    O: Unpin,
172{
173    type Item = Result<BytesMut>;
174
175    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176        let handle = self.get_mut();
177        let stream = Pin::new(&mut handle.stream);
178        stream.poll_next(cx)
179    }
180}
181
182impl<I> Stream for HandleReadHalf<I>
183where
184    I: AsyncRead + Unpin,
185{
186    type Item = Result<BytesMut>;
187
188    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189        if self.is_terminated() {
190            return Poll::Ready(None);
191        }
192
193        let handle = self.get_mut();
194        let stream = Pin::new(&mut handle.stream);
195
196        match futures::ready!(stream.poll_next(cx)) {
197            None => {
198                handle.none = true;
199                Poll::Ready(None)
200            }
201            Some(result) => Poll::Ready(Some(result.context("Invalid frame received from peer"))),
202        }
203    }
204}
205
206impl<I> stream::FusedStream for HandleReadHalf<I>
207where
208    I: AsyncRead + Unpin,
209{
210    fn is_terminated(&self) -> bool {
211        self.none
212    }
213}
214
215impl<I, O> stream::FusedStream for Handle<I, O>
216where
217    I: AsyncRead + Unpin,
218    O: Unpin,
219{
220    fn is_terminated(&self) -> bool {
221        self.stream.is_terminated()
222    }
223}
224
225impl<I, O> Sink<Bytes> for Handle<I, O>
226where
227    I: Unpin,
228    O: AsyncWrite + Unpin,
229{
230    type Error = anyhow::Error;
231
232    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
233        let handle = self.get_mut();
234        Pin::new(&mut handle.sink).start_send(item)
235    }
236
237    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
238        let handle = self.get_mut();
239        Pin::new(&mut handle.sink).poll_ready(cx)
240    }
241
242    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
243        let handle = self.get_mut();
244        Pin::new(&mut handle.sink).poll_close(cx)
245    }
246
247    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
248        let handle = self.get_mut();
249        Pin::new(&mut handle.sink).poll_flush(cx)
250    }
251}
252
253impl<O> Sink<Bytes> for HandleWriteHalf<O>
254where
255    O: AsyncWrite + Unpin,
256{
257    type Error = anyhow::Error;
258
259    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
260        let handle = self.get_mut();
261
262        Pin::new(&mut handle.sink)
263            .start_send(item)
264            .context("Cannot send the encrypted data to the handle")?;
265
266        Ok(())
267    }
268
269    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
270        let handle = self.get_mut();
271        match Pin::new(&mut handle.sink).poll_ready(cx) {
272            Poll::Pending => Poll::Pending,
273            Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_ready the handle")),
274        }
275    }
276
277    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
278        let handle = self.get_mut();
279        match Pin::new(&mut handle.sink).poll_close(cx) {
280            Poll::Pending => Poll::Pending,
281            Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_close the handle")),
282        }
283    }
284
285    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
286        let handle = self.get_mut();
287        match Pin::new(&mut handle.sink).poll_flush(cx) {
288            Poll::Pending => Poll::Pending,
289            Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_flush the handle")),
290        }
291    }
292}