1use std::net::SocketAddr;
2
3use bytes::{Buf, BytesMut};
4use tokio::{
5 io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf, split},
6 net::{TcpListener, TcpStream},
7};
8
9use crate::{
10 TpktConnection, TpktError, TpktReader, TpktRecvResult, TpktWriter,
11 parser::{TpktParser, TpktParserResult},
12 serialiser::TpktSerialiser,
13};
14
15pub struct TcpTpktService {}
16
17pub struct TcpTpktServer {
18 listener: TcpListener,
19}
20
21impl TcpTpktServer {
22 pub async fn listen(address: SocketAddr) -> Result<Self, TpktError> {
23 Ok(Self { listener: TcpListener::bind(address).await? })
24 }
25
26 pub async fn accept<'a>(&self) -> Result<(TcpTpktConnection, SocketAddr), TpktError> {
27 let (stream, remote_host) = self.listener.accept().await?;
28 let (reader, writer) = split(stream);
29 Ok((TcpTpktConnection::new(TcpTpktReader::new(reader), TcpTpktWriter::new(writer)), remote_host))
30 }
31}
32
33pub struct TcpTpktConnection {
34 reader: TcpTpktReader,
35 writer: TcpTpktWriter,
36}
37
38impl TcpTpktConnection {
39 pub async fn connect<'a>(address: SocketAddr) -> Result<TcpTpktConnection, TpktError> {
40 let stream = TcpStream::connect(address).await?;
41 let (reader, writer) = split(stream);
42 return Ok(TcpTpktConnection::new(TcpTpktReader::new(reader), TcpTpktWriter::new(writer)));
43 }
44
45 fn new(reader: TcpTpktReader, writer: TcpTpktWriter) -> Self {
46 TcpTpktConnection { reader, writer }
47 }
48}
49
50impl TpktConnection for TcpTpktConnection {
51 async fn split(self) -> Result<(impl TpktReader, impl TpktWriter), TpktError> {
52 Ok((self.reader, self.writer))
53 }
54}
55
56pub struct TcpTpktReader {
57 parser: TpktParser,
58 receive_buffer: BytesMut,
59 reader: ReadHalf<TcpStream>,
60}
61
62impl TcpTpktReader {
63 pub fn new(reader: ReadHalf<TcpStream>) -> Self {
64 Self {
65 reader,
66 parser: TpktParser::new(),
67 receive_buffer: BytesMut::new(),
68 }
69 }
70}
71
72impl TpktReader for TcpTpktReader {
73 async fn recv(&mut self) -> Result<TpktRecvResult, TpktError> {
74 loop {
75 let buffer = &mut self.receive_buffer;
76 match self.parser.parse(buffer) {
77 Ok(TpktParserResult::Data(x)) => return Ok(TpktRecvResult::Data(x)),
78 Ok(TpktParserResult::InProgress) => (),
79 Err(x) => return Err(x),
80 };
81 if self.reader.read_buf(buffer).await? == 0 {
82 return Ok(TpktRecvResult::Closed);
83 };
84 }
85 }
86}
87
88pub struct TcpTpktWriter {
89 write_buffer: BytesMut,
90 serialiser: TpktSerialiser,
91 writer: WriteHalf<TcpStream>,
92}
93
94impl TcpTpktWriter {
95 pub fn new(writer: WriteHalf<TcpStream>) -> Self {
96 Self {
97 serialiser: TpktSerialiser::new(),
98 writer,
99 write_buffer: BytesMut::new(),
100 }
101 }
102}
103
104impl TpktWriter for TcpTpktWriter {
105 async fn send(&mut self, data: &[u8]) -> Result<(), TpktError> {
106 self.write_buffer.extend(self.serialiser.serialise(data)?);
107 while self.write_buffer.has_remaining() {
108 self.writer.write_buf(&mut self.write_buffer).await?;
109 }
110 Ok(())
111 }
112
113 async fn continue_send(&mut self) -> Result<(), TpktError> {
114 while self.write_buffer.has_remaining() {
115 self.writer.write_buf(&mut self.write_buffer).await?;
116 }
117 Ok(())
118 }
119}