stratum_server/
connection.rs

1use crate::{frame::Request, session::SendInformation, types::ConnectionID, Error, Frame, Result};
2use bytes::BytesMut;
3use std::net::SocketAddr;
4use tokio::{
5    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
6    net::{
7        tcp::{OwnedReadHalf, OwnedWriteHalf},
8        TcpStream,
9    },
10    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
11    task::JoinHandle,
12};
13use tokio_util::sync::CancellationToken;
14use tracing::trace;
15
16//@todo convert this to return ConnectionWriter to be used in Sessions.
17
18#[derive(Debug)]
19pub struct Connection {
20    _id: ConnectionID,
21    writer: OwnedWriteHalf,
22    reader: BufReader<OwnedReadHalf>,
23    cancel_token: CancellationToken,
24
25    //@todo implement this, but move to it Reader.
26    // The buffer for reading frames.
27    _buffer: BytesMut,
28    pub(crate) address: SocketAddr,
29}
30
31impl Connection {
32    pub(crate) fn new(
33        id: ConnectionID,
34        socket: TcpStream,
35        cancel_token: CancellationToken,
36    ) -> Result<Self> {
37        let addr = socket.peer_addr()?;
38
39        let (read_half, write_half) = socket.into_split();
40
41        Ok(Connection {
42            _id: id,
43            address: addr,
44            writer: write_half,
45            reader: BufReader::new(read_half),
46            cancel_token,
47            _buffer: BytesMut::new(),
48        })
49    }
50
51    pub(crate) fn init(
52        self,
53    ) -> (
54        ConnectionReader,
55        UnboundedSender<SendInformation>,
56        JoinHandle<Result<()>>,
57    ) {
58        let reader = ConnectionReader {
59            reader: self.reader,
60        };
61
62        let (tx, rx): (
63            UnboundedSender<SendInformation>,
64            UnboundedReceiver<SendInformation>,
65        ) = unbounded_channel();
66
67        //@todo let's review this thoroughly.
68        //@todo I think that we need to return this thread so it can be joined.
69        let cancel_token = self.cancel_token.clone();
70        let handle =
71            tokio::spawn(async move { write_message(cancel_token, rx, self.writer).await });
72
73        (reader, tx, handle)
74    }
75
76    //@todo this prob panics in multiple scenarios, so this really needs to be cleaned up.
77    //@todo polish this up and support both v1 and v2.
78    pub(crate) async fn proxy_protocol(&mut self) -> Result<SocketAddr> {
79        let mut buf = String::new();
80
81        //@todo This may be the memory leak here.
82        // Check for Proxy Protocol.
83        self.reader.read_line(&mut buf).await?;
84
85        //Buf will be of the format "PROXY TCP4 92.118.161.17 172.20.42.228 55867 8080\r\n"
86        //Trim the \r\n off
87        let buf = buf.trim();
88        //Might want to not be ascii whitespace and just normal here.
89        // let pieces = buf.split_ascii_whitespace();
90
91        let pieces: Vec<&str> = buf.split(' ').collect();
92
93        Ok(format!("{}:{}", pieces[2], pieces[4]).parse()?)
94    }
95}
96
97async fn write_message(
98    cancel_token: CancellationToken,
99    mut rx: UnboundedReceiver<SendInformation>,
100    mut writer: OwnedWriteHalf,
101) -> Result<()> {
102    //@todo move cancel_token.cancelled() into the select loop oh wait it is, weird I guess this
103    //works just review again?
104    while !cancel_token.is_cancelled() {
105        tokio::select! {
106            Some(msg) = rx.recv() => {
107                match msg {
108                    SendInformation::Json(json) => {
109                        writer.write_all(json.as_bytes()).await?;
110                        writer.write_all(b"\n").await?;
111                    }
112                    SendInformation::Text(text) => {
113                        writer.write_all(text.as_bytes()).await?;
114                    }
115                    SendInformation::Raw(buffer) => {
116                        writer.write_all(&buffer).await?;
117                    }
118                }
119            }
120            () = cancel_token.cancelled() => {
121                //@todo reword this
122                trace!("write loop hit cancellation token.");
123
124                //Return Err
125                    return Ok(());
126            }
127            else => {
128            //Return Err
129                return Ok(());
130            }
131        }
132    }
133
134    Ok(())
135}
136
137//@todo inhouse a buffer here, but for now this works I suppose.
138pub struct ConnectionReader {
139    reader: BufReader<OwnedReadHalf>,
140}
141
142impl ConnectionReader {
143    pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
144        loop {
145            let mut buf = String::new();
146            if 0 == self.reader.read_line(&mut buf).await? {
147                if self.reader.buffer().is_empty() {
148                    return Ok(None);
149                }
150                return Err(Error::PeerResetConnection);
151            }
152
153            if !buf.is_empty() {
154                //@smells
155                buf = buf.trim().to_owned();
156
157                //@todo when revamping logging, put connection id into here.
158                trace!("Received Message: {}", &buf);
159
160                if buf.is_empty() {
161                    continue;
162                }
163
164                //@todo I think we may want to log the buf here if it fails on trace - Right now we
165                //can't see what these connections are sending.
166                let msg: Request = serde_json::from_str(&buf)?;
167
168                return Ok(Some(Frame::V1(msg)));
169            }
170        }
171    }
172}
173
174//@todo RUN tests here with a bunch of different scenarios, including bad messages, not using proxy
175//protocol, etc.