1use std::{collections::HashMap, sync::Arc, fmt::Display};
2
3use tokio::{sync::{mpsc, Mutex}, task::JoinHandle};
4
5use super::tcp;
6
7#[derive(Debug)]
8pub enum NxError {
9 Io(tcp::IoError),
10 Closed,
11 IdFormatInvalid,
12}
13
14impl Display for NxError {
15 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16 match self {
17 NxError::Io(e) => write!(f, "IoError: {}", e),
18 NxError::Closed => write!(f, "Closed"),
19 NxError::IdFormatInvalid => write!(f, "IdFormatInvalid"),
20 }
21 }
22}
23
24pub async fn connect(addr: &str) -> Result<NxClient,NxError> {
25 NxClient::new(addr).await
26}
27
28pub async fn listen(port: &u16) -> Result<NxListener,NxError> {
29 NxListener::listen(port).await
30}
31
32fn append_id(id: u128, message: Vec<u8>) -> Vec<u8> {
33 let mut message = message;
34 message.splice(0..0, id.to_be_bytes().iter().cloned());
35 message
36}
37
38fn shift_id(mut message: Vec<u8>) -> Result<(u128,Vec<u8>),NxError> {
39 if message.len() < 16 {
40 return Err(NxError::IdFormatInvalid);
41 }
42 let id = message.drain(0..16).collect::<Vec<u8>>();
43 if id.len() != 16 {
44 return Err(NxError::IdFormatInvalid);
45 }
46 let id = u128::from_be_bytes(id.as_slice().try_into().unwrap());
47 Ok((id, message))
48}
49
50pub type NxPoolHashMap = HashMap<u128, mpsc::Sender<Vec<u8>>>;
51pub type NxPool = Arc<Mutex<NxPoolHashMap>>;
52
53pub struct NxBoxReader {
54 pub id: u128,
55 pub receiver: mpsc::Receiver<Vec<u8>>,
56}
57
58impl NxBoxReader {
59 pub fn new(id: u128, receiver: mpsc::Receiver<Vec<u8>>) -> Self {
60 Self {
61 id,
62 receiver,
63 }
64 }
65 pub async fn read(&mut self) -> Result<Vec<u8>,NxError> {
66 match self.receiver.recv().await {
67 Some(message) => Ok(message),
68 None => Err(NxError::Closed),
69 }
70 }
71}
72
73pub enum NxWriteChannel {
74 Write(Vec<u8>),
75 Close,
76}
77
78pub struct NxBoxWriter {
79 pub id: u128,
80 pub writechannel: mpsc::Sender<NxWriteChannel>,
81}
82
83impl NxBoxWriter {
84 pub fn new(id: u128, writechannel: mpsc::Sender<NxWriteChannel>) -> Self {
85 Self {
86 id,
87 writechannel,
88 }
89 }
90 pub async fn write(&mut self, mut message: Vec<u8>) -> Result<(),NxError> {
91 message = append_id(self.id, message);
92 self.writechannel.send(NxWriteChannel::Write(message)).await.map_err(|_| NxError::Closed)
93 }
94 pub async fn close(self) -> Result<(),NxError> {
95 self.writechannel.send(NxWriteChannel::Close).await.map_err(|_| NxError::Closed)
96 }
97}
98
99pub struct NxBoxIo {
100 pub id: u128,
101 pub writer: NxBoxWriter,
102 pub reader: NxBoxReader,
103}
104
105impl NxBoxIo {
106 pub fn new(id: u128, writer: NxBoxWriter, reader: NxBoxReader) -> Self {
107 Self {
108 id,
109 writer,
110 reader,
111 }
112 }
113 pub async fn read(&mut self) -> Result<Vec<u8>,NxError> {
114 self.reader.read().await
115 }
116 pub async fn write(&mut self, message: Vec<u8>) -> Result<(),NxError> {
117 self.writer.write(message).await
118 }
119 pub async fn close(self) -> Result<(),NxError> {
120 self.writer.close().await
121 }
122}
123
124pub struct NxReader {
125 pool: NxPool,
126 receiver: tokio::task::JoinHandle<()>,
127 newidreceiver: mpsc::Receiver<(u128,Vec<u8>)>,
128}
129
130impl NxReader {
131 pub fn new(mut reader: tcp::TcpReader) -> Self {
132 let pool = Arc::new(Mutex::new(HashMap::new() as NxPoolHashMap));
133 let pool_clone = pool.clone();
134 let (newidnotify, newidreceiver) = mpsc::channel(32);
135 let receiver = tokio::spawn(async move {
136 loop {
137 let message = match reader.read().await {
138 Some(message) => message,
139 None => {
140 break;
141 }
142 };
143 let message = match message {
144 Ok(message) => message,
145 Err(_) => {
146 continue;
147 }
148 };
149 let (id, message) = match shift_id(message.to_vec()) {
150 Ok((id, message)) => (id, message),
151 Err(_) => {
152 continue;
153 }
154 };
155 let mut pool = pool_clone.lock().await;
156 let sender = match pool.get_mut(&id) {
157 Some(sender) => sender,
158 None => {
159 newidnotify.send((id,message)).await.unwrap();
160 continue;
161 }
162 };
163 match sender.send(message).await {
164 Ok(_) => {}
165 Err(_) => {
166 continue;
167 }
168 }
169 }
170 });
171 Self {
172 pool,
173 receiver,
174 newidreceiver,
175 }
176 }
177 pub async fn open_send(&mut self, id: u128, message: Vec<u8>) -> mpsc::Receiver<Vec<u8>> {
178 let (sender, receiver) = mpsc::channel(32);
179 let mut pool = self.pool.lock().await;
180 match sender.send(message).await {
181 Ok(_) => {}
182 Err(_) => {
183 }
184 };
185 pool.insert(id, sender);
186 receiver
187 }
188 pub async fn open(&mut self, id: u128) -> mpsc::Receiver<Vec<u8>> {
189 let (sender, receiver) = mpsc::channel(32);
190 let mut pool = self.pool.lock().await;
191 pool.insert(id, sender);
192 receiver
193 }
194 pub async fn next<'a>(&'a mut self, writer: &'a mut NxWriter) -> Option<NxBoxIo> {
195 let id = self.newidreceiver.recv().await;
196 let (id,message) = match id {
197 Some(id) => id,
198 None => return None,
199 };
200 let reader = self.open_send(id, message).await;
201 let reader = NxBoxReader::new(id, reader);
202 let writer = writer.sender.clone();
203 let writer = NxBoxWriter::new(id, writer);
204 Some(NxBoxIo::new(id, writer, reader))
205 }
206 pub async fn close_id(&mut self, id: u128) {
207 let mut pool = self.pool.lock().await;
208 pool.remove(&id);
209 }
210 pub fn close(&mut self) {
211 self.receiver.abort();
212 }
213}
214
215pub struct NxWriter {
216 pub receiver: JoinHandle<()>,
217 pub sender: mpsc::Sender<NxWriteChannel>,
218}
219
220impl NxWriter {
221 pub fn new(mut writer: tcp::TcpWriter) -> Self {
222 let (sender, mut receiver) = mpsc::channel::<NxWriteChannel>(32);
223 let receiver = tokio::spawn(async move {
224 loop {
225 let message = match receiver.recv().await {
226 Some(message) => message,
227 None => {
228 break;
229 }
230 };
231 match message {
232 NxWriteChannel::Write(message) => {
233 match writer.send(message).await {
234 Ok(_) => {}
235 Err(_) => {
236 break;
237 }
238 }
239 }
240 NxWriteChannel::Close => {
241 break;
242 }
243 }
244 }
245 });
246 Self {
247 sender,
248 receiver,
249 }
250 }
251 pub async fn close(&mut self) -> Result<(),NxError> {
252 self.sender.send(NxWriteChannel::Close).await.map_err(|_| NxError::Closed)
253 }
254}
255
256pub struct NxSession {
257 writer: NxWriter,
258 reader: NxReader,
259}
260
261impl NxSession {
262 pub fn new(reader: NxReader, writer: NxWriter) -> Self {
263 Self {
264 writer,
265 reader,
266 }
267 }
268 pub async fn next(&mut self) -> Option<NxBoxIo> {
269 self.reader.next(&mut self.writer).await
270 }
271}
272
273pub struct NxClient {
274 writer: NxWriter,
275 reader: NxReader,
276}
277
278impl NxClient {
279 pub async fn new(addr: &str) -> Result<Self,NxError> {
280 let io = tcp::connect(addr).await;
281 let io = match io {
282 Ok(io) => io,
283 Err(e) => return Err(NxError::Io(e)),
284 };
285 let (reader, writer) = io;
286 let reader = NxReader::new(reader);
287 let writer = NxWriter::new(writer);
288 Ok(Self {
289 writer,
290 reader,
291 })
292 }
293 pub async fn open(&mut self) -> Result<NxBoxIo,NxError> {
294 let id = uuid::Uuid::new_v4().as_u128();
295 let reader = NxBoxReader::new(id, self.reader.open(id).await);
296 let sender = self.writer.sender.clone();
297 let writer = NxBoxWriter::new(id, sender);
298 let client = NxBoxIo::new(id, writer, reader);
299 Ok(client)
300 }
301 pub async fn close(&mut self) -> Result<(),NxError> {
302 self.writer.close().await
303 }
304}
305pub struct NxListener {
306 listener: tcp::TcpListener,
307}
308
309impl NxListener {
310 pub async fn accept(&mut self) -> Result<(NxSession,tcp::TcpAddr),NxError> {
311 match self.listener.accept().await {
312 Ok((reader,writer,addr)) => Ok((NxSession::new(NxReader::new(reader),NxWriter::new(writer)),addr)),
313 Err(e) => Err(NxError::Io(e)),
314 }
315 }
316 pub async fn listen(port: &u16) -> Result<Self,NxError> {
317 let listener = match tcp::listen(port).await {
318 Ok(listener) => listener,
319 Err(e) => {
320 return Err(NxError::Io(e));
321 }
322 };
323 Ok(Self { listener })
324 }
325}