keynesis_network/
handle.rs

1use crate::{
2    codec::{NoiseEncryptedDecoder, NoiseEncryptedEncoder},
3    opening::Opening,
4    Accepting, SessionId,
5};
6use anyhow::{Context as _, Result};
7use bytes::{Bytes, BytesMut};
8use futures::{prelude::*, stream::FusedStream as _};
9use keynesis_core::{
10    hash::Blake2b,
11    key::{Dh, PublicKey},
12    noise::{TransportReceiveHalf, TransportSendHalf, TransportState},
13};
14use rand_core::{CryptoRng, RngCore};
15use std::{
16    pin::Pin,
17    task::{Context, Poll},
18};
19use tokio::io::{AsyncRead, AsyncWrite};
20use tokio_util::codec::{FramedRead, FramedWrite};
21
22/// bidirectional handle of an encrypted connection
23///
24/// The [`Handle`] is composed of 2 halves that can be split for more convenient
25/// management of the ins and outs of the connections.
26///
27/// see [`Handle::split`] for more information
28pub struct Handle<I, O, PK>
29where
30    PK: PublicKey,
31{
32    stream: HandleReadHalf<I, PK>,
33    sink: HandleWriteHalf<O, PK>,
34}
35
36/// the reading half of the encrypted connection
37///
38/// see [`Handle::split`] for more information
39pub struct HandleReadHalf<I, PK>
40where
41    PK: PublicKey,
42{
43    none: bool,
44    stream: FramedRead<I, NoiseEncryptedDecoder<PK>>,
45}
46
47/// the writing half of the encrypted connection
48///
49/// see [`Handle::split`] for more information
50pub struct HandleWriteHalf<O, PK>
51where
52    PK: PublicKey,
53{
54    sink: FramedWrite<O, NoiseEncryptedEncoder<PK>>,
55}
56
57impl<I, PK> HandleReadHalf<I, PK>
58where
59    PK: PublicKey,
60    I: AsyncRead,
61{
62    fn new(stream: I, state: TransportReceiveHalf<Blake2b, PK>) -> Self {
63        let stream = FramedRead::new(stream, NoiseEncryptedDecoder::new(state));
64        let none = false;
65
66        Self { stream, none }
67    }
68
69    /// retrieve the public identity of the peer
70    ///
71    pub fn remote_public_identity(&self) -> &PK {
72        self.stream.decoder().remote_public_identity()
73    }
74
75    /// retrieve the unique identifier of the established session
76    ///
77    /// this is derived from the NOISE handshake and is the same
78    /// on both sides of the stream (here and for the remote).
79    pub fn session_id(&self) -> &SessionId {
80        self.stream.decoder().session_id()
81    }
82}
83
84impl<O, PK> HandleWriteHalf<O, PK>
85where
86    O: AsyncWrite,
87    PK: PublicKey,
88{
89    fn new(stream: O, state: TransportSendHalf<Blake2b, PK>) -> Self {
90        let sink = FramedWrite::new(stream, NoiseEncryptedEncoder::new(state));
91
92        Self { sink }
93    }
94
95    /// retrieve the public identity of the peer
96    ///
97    pub fn remote_public_identity(&self) -> &PK {
98        self.sink.encoder().remote_public_identity()
99    }
100
101    /// retrieve the unique identifier of the established session
102    ///
103    /// this is derived from the NOISE handshake and is the same
104    /// on both sides of the stream (here and for the remote).
105    pub fn session_id(&self) -> &SessionId {
106        self.sink.encoder().session_id()
107    }
108}
109
110impl<I, O, PK> Handle<I, O, PK>
111where
112    I: AsyncRead + Unpin,
113    O: AsyncWrite + Unpin,
114    PK: PublicKey,
115{
116    pub(crate) fn new(stream: I, sink: O, state: TransportState<Blake2b, PK>) -> Self {
117        let (tsh, trh) = state.split();
118
119        let stream = HandleReadHalf::new(stream, trh);
120        let sink = HandleWriteHalf::new(sink, tsh);
121
122        Self { stream, sink }
123    }
124
125    /// split the handle into 2 parts into 2 separate half
126    ///
127    /// One will contains the writing half and the other one the reading half. This is
128    /// because the connection is bidirectional/duplex so it is more convenient to handle
129    /// the protocol if the 2 halves are split. However, if you are only using the
130    /// synchronous you can keep the [`Handle`] as it is.
131    pub fn split(self) -> (HandleReadHalf<I, PK>, HandleWriteHalf<O, PK>) {
132        (self.stream, self.sink)
133    }
134
135    /// prepare accepting the new request from the given stream
136    ///
137    pub fn accept<K, RNG>(rng: RNG, reader: I, writer: O) -> Accepting<I, O, RNG, K>
138    where
139        K: Dh,
140        RNG: RngCore + CryptoRng,
141    {
142        Accepting::new(rng, reader, writer)
143    }
144
145    /// open a new stream with the remote peer connected to the `stream` and
146    /// expecting the remote's public identity `rs`.
147    ///
148    /// In order to open the connection, the remote peer will need to
149    /// verify our identity, so we needs the private key associated
150    /// to our identity (`K`).
151    ///
152    /// We will generate an ephemeral key, the `rng` will do that at the appropriate
153    /// time.
154    ///
155    pub async fn open<K, RNG>(rng: RNG, k: &K, rs: PK, reader: I, writer: O) -> Result<Self>
156    where
157        K: Dh<Public = PK>,
158        RNG: RngCore + CryptoRng,
159    {
160        let opening = Opening::new(rng, k, rs, reader, writer).await?;
161        opening.wait(k).await
162    }
163
164    /// retrieve the public identity of the peer
165    ///
166    #[allow(dead_code)]
167    pub fn remote_public_identity(&self) -> &PK {
168        self.stream.remote_public_identity()
169    }
170
171    /// retrieve the unique identifier of the established session
172    ///
173    /// this is derived from the NOISE handshake and is the same
174    /// on both sides of the stream (here and for the remote).
175    pub fn session_id(&self) -> &SessionId {
176        self.stream.session_id()
177    }
178}
179
180impl<I, O, PK> Stream for Handle<I, O, PK>
181where
182    I: AsyncRead + Unpin,
183    O: Unpin,
184    PK: PublicKey,
185{
186    type Item = Result<BytesMut>;
187
188    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189        let handle = self.get_mut();
190        let stream = Pin::new(&mut handle.stream);
191        stream.poll_next(cx)
192    }
193}
194
195impl<I, PK> Stream for HandleReadHalf<I, PK>
196where
197    I: AsyncRead + Unpin,
198    PK: PublicKey,
199{
200    type Item = Result<BytesMut>;
201
202    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203        if self.is_terminated() {
204            return Poll::Ready(None);
205        }
206
207        let handle = self.get_mut();
208        let stream = Pin::new(&mut handle.stream);
209
210        match futures::ready!(stream.poll_next(cx)) {
211            None => {
212                handle.none = true;
213                Poll::Ready(None)
214            }
215            Some(result) => Poll::Ready(Some(result.context("Invalid frame received from peer"))),
216        }
217    }
218}
219
220impl<I, PK> stream::FusedStream for HandleReadHalf<I, PK>
221where
222    I: AsyncRead + Unpin,
223    PK: PublicKey,
224{
225    fn is_terminated(&self) -> bool {
226        self.none
227    }
228}
229
230impl<I, O, PK> stream::FusedStream for Handle<I, O, PK>
231where
232    I: AsyncRead + Unpin,
233    O: Unpin,
234    PK: PublicKey,
235{
236    fn is_terminated(&self) -> bool {
237        self.stream.is_terminated()
238    }
239}
240
241impl<I, O, PK> Sink<Bytes> for Handle<I, O, PK>
242where
243    I: Unpin,
244    O: AsyncWrite + Unpin,
245    PK: PublicKey,
246{
247    type Error = anyhow::Error;
248
249    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
250        let handle = self.get_mut();
251        Pin::new(&mut handle.sink).start_send(item)
252    }
253
254    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
255        let handle = self.get_mut();
256        Pin::new(&mut handle.sink).poll_ready(cx)
257    }
258
259    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
260        let handle = self.get_mut();
261        Pin::new(&mut handle.sink).poll_close(cx)
262    }
263
264    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
265        let handle = self.get_mut();
266        Pin::new(&mut handle.sink).poll_flush(cx)
267    }
268}
269
270impl<O, PK> Sink<Bytes> for HandleWriteHalf<O, PK>
271where
272    O: AsyncWrite + Unpin,
273    PK: PublicKey,
274{
275    type Error = anyhow::Error;
276
277    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
278        let handle = self.get_mut();
279
280        Pin::new(&mut handle.sink)
281            .start_send(item)
282            .context("Cannot send the encrypted data to the handle")?;
283
284        Ok(())
285    }
286
287    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
288        let handle = self.get_mut();
289        match Pin::new(&mut handle.sink).poll_ready(cx) {
290            Poll::Pending => Poll::Pending,
291            Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_ready the handle")),
292        }
293    }
294
295    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
296        let handle = self.get_mut();
297        match Pin::new(&mut handle.sink).poll_close(cx) {
298            Poll::Pending => Poll::Pending,
299            Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_close the handle")),
300        }
301    }
302
303    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
304        let handle = self.get_mut();
305        match Pin::new(&mut handle.sink).poll_flush(cx) {
306            Poll::Pending => Poll::Pending,
307            Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_flush the handle")),
308        }
309    }
310}