1use crate::{
2 codec::{NoiseEncryptedDecoder, NoiseEncryptedEncoder},
3 opening::Opening,
4 Accepting, SessionId,
5};
6use anyhow::{Context as _, Result};
7use bytes::{Bytes, BytesMut};
8use futures::{prelude::*, stream::FusedStream as _};
9use keynesis_core::{
10 hash::Blake2b,
11 key::{Dh, PublicKey},
12 noise::{TransportReceiveHalf, TransportSendHalf, TransportState},
13};
14use rand_core::{CryptoRng, RngCore};
15use std::{
16 pin::Pin,
17 task::{Context, Poll},
18};
19use tokio::io::{AsyncRead, AsyncWrite};
20use tokio_util::codec::{FramedRead, FramedWrite};
21
22pub struct Handle<I, O, PK>
29where
30 PK: PublicKey,
31{
32 stream: HandleReadHalf<I, PK>,
33 sink: HandleWriteHalf<O, PK>,
34}
35
36pub struct HandleReadHalf<I, PK>
40where
41 PK: PublicKey,
42{
43 none: bool,
44 stream: FramedRead<I, NoiseEncryptedDecoder<PK>>,
45}
46
47pub struct HandleWriteHalf<O, PK>
51where
52 PK: PublicKey,
53{
54 sink: FramedWrite<O, NoiseEncryptedEncoder<PK>>,
55}
56
57impl<I, PK> HandleReadHalf<I, PK>
58where
59 PK: PublicKey,
60 I: AsyncRead,
61{
62 fn new(stream: I, state: TransportReceiveHalf<Blake2b, PK>) -> Self {
63 let stream = FramedRead::new(stream, NoiseEncryptedDecoder::new(state));
64 let none = false;
65
66 Self { stream, none }
67 }
68
69 pub fn remote_public_identity(&self) -> &PK {
72 self.stream.decoder().remote_public_identity()
73 }
74
75 pub fn session_id(&self) -> &SessionId {
80 self.stream.decoder().session_id()
81 }
82}
83
84impl<O, PK> HandleWriteHalf<O, PK>
85where
86 O: AsyncWrite,
87 PK: PublicKey,
88{
89 fn new(stream: O, state: TransportSendHalf<Blake2b, PK>) -> Self {
90 let sink = FramedWrite::new(stream, NoiseEncryptedEncoder::new(state));
91
92 Self { sink }
93 }
94
95 pub fn remote_public_identity(&self) -> &PK {
98 self.sink.encoder().remote_public_identity()
99 }
100
101 pub fn session_id(&self) -> &SessionId {
106 self.sink.encoder().session_id()
107 }
108}
109
110impl<I, O, PK> Handle<I, O, PK>
111where
112 I: AsyncRead + Unpin,
113 O: AsyncWrite + Unpin,
114 PK: PublicKey,
115{
116 pub(crate) fn new(stream: I, sink: O, state: TransportState<Blake2b, PK>) -> Self {
117 let (tsh, trh) = state.split();
118
119 let stream = HandleReadHalf::new(stream, trh);
120 let sink = HandleWriteHalf::new(sink, tsh);
121
122 Self { stream, sink }
123 }
124
125 pub fn split(self) -> (HandleReadHalf<I, PK>, HandleWriteHalf<O, PK>) {
132 (self.stream, self.sink)
133 }
134
135 pub fn accept<K, RNG>(rng: RNG, reader: I, writer: O) -> Accepting<I, O, RNG, K>
138 where
139 K: Dh,
140 RNG: RngCore + CryptoRng,
141 {
142 Accepting::new(rng, reader, writer)
143 }
144
145 pub async fn open<K, RNG>(rng: RNG, k: &K, rs: PK, reader: I, writer: O) -> Result<Self>
156 where
157 K: Dh<Public = PK>,
158 RNG: RngCore + CryptoRng,
159 {
160 let opening = Opening::new(rng, k, rs, reader, writer).await?;
161 opening.wait(k).await
162 }
163
164 #[allow(dead_code)]
167 pub fn remote_public_identity(&self) -> &PK {
168 self.stream.remote_public_identity()
169 }
170
171 pub fn session_id(&self) -> &SessionId {
176 self.stream.session_id()
177 }
178}
179
180impl<I, O, PK> Stream for Handle<I, O, PK>
181where
182 I: AsyncRead + Unpin,
183 O: Unpin,
184 PK: PublicKey,
185{
186 type Item = Result<BytesMut>;
187
188 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189 let handle = self.get_mut();
190 let stream = Pin::new(&mut handle.stream);
191 stream.poll_next(cx)
192 }
193}
194
195impl<I, PK> Stream for HandleReadHalf<I, PK>
196where
197 I: AsyncRead + Unpin,
198 PK: PublicKey,
199{
200 type Item = Result<BytesMut>;
201
202 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203 if self.is_terminated() {
204 return Poll::Ready(None);
205 }
206
207 let handle = self.get_mut();
208 let stream = Pin::new(&mut handle.stream);
209
210 match futures::ready!(stream.poll_next(cx)) {
211 None => {
212 handle.none = true;
213 Poll::Ready(None)
214 }
215 Some(result) => Poll::Ready(Some(result.context("Invalid frame received from peer"))),
216 }
217 }
218}
219
220impl<I, PK> stream::FusedStream for HandleReadHalf<I, PK>
221where
222 I: AsyncRead + Unpin,
223 PK: PublicKey,
224{
225 fn is_terminated(&self) -> bool {
226 self.none
227 }
228}
229
230impl<I, O, PK> stream::FusedStream for Handle<I, O, PK>
231where
232 I: AsyncRead + Unpin,
233 O: Unpin,
234 PK: PublicKey,
235{
236 fn is_terminated(&self) -> bool {
237 self.stream.is_terminated()
238 }
239}
240
241impl<I, O, PK> Sink<Bytes> for Handle<I, O, PK>
242where
243 I: Unpin,
244 O: AsyncWrite + Unpin,
245 PK: PublicKey,
246{
247 type Error = anyhow::Error;
248
249 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
250 let handle = self.get_mut();
251 Pin::new(&mut handle.sink).start_send(item)
252 }
253
254 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
255 let handle = self.get_mut();
256 Pin::new(&mut handle.sink).poll_ready(cx)
257 }
258
259 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
260 let handle = self.get_mut();
261 Pin::new(&mut handle.sink).poll_close(cx)
262 }
263
264 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
265 let handle = self.get_mut();
266 Pin::new(&mut handle.sink).poll_flush(cx)
267 }
268}
269
270impl<O, PK> Sink<Bytes> for HandleWriteHalf<O, PK>
271where
272 O: AsyncWrite + Unpin,
273 PK: PublicKey,
274{
275 type Error = anyhow::Error;
276
277 fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
278 let handle = self.get_mut();
279
280 Pin::new(&mut handle.sink)
281 .start_send(item)
282 .context("Cannot send the encrypted data to the handle")?;
283
284 Ok(())
285 }
286
287 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
288 let handle = self.get_mut();
289 match Pin::new(&mut handle.sink).poll_ready(cx) {
290 Poll::Pending => Poll::Pending,
291 Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_ready the handle")),
292 }
293 }
294
295 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
296 let handle = self.get_mut();
297 match Pin::new(&mut handle.sink).poll_close(cx) {
298 Poll::Pending => Poll::Pending,
299 Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_close the handle")),
300 }
301 }
302
303 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
304 let handle = self.get_mut();
305 match Pin::new(&mut handle.sink).poll_flush(cx) {
306 Poll::Pending => Poll::Pending,
307 Poll::Ready(result) => Poll::Ready(result.context("Cannot poll_flush the handle")),
308 }
309 }
310}