1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::time::Duration;
use anyhow::{Context, Result};
use futures_util::{SinkExt, StreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::io::{self, AsyncRead, AsyncWrite};
use tokio::time::timeout;
use tokio_util::codec::{AnyDelimiterCodec, Framed, FramedParts};
use tracing::trace;
use uuid::Uuid;
pub const CONTROL_PORT: u16 = 7835;
pub const MAX_FRAME_LENGTH: usize = 256;
pub const NETWORK_TIMEOUT: Duration = Duration::from_secs(3);
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientMessage {
Authenticate(String),
Hello(u16),
Accept(Uuid),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerMessage {
Challenge(Uuid),
Hello(u16),
Heartbeat,
Connection(Uuid),
Error(String),
}
pub struct Delimited<U>(Framed<U, AnyDelimiterCodec>);
impl<U: AsyncRead + AsyncWrite + Unpin> Delimited<U> {
pub fn new(stream: U) -> Self {
let codec = AnyDelimiterCodec::new_with_max_length(vec![0], vec![0], MAX_FRAME_LENGTH);
Self(Framed::new(stream, codec))
}
pub async fn recv<T: DeserializeOwned>(&mut self) -> Result<Option<T>> {
trace!("waiting to receive json message");
if let Some(next_message) = self.0.next().await {
let byte_message = next_message.context("frame error, invalid byte length")?;
let serialized_obj =
serde_json::from_slice(&byte_message).context("unable to parse message")?;
Ok(serialized_obj)
} else {
Ok(None)
}
}
pub async fn recv_timeout<T: DeserializeOwned>(&mut self) -> Result<Option<T>> {
timeout(NETWORK_TIMEOUT, self.recv())
.await
.context("timed out waiting for initial message")?
}
pub async fn send<T: Serialize>(&mut self, msg: T) -> Result<()> {
trace!("sending json message");
self.0.send(serde_json::to_string(&msg)?).await?;
Ok(())
}
pub fn into_parts(self) -> FramedParts<U, AnyDelimiterCodec> {
self.0.into_parts()
}
}
pub async fn proxy<S1, S2>(stream1: S1, stream2: S2) -> io::Result<()>
where
S1: AsyncRead + AsyncWrite + Unpin,
S2: AsyncRead + AsyncWrite + Unpin,
{
let (mut s1_read, mut s1_write) = io::split(stream1);
let (mut s2_read, mut s2_write) = io::split(stream2);
tokio::select! {
res = io::copy(&mut s1_read, &mut s2_write) => res,
res = io::copy(&mut s2_read, &mut s1_write) => res,
}?;
Ok(())
}