1use std::io::Cursor;
2
3use protocol::wire::middleware::pipeline::{self, Pipeline};
4use protocol::{Error, Parcel, Settings};
5use tokio::io::{split, AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
6
7use super::transport::{Simple, Transport};
8
9async fn receive_packet<P: Parcel, S: AsyncRead + Send + Unpin>(
11 transport: &mut Simple,
12 stream: &mut S,
13 settings: &Settings,
14 middleware: &mut pipeline::Default,
15) -> Result<Option<P>, Error> {
16 transport.process_data(stream, &settings).await?;
17
18 if let Some(raw_packet) = transport.receive_raw_packet().await? {
19 let mut packet_data = Cursor::new(middleware.decode_data(raw_packet)?);
20
21 let packet = P::read(&mut packet_data, settings)?;
22
23 Ok(Some(packet))
24 } else {
25 Ok(None)
26 }
27}
28
29async fn send_packet<P: Parcel, S: AsyncWrite + Send + Unpin>(
31 transport: &mut Simple,
32 stream: &mut S,
33 settings: &Settings,
34 middleware: &mut pipeline::Default,
35 packet: &P,
36) -> Result<(), Error> {
37 let raw_packet = middleware.encode_data(packet.raw_bytes(settings)?)?;
38 transport
39 .send_raw_packet(stream, &raw_packet, settings)
40 .await
41}
42
43#[derive(Debug)]
45pub struct Connection<P: Parcel, S: AsyncRead + AsyncWrite + Send + Unpin> {
46 pub stream: S,
47 pub transport: Simple,
48 pub middleware: pipeline::Default,
49 pub settings: Settings,
50
51 pub _parcel: std::marker::PhantomData<P>,
52}
53
54impl<P, S> Connection<P, S>
55where
56 P: Parcel,
57 S: AsyncRead + AsyncWrite + Send + Unpin,
58{
59 pub fn new(stream: S, settings: Settings) -> Self {
61 Self {
62 stream,
63 transport: Simple::new(),
64 middleware: pipeline::default(),
65 settings,
66 _parcel: std::marker::PhantomData,
67 }
68 }
69
70 pub async fn receive_packet(&mut self) -> Result<Option<P>, Error> {
72 receive_packet(
73 &mut self.transport,
74 &mut self.stream,
75 &self.settings,
76 &mut self.middleware,
77 )
78 .await
79 }
80
81 pub async fn send_packet(&mut self, packet: &P) -> Result<(), Error> {
83 send_packet(
84 &mut self.transport,
85 &mut self.stream,
86 &self.settings,
87 &mut self.middleware,
88 packet,
89 )
90 .await
91 }
92
93 pub fn into_inner(self) -> S {
94 self.stream
95 }
96
97 pub fn split(self) -> (ReceiveConnection<P, S>, SendConnection<P, S>) {
99 let settings = self.settings.clone();
100 let (receiver, sender) = split(self.into_inner());
101
102 (
103 ReceiveConnection::new(receiver, settings.clone()),
104 SendConnection::new(sender, settings),
105 )
106 }
107}
108
109#[derive(Debug)]
111pub struct SendConnection<P: Parcel, S: AsyncWrite + Send + Unpin> {
112 pub writer: WriteHalf<S>,
113 pub transport: Simple,
114 pub middleware: pipeline::Default,
115 pub settings: Settings,
116
117 pub _parcel: std::marker::PhantomData<P>,
118}
119
120impl<P, S> SendConnection<P, S>
121where
122 P: Parcel,
123 S: AsyncWrite + Send + Unpin,
124{
125 pub fn new(writer: WriteHalf<S>, settings: Settings) -> Self {
127 Self {
128 writer,
129 transport: Simple::new(),
130 middleware: pipeline::default(),
131 settings,
132 _parcel: std::marker::PhantomData,
133 }
134 }
135
136 pub async fn send_packet(&mut self, packet: &P) -> Result<(), Error> {
138 send_packet(
139 &mut self.transport,
140 &mut self.writer,
141 &self.settings,
142 &mut self.middleware,
143 packet,
144 )
145 .await
146 }
147
148 pub fn into_inner(self) -> WriteHalf<S> {
149 self.writer
150 }
151}
152
153#[derive(Debug)]
155pub struct ReceiveConnection<P: Parcel, S: AsyncRead + Send + Unpin> {
156 pub reader: ReadHalf<S>,
157 pub transport: Simple,
158 pub middleware: pipeline::Default,
159 pub settings: Settings,
160
161 pub _parcel: std::marker::PhantomData<P>,
162}
163
164impl<P, S> ReceiveConnection<P, S>
165where
166 P: Parcel,
167 S: AsyncRead + Send + Unpin,
168{
169 pub fn new(reader: ReadHalf<S>, settings: Settings) -> Self {
171 Self {
172 reader,
173 transport: Simple::new(),
174 middleware: pipeline::default(),
175 settings,
176 _parcel: std::marker::PhantomData,
177 }
178 }
179
180 pub async fn receive_packet(&mut self) -> Result<Option<P>, Error> {
182 receive_packet(
183 &mut self.transport,
184 &mut self.reader,
185 &self.settings,
186 &mut self.middleware,
187 )
188 .await
189 }
190
191 pub fn into_inner(self) -> ReadHalf<S> {
192 self.reader
193 }
194}