// rusty_tpkt/service.rs

1use std::{collections::VecDeque, net::SocketAddr};
2
3use bytes::{Buf, BytesMut};
4use tokio::{
5    io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf, split},
6    net::{TcpListener, TcpStream},
7};
8
9use crate::{
10    TpktConnection, TpktError, TpktReader, TpktRecvResult, TpktWriter,
11    parser::{TpktParser, TpktParserResult},
12    serialiser::TpktSerialiser,
13};
14
/// Field-less marker type for the TCP-backed TPKT service.
///
/// NOTE(review): no methods or trait implementations for this type are
/// defined in this file; confirm whether it is used elsewhere in the crate.
pub struct TcpTpktService {}
16
/// TPKT server endpoint that owns a bound TCP listener.
///
/// Constructed with `TcpTpktServer::listen`; each successful call to
/// `TcpTpktServer::accept` produces one `TcpTpktConnection`.
pub struct TcpTpktServer {
    /// Listening socket from which inbound connections are accepted.
    listener: TcpListener,
}
20
21impl TcpTpktServer {
22    pub async fn listen(address: SocketAddr) -> Result<Self, TpktError> {
23        Ok(Self { listener: TcpListener::bind(address).await? })
24    }
25
26    pub async fn accept<'a>(&self) -> Result<(TcpTpktConnection, SocketAddr), TpktError> {
27        let (stream, remote_host) = self.listener.accept().await?;
28        let (reader, writer) = split(stream);
29        Ok((TcpTpktConnection::new(TcpTpktReader::new(reader), TcpTpktWriter::new(writer)), remote_host))
30    }
31}
32
/// A TPKT connection carried over a single TCP stream.
///
/// Holds the read and write halves of the stream until the connection is
/// split into its reader and writer parts.
pub struct TcpTpktConnection {
    /// Receiving half of the connection.
    reader: TcpTpktReader,
    /// Sending half of the connection.
    writer: TcpTpktWriter,
}
37
38impl TcpTpktConnection {
39    pub async fn connect<'a>(address: SocketAddr) -> Result<TcpTpktConnection, TpktError> {
40        let stream = TcpStream::connect(address).await?;
41        let (reader, writer) = split(stream);
42        return Ok(TcpTpktConnection::new(TcpTpktReader::new(reader), TcpTpktWriter::new(writer)));
43    }
44
45    fn new(reader: TcpTpktReader, writer: TcpTpktWriter) -> Self {
46        TcpTpktConnection { reader, writer }
47    }
48}
49
50impl TpktConnection for TcpTpktConnection {
51    async fn split(self) -> Result<(impl TpktReader, impl TpktWriter), TpktError> {
52        Ok((self.reader, self.writer))
53    }
54}
55
/// Receiving side of a TPKT connection over TCP.
pub struct TcpTpktReader {
    /// Parser that extracts packets from the buffered bytes.
    parser: TpktParser,
    /// Bytes read from the socket that have not yet been consumed by the parser.
    receive_buffer: BytesMut,
    /// Read half of the underlying TCP stream.
    reader: ReadHalf<TcpStream>,
}
61
62impl TcpTpktReader {
63    pub fn new(reader: ReadHalf<TcpStream>) -> Self {
64        Self { reader, parser: TpktParser::new(), receive_buffer: BytesMut::new() }
65    }
66}
67
68impl TpktReader for TcpTpktReader {
69    async fn recv(&mut self) -> Result<TpktRecvResult, TpktError> {
70        loop {
71            let buffer = &mut self.receive_buffer;
72            match self.parser.parse(buffer) {
73                Ok(TpktParserResult::Data(x)) => return Ok(TpktRecvResult::Data(x)),
74                Ok(TpktParserResult::InProgress) => (),
75                Err(x) => return Err(x),
76            };
77            if self.reader.read_buf(buffer).await? == 0 {
78                return Ok(TpktRecvResult::Closed);
79            };
80        }
81    }
82}
83
/// Sending side of a TPKT connection over TCP.
pub struct TcpTpktWriter {
    /// Serialised bytes that have not yet been written to the socket.
    write_buffer: BytesMut,
    /// Serialiser that turns outgoing packets into bytes for the wire.
    serialiser: TpktSerialiser,
    /// Write half of the underlying TCP stream.
    writer: WriteHalf<TcpStream>,
}
89
90impl TcpTpktWriter {
91    pub fn new(writer: WriteHalf<TcpStream>) -> Self {
92        Self { serialiser: TpktSerialiser::new(), writer, write_buffer: BytesMut::new() }
93    }
94}
95
96impl TpktWriter for TcpTpktWriter {
97    async fn send(&mut self, input: &mut VecDeque<Vec<u8>>) -> Result<(), TpktError> {
98        while let Some(packet) = input.pop_front() {
99            self.write_buffer.extend(self.serialiser.serialise(&packet)?);
100        }
101
102        while self.write_buffer.has_remaining() {
103            self.writer.write_buf(&mut self.write_buffer).await?;
104        }
105        Ok(())
106    }
107}