stratum_server/
connection.rs1use crate::{frame::Request, session::SendInformation, types::ConnectionID, Error, Frame, Result};
2use bytes::BytesMut;
3use std::net::SocketAddr;
4use tokio::{
5 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
6 net::{
7 tcp::{OwnedReadHalf, OwnedWriteHalf},
8 TcpStream,
9 },
10 sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
11 task::JoinHandle,
12};
13use tokio_util::sync::CancellationToken;
14use tracing::trace;
15
16#[derive(Debug)]
19pub struct Connection {
20 _id: ConnectionID,
21 writer: OwnedWriteHalf,
22 reader: BufReader<OwnedReadHalf>,
23 cancel_token: CancellationToken,
24
25 _buffer: BytesMut,
28 pub(crate) address: SocketAddr,
29}
30
31impl Connection {
32 pub(crate) fn new(
33 id: ConnectionID,
34 socket: TcpStream,
35 cancel_token: CancellationToken,
36 ) -> Result<Self> {
37 let addr = socket.peer_addr()?;
38
39 let (read_half, write_half) = socket.into_split();
40
41 Ok(Connection {
42 _id: id,
43 address: addr,
44 writer: write_half,
45 reader: BufReader::new(read_half),
46 cancel_token,
47 _buffer: BytesMut::new(),
48 })
49 }
50
51 pub(crate) fn init(
52 self,
53 ) -> (
54 ConnectionReader,
55 UnboundedSender<SendInformation>,
56 JoinHandle<Result<()>>,
57 ) {
58 let reader = ConnectionReader {
59 reader: self.reader,
60 };
61
62 let (tx, rx): (
63 UnboundedSender<SendInformation>,
64 UnboundedReceiver<SendInformation>,
65 ) = unbounded_channel();
66
67 let cancel_token = self.cancel_token.clone();
70 let handle =
71 tokio::spawn(async move { write_message(cancel_token, rx, self.writer).await });
72
73 (reader, tx, handle)
74 }
75
76 pub(crate) async fn proxy_protocol(&mut self) -> Result<SocketAddr> {
79 let mut buf = String::new();
80
81 self.reader.read_line(&mut buf).await?;
84
85 let buf = buf.trim();
88 let pieces: Vec<&str> = buf.split(' ').collect();
92
93 Ok(format!("{}:{}", pieces[2], pieces[4]).parse()?)
94 }
95}
96
97async fn write_message(
98 cancel_token: CancellationToken,
99 mut rx: UnboundedReceiver<SendInformation>,
100 mut writer: OwnedWriteHalf,
101) -> Result<()> {
102 while !cancel_token.is_cancelled() {
105 tokio::select! {
106 Some(msg) = rx.recv() => {
107 match msg {
108 SendInformation::Json(json) => {
109 writer.write_all(json.as_bytes()).await?;
110 writer.write_all(b"\n").await?;
111 }
112 SendInformation::Text(text) => {
113 writer.write_all(text.as_bytes()).await?;
114 }
115 SendInformation::Raw(buffer) => {
116 writer.write_all(&buffer).await?;
117 }
118 }
119 }
120 () = cancel_token.cancelled() => {
121 trace!("write loop hit cancellation token.");
123
124 return Ok(());
126 }
127 else => {
128 return Ok(());
130 }
131 }
132 }
133
134 Ok(())
135}
136
137pub struct ConnectionReader {
139 reader: BufReader<OwnedReadHalf>,
140}
141
142impl ConnectionReader {
143 pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
144 loop {
145 let mut buf = String::new();
146 if 0 == self.reader.read_line(&mut buf).await? {
147 if self.reader.buffer().is_empty() {
148 return Ok(None);
149 }
150 return Err(Error::PeerResetConnection);
151 }
152
153 if !buf.is_empty() {
154 buf = buf.trim().to_owned();
156
157 trace!("Received Message: {}", &buf);
159
160 if buf.is_empty() {
161 continue;
162 }
163
164 let msg: Request = serde_json::from_str(&buf)?;
167
168 return Ok(Some(Frame::V1(msg)));
169 }
170 }
171 }
172}
173
174