async_protocol/
connection.rs

1use std::io::Cursor;
2
3use protocol::wire::middleware::pipeline::{self, Pipeline};
4use protocol::{Error, Parcel, Settings};
5use tokio::io::{split, AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
6
7use super::transport::{Simple, Transport};
8
9/// Attempts to receive a packet.
10async fn receive_packet<P: Parcel, S: AsyncRead + Send + Unpin>(
11    transport: &mut Simple,
12    stream: &mut S,
13    settings: &Settings,
14    middleware: &mut pipeline::Default,
15) -> Result<Option<P>, Error> {
16    transport.process_data(stream, &settings).await?;
17
18    if let Some(raw_packet) = transport.receive_raw_packet().await? {
19        let mut packet_data = Cursor::new(middleware.decode_data(raw_packet)?);
20
21        let packet = P::read(&mut packet_data, settings)?;
22
23        Ok(Some(packet))
24    } else {
25        Ok(None)
26    }
27}
28
29/// Sends a packet.
30async fn send_packet<P: Parcel, S: AsyncWrite + Send + Unpin>(
31    transport: &mut Simple,
32    stream: &mut S,
33    settings: &Settings,
34    middleware: &mut pipeline::Default,
35    packet: &P,
36) -> Result<(), Error> {
37    let raw_packet = middleware.encode_data(packet.raw_bytes(settings)?)?;
38    transport
39        .send_raw_packet(stream, &raw_packet, settings)
40        .await
41}
42
43/// A stream-based connection.
44#[derive(Debug)]
45pub struct Connection<P: Parcel, S: AsyncRead + AsyncWrite + Send + Unpin> {
46    pub stream: S,
47    pub transport: Simple,
48    pub middleware: pipeline::Default,
49    pub settings: Settings,
50
51    pub _parcel: std::marker::PhantomData<P>,
52}
53
54impl<P, S> Connection<P, S>
55where
56    P: Parcel,
57    S: AsyncRead + AsyncWrite + Send + Unpin,
58{
59    /// Creates a new connection.
60    pub fn new(stream: S, settings: Settings) -> Self {
61        Self {
62            stream,
63            transport: Simple::new(),
64            middleware: pipeline::default(),
65            settings,
66            _parcel: std::marker::PhantomData,
67        }
68    }
69
70    /// Attempts to receive a packet.
71    pub async fn receive_packet(&mut self) -> Result<Option<P>, Error> {
72        receive_packet(
73            &mut self.transport,
74            &mut self.stream,
75            &self.settings,
76            &mut self.middleware,
77        )
78        .await
79    }
80
81    /// Sends a packet.
82    pub async fn send_packet(&mut self, packet: &P) -> Result<(), Error> {
83        send_packet(
84            &mut self.transport,
85            &mut self.stream,
86            &self.settings,
87            &mut self.middleware,
88            packet,
89        )
90        .await
91    }
92
93    pub fn into_inner(self) -> S {
94        self.stream
95    }
96
97    /// Split Connection into ReceiverConnection and SendConnection
98    pub fn split(self) -> (ReceiveConnection<P, S>, SendConnection<P, S>) {
99        let settings = self.settings.clone();
100        let (receiver, sender) = split(self.into_inner());
101
102        (
103            ReceiveConnection::new(receiver, settings.clone()),
104            SendConnection::new(sender, settings),
105        )
106    }
107}
108
109/// A stream-based connection.
110#[derive(Debug)]
111pub struct SendConnection<P: Parcel, S: AsyncWrite + Send + Unpin> {
112    pub writer: WriteHalf<S>,
113    pub transport: Simple,
114    pub middleware: pipeline::Default,
115    pub settings: Settings,
116
117    pub _parcel: std::marker::PhantomData<P>,
118}
119
120impl<P, S> SendConnection<P, S>
121where
122    P: Parcel,
123    S: AsyncWrite + Send + Unpin,
124{
125    /// Creates a new connection.
126    pub fn new(writer: WriteHalf<S>, settings: Settings) -> Self {
127        Self {
128            writer,
129            transport: Simple::new(),
130            middleware: pipeline::default(),
131            settings,
132            _parcel: std::marker::PhantomData,
133        }
134    }
135
136    /// Sends a packet.
137    pub async fn send_packet(&mut self, packet: &P) -> Result<(), Error> {
138        send_packet(
139            &mut self.transport,
140            &mut self.writer,
141            &self.settings,
142            &mut self.middleware,
143            packet,
144        )
145        .await
146    }
147
148    pub fn into_inner(self) -> WriteHalf<S> {
149        self.writer
150    }
151}
152
153/// A stream-based connection.
154#[derive(Debug)]
155pub struct ReceiveConnection<P: Parcel, S: AsyncRead + Send + Unpin> {
156    pub reader: ReadHalf<S>,
157    pub transport: Simple,
158    pub middleware: pipeline::Default,
159    pub settings: Settings,
160
161    pub _parcel: std::marker::PhantomData<P>,
162}
163
164impl<P, S> ReceiveConnection<P, S>
165where
166    P: Parcel,
167    S: AsyncRead + Send + Unpin,
168{
169    /// Creates a new connection.
170    pub fn new(reader: ReadHalf<S>, settings: Settings) -> Self {
171        Self {
172            reader,
173            transport: Simple::new(),
174            middleware: pipeline::default(),
175            settings,
176            _parcel: std::marker::PhantomData,
177        }
178    }
179
180    /// Attempts to receive a packet.
181    pub async fn receive_packet(&mut self) -> Result<Option<P>, Error> {
182        receive_packet(
183            &mut self.transport,
184            &mut self.reader,
185            &self.settings,
186            &mut self.middleware,
187        )
188        .await
189    }
190
191    pub fn into_inner(self) -> ReadHalf<S> {
192        self.reader
193    }
194}