1use crate::{
2 codec::{NoiseEncryptedDecoder, NoiseEncryptedEncoder},
3 opening::Opening,
4 Accepting, SessionId,
5};
6use anyhow::{Context as _, Result};
7use bytes::{Bytes, BytesMut};
8use futures::{prelude::*, stream::FusedStream as _};
9use keynesis_core::{
10 hash::Blake2b,
11 key::{ed25519::PublicKey, Dh},
12 noise::{TransportReceiveHalf, TransportSendHalf, TransportState},
13};
14use rand_core::{CryptoRng, RngCore};
15use std::{
16 pin::Pin,
17 task::{Context, Poll},
18};
19use tokio::io::{AsyncRead, AsyncWrite};
20use tokio_util::codec::{FramedRead, FramedWrite};
21
22pub struct Handle<I, O> {
29 stream: HandleReadHalf<I>,
30 sink: HandleWriteHalf<O>,
31}
32
33pub struct HandleReadHalf<I> {
37 none: bool,
38 stream: FramedRead<I, NoiseEncryptedDecoder>,
39}
40
41pub struct HandleWriteHalf<O> {
45 sink: FramedWrite<O, NoiseEncryptedEncoder>,
46}
47
48impl<I> HandleReadHalf<I>
49where
50 I: AsyncRead,
51{
52 fn new(stream: I, state: TransportReceiveHalf<Blake2b>) -> Self {
53 let stream = FramedRead::new(stream, NoiseEncryptedDecoder::new(state));
54 let none = false;
55
56 Self { stream, none }
57 }
58
59 pub fn remote_public_identity(&self) -> &PublicKey {
62 self.stream.decoder().remote_public_identity()
63 }
64
65 pub fn session_id(&self) -> &SessionId {
70 self.stream.decoder().session_id()
71 }
72}
73
74impl<O> HandleWriteHalf<O>
75where
76 O: AsyncWrite,
77{
78 fn new(stream: O, state: TransportSendHalf<Blake2b>) -> Self {
79 let sink = FramedWrite::new(stream, NoiseEncryptedEncoder::new(state));
80
81 Self { sink }
82 }
83
84 pub fn remote_public_identity(&self) -> &PublicKey {
87 self.sink.encoder().remote_public_identity()
88 }
89
90 pub fn session_id(&self) -> &SessionId {
95 self.sink.encoder().session_id()
96 }
97}
98
99impl<I, O> Handle<I, O>
100where
101 I: AsyncRead + Unpin,
102 O: AsyncWrite + Unpin,
103{
104 pub(crate) fn new(stream: I, sink: O, state: TransportState<Blake2b>) -> Self {
105 let (tsh, trh) = state.split();
106
107 let stream = HandleReadHalf::new(stream, trh);
108 let sink = HandleWriteHalf::new(sink, tsh);
109
110 Self { stream, sink }
111 }
112
113 pub fn split(self) -> (HandleReadHalf<I>, HandleWriteHalf<O>) {
120 (self.stream, self.sink)
121 }
122
123 pub fn accept<K, RNG>(rng: RNG, reader: I, writer: O) -> Accepting<I, O, RNG, K>
126 where
127 K: Dh,
128 RNG: RngCore + CryptoRng,
129 {
130 Accepting::new(rng, reader, writer)
131 }
132
133 pub async fn open<K, RNG>(rng: RNG, k: &K, rs: PublicKey, reader: I, writer: O) -> Result<Self>
144 where
145 K: Dh,
146 RNG: RngCore + CryptoRng,
147 {
148 let opening = Opening::new(rng, k, rs, reader, writer).await?;
149 opening.wait(k).await
150 }
151
152 #[allow(dead_code)]
155 pub fn remote_public_identity(&self) -> &PublicKey {
156 self.stream.remote_public_identity()
157 }
158
159 pub fn session_id(&self) -> &SessionId {
164 self.stream.session_id()
165 }
166}
167
168impl<I, O> Stream for Handle<I, O>
169where
170 I: AsyncRead + Unpin,
171 O: Unpin,
172{
173 type Item = Result<BytesMut>;
174
175 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176 let handle = self.get_mut();
177 let stream = Pin::new(&mut handle.stream);
178 stream.poll_next(cx)
179 }
180}
181
182impl<I> Stream for HandleReadHalf<I>
183where
184 I: AsyncRead + Unpin,
185{
186 type Item = Result<BytesMut>;
187
188 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189 if self.is_terminated() {
190 return Poll::Ready(None);
191 }
192
193 let handle = self.get_mut();
194 let stream = Pin::new(&mut handle.stream);
195
196 match futures::ready!(stream.poll_next(cx)) {
197 None => {
198 handle.none = true;
199 Poll::Ready(None)
200 }
201 Some(result) => Poll::Ready(Some(result.context("Invalid frame received from peer"))),
202 }
203 }
204}
205
206impl<I> stream::FusedStream for HandleReadHalf<I>
207where
208 I: AsyncRead + Unpin,
209{
210 fn is_terminated(&self) -> bool {
211 self.none
212 }
213}
214
215impl<I, O> stream::FusedStream for Handle<I, O>
216where
217 I: AsyncRead + Unpin,
218 O: Unpin,
219{
220 fn is_terminated(&self) -> bool {
221 self.stream.is_terminated()
222 }
223}
224
225impl<I, O> Sink<Bytes> for Handle<I, O>
226where
227 I: Unpin,
228 O: AsyncWrite + Unpin,
229{
230 type Error = anyhow::Error;
231
232 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
233 let handle = self.get_mut();
234 Pin::new(&mut handle.sink).start_send(item)
235 }
236
237 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
238 let handle = self.get_mut();
239 Pin::new(&mut handle.sink).poll_ready(cx)
240 }
241
242 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
243 let handle = self.get_mut();
244 Pin::new(&mut handle.sink).poll_close(cx)
245 }
246
247 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
248 let handle = self.get_mut();
249 Pin::new(&mut handle.sink).poll_flush(cx)
250 }
251}
252
253impl<O> Sink<Bytes> for HandleWriteHalf<O>
254where
255 O: AsyncWrite + Unpin,
256{
257 type Error = anyhow::Error;
258
259 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
260 let handle = self.get_mut();
261
262 Pin::new(&mut handle.sink)
263 .start_send(item)
264 .context("Cannot send the encrypted data to the handle")?;
265
266 Ok(())
267 }
268
269 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
270 let handle = self.get_mut();
271 match Pin::new(&mut handle.sink).poll_ready(cx) {
272 Poll::Pending => Poll::Pending,
273 Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_ready the handle")),
274 }
275 }
276
277 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
278 let handle = self.get_mut();
279 match Pin::new(&mut handle.sink).poll_close(cx) {
280 Poll::Pending => Poll::Pending,
281 Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_close the handle")),
282 }
283 }
284
285 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
286 let handle = self.get_mut();
287 match Pin::new(&mut handle.sink).poll_flush(cx) {
288 Poll::Pending => Poll::Pending,
289 Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_flush the handle")),
290 }
291 }
292}