vfp/
protocol.rs

1use std::{collections::HashMap, sync::Arc, fmt::Display};
2
3use tokio::{sync::{mpsc, Mutex}, task::JoinHandle};
4
5use super::tcp;
6
7#[derive(Debug)]
8pub enum NxError {
9    Io(tcp::IoError),
10    Closed,
11    IdFormatInvalid,
12}
13
14impl Display for NxError {
15    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16        match self {
17            NxError::Io(e) => write!(f, "IoError: {}", e),
18            NxError::Closed => write!(f, "Closed"),
19            NxError::IdFormatInvalid => write!(f, "IdFormatInvalid"),
20        }
21    }
22}
23
24pub async fn connect(addr: &str) -> Result<NxClient,NxError> {
25    NxClient::new(addr).await
26}
27
28pub async fn listen(port: &u16) -> Result<NxListener,NxError> {
29    NxListener::listen(port).await
30}
31
/// Builds a frame by placing the 16 big-endian bytes of `id` in front of
/// `message`.
///
/// Despite the name, the id ends up at the *start* of the returned buffer;
/// `shift_id` reads it back from there.
fn append_id(id: u128, message: Vec<u8>) -> Vec<u8> {
    let header = id.to_be_bytes();
    let mut framed = Vec::with_capacity(header.len() + message.len());
    framed.extend_from_slice(&header);
    framed.extend_from_slice(&message);
    framed
}
37
38fn shift_id(mut message: Vec<u8>) -> Result<(u128,Vec<u8>),NxError> {
39    if message.len() < 16 {
40        return Err(NxError::IdFormatInvalid);
41    }
42    let id = message.drain(0..16).collect::<Vec<u8>>();
43    if id.len() != 16 {
44        return Err(NxError::IdFormatInvalid);
45    }
46    let id = u128::from_be_bytes(id.as_slice().try_into().unwrap());
47    Ok((id, message))
48}
49
50pub type NxPoolHashMap = HashMap<u128, mpsc::Sender<Vec<u8>>>;
51pub type NxPool = Arc<Mutex<NxPoolHashMap>>;
52
53pub struct NxBoxReader {
54    pub id: u128,
55    pub receiver: mpsc::Receiver<Vec<u8>>,
56}
57
58impl NxBoxReader {
59    pub fn new(id: u128, receiver: mpsc::Receiver<Vec<u8>>) -> Self {
60        Self {
61            id,
62            receiver,
63        }
64    }
65    pub async fn read(&mut self) -> Result<Vec<u8>,NxError> {
66        match self.receiver.recv().await {
67            Some(message) => Ok(message),
68            None => Err(NxError::Closed),
69        }
70    }
71}
72
/// Commands accepted by the writer task spawned in `NxWriter::new`.
#[derive(Debug)]
pub enum NxWriteChannel {
    /// Send these bytes (already prefixed with the box id) to the peer.
    Write(Vec<u8>),
    /// Stop the writer task.
    Close,
}
77
78pub struct NxBoxWriter {
79    pub id: u128,
80    pub writechannel: mpsc::Sender<NxWriteChannel>,
81}
82
83impl NxBoxWriter {
84    pub fn new(id: u128, writechannel: mpsc::Sender<NxWriteChannel>) -> Self {
85        Self {
86            id,
87            writechannel,
88        }
89    }
90    pub async fn write(&mut self, mut message: Vec<u8>) -> Result<(),NxError> {
91        message = append_id(self.id, message);
92        self.writechannel.send(NxWriteChannel::Write(message)).await.map_err(|_| NxError::Closed)
93    }
94    pub async fn close(self) -> Result<(),NxError> {
95        self.writechannel.send(NxWriteChannel::Close).await.map_err(|_| NxError::Closed)
96    }
97}
98
99pub struct NxBoxIo {
100    pub id: u128,
101    pub writer: NxBoxWriter,
102    pub reader: NxBoxReader,
103}
104
105impl NxBoxIo {
106    pub fn new(id: u128, writer: NxBoxWriter, reader: NxBoxReader) -> Self {
107        Self {
108            id,
109            writer,
110            reader,
111        }
112    }
113    pub async fn read(&mut self) -> Result<Vec<u8>,NxError> {
114        self.reader.read().await
115    }
116    pub async fn write(&mut self, message: Vec<u8>) -> Result<(),NxError> {
117        self.writer.write(message).await
118    }
119    pub async fn close(self) -> Result<(),NxError> {
120        self.writer.close().await
121    }
122}
123
124pub struct NxReader {
125    pool: NxPool,
126    receiver: tokio::task::JoinHandle<()>,
127    newidreceiver: mpsc::Receiver<(u128,Vec<u8>)>,
128}
129
130impl NxReader {
131    pub fn new(mut reader: tcp::TcpReader) -> Self {
132        let pool = Arc::new(Mutex::new(HashMap::new() as NxPoolHashMap));
133        let pool_clone = pool.clone();
134        let (newidnotify, newidreceiver) = mpsc::channel(32);
135        let receiver = tokio::spawn(async move {
136            loop {
137                let message = match reader.read().await {
138                    Some(message) => message,
139                    None => {
140                        break;
141                    }
142                };
143                let message = match message {
144                    Ok(message) => message,
145                    Err(_) => {
146                        continue;
147                    }
148                };
149                let (id, message) = match shift_id(message.to_vec()) {
150                    Ok((id, message)) => (id, message),
151                    Err(_) => {
152                        continue;
153                    }
154                };
155                let mut pool = pool_clone.lock().await;
156                let sender = match pool.get_mut(&id) {
157                    Some(sender) => sender,
158                    None => {
159                        newidnotify.send((id,message)).await.unwrap();
160                        continue;
161                    }
162                };
163                match sender.send(message).await {
164                    Ok(_) => {}
165                    Err(_) => {
166                        continue;
167                    }
168                }
169            }
170        });
171        Self {
172            pool,
173            receiver,
174            newidreceiver,
175        }
176    }
177    pub async fn open_send(&mut self, id: u128, message: Vec<u8>) -> mpsc::Receiver<Vec<u8>> {
178        let (sender, receiver) = mpsc::channel(32);
179        let mut pool = self.pool.lock().await;
180        match sender.send(message).await {
181            Ok(_) => {}
182            Err(_) => {
183            }
184        };
185        pool.insert(id, sender);
186        receiver
187    }
188    pub async fn open(&mut self, id: u128) -> mpsc::Receiver<Vec<u8>> {
189        let (sender, receiver) = mpsc::channel(32);
190        let mut pool = self.pool.lock().await;
191        pool.insert(id, sender);
192        receiver
193    }
194    pub async fn next<'a>(&'a mut self, writer: &'a mut NxWriter) -> Option<NxBoxIo> {
195        let id = self.newidreceiver.recv().await;
196        let (id,message) = match id {
197            Some(id) => id,
198            None => return None,
199        };
200        let reader = self.open_send(id, message).await;
201        let reader = NxBoxReader::new(id, reader);
202        let writer = writer.sender.clone();
203        let writer = NxBoxWriter::new(id, writer);
204        Some(NxBoxIo::new(id, writer, reader))
205    }
206    pub async fn close_id(&mut self, id: u128) {
207        let mut pool = self.pool.lock().await;
208        pool.remove(&id);
209    }
210    pub fn close(&mut self) {
211        self.receiver.abort();
212    }
213}
214
215pub struct NxWriter {
216    pub receiver: JoinHandle<()>,
217    pub sender: mpsc::Sender<NxWriteChannel>,
218}
219
220impl NxWriter {
221    pub fn new(mut writer: tcp::TcpWriter) -> Self {
222        let (sender, mut receiver) = mpsc::channel::<NxWriteChannel>(32);
223        let receiver = tokio::spawn(async move {
224            loop {
225                let message = match receiver.recv().await {
226                    Some(message) => message,
227                    None => {
228                        break;
229                    }
230                };
231                match message {
232                    NxWriteChannel::Write(message) => {
233                        match writer.send(message).await {
234                            Ok(_) => {}
235                            Err(_) => {
236                                break;
237                            }
238                        }
239                    }
240                    NxWriteChannel::Close => {
241                        break;
242                    }
243                }
244            }
245        });
246        Self {
247            sender,
248            receiver,
249        }
250    }
251    pub async fn close(&mut self) -> Result<(),NxError> {
252        self.sender.send(NxWriteChannel::Close).await.map_err(|_| NxError::Closed)
253    }
254}
255
256pub struct NxSession {
257    writer: NxWriter,
258    reader: NxReader,
259}
260
261impl NxSession {
262    pub fn new(reader: NxReader, writer: NxWriter) -> Self {
263        Self {
264            writer,
265            reader,
266        }
267    }
268    pub async fn next(&mut self) -> Option<NxBoxIo> {
269        self.reader.next(&mut self.writer).await
270    }
271}
272
273pub struct NxClient {
274    writer: NxWriter,
275    reader: NxReader,
276}
277
278impl NxClient {
279    pub async fn new(addr: &str) -> Result<Self,NxError> {
280        let io = tcp::connect(addr).await;
281        let io = match io {
282            Ok(io) => io,
283            Err(e) => return Err(NxError::Io(e)),
284        };
285        let (reader, writer) = io;
286        let reader = NxReader::new(reader);
287        let writer = NxWriter::new(writer);
288        Ok(Self {
289            writer,
290            reader,
291        })
292    }
293    pub async fn open(&mut self) -> Result<NxBoxIo,NxError> {
294        let id = uuid::Uuid::new_v4().as_u128();
295        let reader = NxBoxReader::new(id, self.reader.open(id).await);
296        let sender = self.writer.sender.clone();
297        let writer = NxBoxWriter::new(id, sender);
298        let client = NxBoxIo::new(id, writer, reader);
299        Ok(client)
300    }
301    pub async fn close(&mut self) -> Result<(),NxError> {
302        self.writer.close().await
303    }
304}
305pub struct NxListener {
306    listener: tcp::TcpListener,
307}
308
309impl NxListener {
310    pub async fn accept(&mut self) -> Result<(NxSession,tcp::TcpAddr),NxError> {
311        match self.listener.accept().await {
312            Ok((reader,writer,addr)) => Ok((NxSession::new(NxReader::new(reader),NxWriter::new(writer)),addr)),
313            Err(e) => Err(NxError::Io(e)),
314        }
315    }
316    pub async fn listen(port: &u16) -> Result<Self,NxError> {
317        let listener = match tcp::listen(port).await {
318            Ok(listener) => listener,
319            Err(e) => {
320                return Err(NxError::Io(e));
321            }
322        };
323        Ok(Self { listener })
324    }
325}