relay_man/client/
connection.rs

1use std::{
2    mem::MaybeUninit,
3    net::ToSocketAddrs,
4    sync::{Arc, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard},
5    time::{Duration, SystemTime},
6};
7
8use bytes_kman::TBytes;
9use socket2::{Domain, Protocol, SockAddr, Socket, Type};
10
11use crate::common::{
12    adress::Adress,
13    packets::{
14        Avalibile, InfoRequest, Packets, Register, Request, RequestFinal, RequestResponse, Search,
15    },
16};
17
18use super::response::{self, NewRequestFinal, RequestStage, Response};
19
/// Reasons why [`Connection::new`] can fail to produce a registered connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionError {
    /// The given host could not be resolved to a socket address.
    InvalidIp,
    /// The connection to the resolved address could not be established.
    HostIsNotAlive,
    /// The registration exchange failed: the register packet could not be sent,
    /// or no valid `RegisterResponse` came back.
    InvalidInfo,
    /// The server answered the registration but did not accept it.
    InvalidAdress,
}

impl std::fmt::Display for ConnectionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidIp => "the host could not be resolved to a socket address",
            Self::HostIsNotAlive => "could not connect to the host",
            Self::InvalidInfo => "the registration exchange with the server failed",
            Self::InvalidAdress => "the server did not accept the registration",
        };
        f.write_str(message)
    }
}

// Lets callers propagate the error with `?` into `Box<dyn std::error::Error>` and friends.
impl std::error::Error for ConnectionError {}
27
/// Client-side state of one registered connection to the relay server.
pub struct Connection {
    /// Session identifier taken from the server's `RegisterResponse`; `send` and
    /// `step` write it into outgoing packets.
    pub session: usize,
    /// Socket connected to the server. `new` switches it to non-blocking mode
    /// once the registration has been accepted.
    pub conn: Socket,
    /// Registration details this connection was created with.
    pub info: ConnectionInfo,
    /// Time the last packet was sent (initialised to the creation time); `step`
    /// uses it to decide when a `Tick` packet is due.
    pub last_packet: SystemTime,
    /// Packets received by `step` that have not been consumed yet.
    pub packets: Vec<Packets>,
    /// Addresses copied from the most recent `SearchResponse` seen by `step`.
    pub adresses: Vec<Adress>,
}
36
/// Details a client announces about itself; `Connection::new` copies every
/// field into the `Register` packet it sends to the server.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    /// Sent as `client` in the `Register` packet.
    /// NOTE(review): presumably identifies the client application — confirm.
    pub client: String,
    /// Sent as `name` in the `Register` packet.
    pub name: String,
    /// Sent as `public` in the `Register` packet; `TConnection::adress` also
    /// returns a copy of it as this connection's address.
    pub public: Vec<u8>,
    /// Sent as `other` in the `Register` packet.
    /// NOTE(review): looks like opaque application-defined bytes — confirm.
    pub other: Vec<u8>,
    /// Sent as `privacy` in the `Register` packet.
    /// NOTE(review): its effect is decided by the server; not visible here.
    pub privacy: bool,
}
45
46impl Connection {
47    pub fn new(ip: impl Into<String>, info: ConnectionInfo) -> Result<Self, ConnectionError> {
48        let Ok(mut adress) = format!("{}:2120", ip.into())
49            .to_socket_addrs() else {return Err(ConnectionError::InvalidIp)};
50        let Some(adress) = adress.next() else{return Err(ConnectionError::InvalidIp)};
51        let address_sock = SockAddr::from(adress);
52        let conn = Socket::new(
53            Domain::for_address(adress),
54            Type::STREAM,
55            Some(Protocol::TCP),
56        )
57        .unwrap();
58        if conn.connect(&address_sock).is_err() {
59            return Err(ConnectionError::HostIsNotAlive);
60        }
61
62        let local_addr = conn.local_addr().unwrap().as_socket().unwrap().ip();
63
64        let pak = Packets::Register(Register {
65            client: info.client.clone(),
66            public: info.public.clone(),
67            name: info.name.clone(),
68            other: info.other.clone(),
69            privacy: info.privacy,
70            private_adress: local_addr.to_string(),
71        });
72
73        let mut bytes = pak.to_bytes();
74        bytes.reverse();
75
76        let Ok(_) = conn.send(&bytes) else {
77            return Err(ConnectionError::InvalidInfo);
78        };
79
80        let mut buffer = [MaybeUninit::new(0); 1024];
81
82        let Ok(len) = conn.recv(&mut buffer)else{
83            return Err(ConnectionError::InvalidInfo);
84        };
85
86        let buffer: &[u8] = unsafe { std::mem::transmute(&buffer[0..len]) };
87        let mut buffer = buffer.to_owned();
88        let Some(packet) = Packets::from_bytes(&mut buffer) else{return Err(ConnectionError::InvalidInfo)};
89        let Packets::RegisterResponse(res) = packet else {return Err(ConnectionError::InvalidInfo)};
90
91        if !res.accepted {
92            return Err(ConnectionError::InvalidAdress);
93        }
94
95        conn.set_nonblocking(true).unwrap();
96        let _ = conn.set_recv_buffer_size(1024);
97        let _ = conn.set_send_buffer_size(1024);
98
99        Ok(Self {
100            session: res.session,
101            conn,
102            info,
103            last_packet: SystemTime::now(),
104            packets: Vec::new(),
105            adresses: Vec::new(),
106        })
107    }
108
109    pub fn step(&mut self) {
110        if let Some(packet) = self.recv() {
111            if let Packets::SearchResponse(pak) = &packet {
112                self.adresses = pak.adresses.clone()
113            };
114            self.packets.push(packet)
115        }
116
117        if self.last_packet.elapsed().unwrap() < Duration::from_secs(2) {
118            return;
119        }
120
121        let pak = Packets::Tick {
122            session: self.session,
123        };
124        let mut bytes = pak.to_bytes();
125        bytes.reverse();
126        let _ = self.conn.send(&bytes);
127        self.last_packet = SystemTime::now();
128    }
129
130    pub fn send(&mut self, packet: Packets) {
131        let mut packet = packet;
132
133        // Set session for packages
134        match &mut packet {
135            Packets::UnRegister(pak) => pak.session = self.session,
136            Packets::Search(pak) => pak.session = self.session,
137            Packets::InfoRequest(pak) => pak.session = self.session,
138            Packets::Request(pak) => pak.session = self.session,
139            Packets::RequestResponse(pak) => pak.session = self.session,
140            Packets::Avalibile(pak) => pak.session = self.session,
141            Packets::RequestFinal(pak) => pak.session = self.session,
142            _ => {}
143        }
144
145        let mut bytes = packet.to_bytes();
146        bytes.reverse();
147        let _ = self.conn.send(&bytes);
148        self.last_packet = SystemTime::now()
149    }
150
151    pub fn recv(&self) -> Option<Packets> {
152        let mut buffer = [MaybeUninit::new(0); 1024];
153        if let Ok(len) = self.conn.recv(&mut buffer) {
154            let buffer: &[u8] = unsafe { std::mem::transmute(&buffer[0..len]) };
155            let mut buffer = buffer.to_owned();
156            if let Some(packet) = Packets::from_bytes(&mut buffer) {
157                return Some(packet);
158            }
159        }
160        None
161    }
162}
163
/// Operations offered by a shareable handle to a [`Connection`].
///
/// This file implements it for `Arc<RwLock<Connection>>`; the notes below
/// describe that implementation.
pub trait TConnection {
    /// Drives the connection once (forwards to `Connection::step`).
    fn step(&self);

    /// Acquires read access to the underlying `Connection`.
    fn read(&self) -> LockResult<RwLockReadGuard<Connection>>;
    /// Acquires write access to the underlying `Connection`.
    fn write(&self) -> LockResult<RwLockWriteGuard<Connection>>;

    /// Sends `search` to the server and returns a handle that waits for a
    /// `SearchResponse`.
    fn search(&self, search: Search) -> Response<Box<dyn TConnection>, response::SearchResponse>;
    /// Sends an `InfoRequest` for `adress` and returns a handle that waits for
    /// the `Info` packet about that address.
    fn info(&self, adress: &Adress) -> Response<Box<dyn TConnection>, Option<ConnectionInfo>>;

    /// Sends a `Request` to `adress` carrying `secret` and returns a handle that
    /// waits for the `NewRequestResponse` coming from that address.
    fn request(
        &self,
        adress: &Adress,
        secret: String,
    ) -> Response<Box<dyn TConnection>, response::NewRequestResponse>;
    /// Sends a `RequestResponse` to `adress` with the decision in `accept` and
    /// returns a handle for the follow-up `NewRequestFinal`.
    fn request_response(
        &self,
        adress: &Adress,
        accept: bool,
    ) -> Response<Box<dyn TConnection>, response::NewRequestFinal>;

    /// Sends a `RequestFinal` to `adress` and returns a handle that waits for the
    /// `ConnectOn` packet.
    ///
    /// `time_offset` should be in nanoseconds; when it is `None` the
    /// implementation in this file uses one second.
    fn request_final(
        &self,
        adress: &Adress,
        accept: bool,
        time_offset: Option<u128>,
    ) -> Response<Box<dyn TConnection>, response::ConnectOn>;
    /// Sends an `Avalibile` packet carrying `port`.
    fn add_port(&self, port: u16);

    /// Returns this connection's address (a copy of `info.public`).
    fn adress(&self) -> Adress;

    /// Removes and returns the first queued `NewRequest`, `NewRequestResponse`,
    /// `NewRequestFinal` or `ConnectOn` packet, wrapped as a `RequestStage`.
    fn has_new(&self) -> Option<RequestStage>;
    /// Returns a boxed clone of this handle.
    fn c(&self) -> Box<dyn TConnection + Send>;
}
198
199impl TConnection for Arc<RwLock<Connection>> {
200    fn step(&self) {
201        self.write().unwrap().step();
202    }
203
204    fn read(&self) -> LockResult<RwLockReadGuard<Connection>> {
205        RwLock::read(self)
206    }
207
208    fn write(&self) -> LockResult<RwLockWriteGuard<Connection>> {
209        RwLock::write(self)
210    }
211
212    fn search(&self, search: Search) -> Response<Box<dyn TConnection>, response::SearchResponse> {
213        let pak = Packets::Search(search);
214        self.write().unwrap().send(pak.clone());
215
216        Response {
217            connection: Box::new(self.clone()),
218            packets: pak,
219            fn_has: search_fn_has,
220            fn_get: search_fn_get,
221        }
222    }
223
224    fn info(&self, adress: &Adress) -> Response<Box<dyn TConnection>, Option<ConnectionInfo>> {
225        let pak = Packets::InfoRequest(InfoRequest {
226            adress: adress.clone(),
227            session: 0,
228        });
229        self.write().unwrap().send(pak.clone());
230
231        Response {
232            connection: Box::new(self.clone()),
233            packets: pak,
234            fn_has: info_fn_has,
235            fn_get: info_fn_get,
236        }
237    }
238
239    fn request(
240        &self,
241        adress: &Adress,
242        secret: String,
243    ) -> Response<Box<dyn TConnection>, response::NewRequestResponse> {
244        let pak = Packets::Request(Request {
245            session: 0,
246            to: adress.clone(),
247            secret,
248        });
249        self.write().unwrap().send(pak.clone());
250
251        Response {
252            connection: Box::new(self.clone()),
253            packets: pak,
254            fn_has: request_fn_has,
255            fn_get: request_fn_get,
256        }
257    }
258
259    fn request_response(
260        &self,
261        adress: &Adress,
262        accept: bool,
263    ) -> Response<Box<dyn TConnection>, NewRequestFinal> {
264        let pak = Packets::RequestResponse(RequestResponse {
265            session: 0,
266            to: adress.clone(),
267            accepted: accept,
268            secret: String::new(),
269        });
270        self.write().unwrap().send(pak.clone());
271
272        Response {
273            connection: Box::new(self.clone()),
274            packets: pak,
275            fn_has: request_response_fn_has,
276            fn_get: request_response_fn_get,
277        }
278    }
279
280    fn request_final(
281        &self,
282        adress: &Adress,
283        accept: bool,
284        time_offset: Option<u128>,
285    ) -> Response<Box<dyn TConnection>, response::ConnectOn> {
286        let time_offset = match time_offset {
287            Some(s) => s,
288            None => Duration::from_secs(1).as_nanos(),
289        };
290
291        let pak = Packets::RequestFinal(RequestFinal {
292            session: 0,
293            to: adress.clone(),
294            accepted: accept,
295            time_offset,
296        });
297        self.write().unwrap().send(pak.clone());
298        Response {
299            connection: self.c(),
300            packets: pak,
301            fn_has: request_final_fn_has,
302            fn_get: request_final_fn_get,
303        }
304    }
305
306    fn add_port(&self, port: u16) {
307        let pak = Packets::Avalibile(Avalibile { session: 0, port });
308        self.write().unwrap().send(pak);
309    }
310
311    fn adress(&self) -> Adress {
312        self.read().unwrap().info.public.clone()
313    }
314
315    fn has_new(&self) -> Option<RequestStage> {
316        let mut res = None;
317
318        self.write().unwrap().packets.retain(|pak| {
319            if res.is_none() {
320                match pak {
321                    Packets::NewRequest(pak) => {
322                        res = Some(RequestStage::NewRequest(response::NewRequest {
323                            connection: Box::new(self.clone()),
324                            from: pak.from.clone(),
325                            secret: pak.secret.clone(),
326                        }));
327                        false
328                    }
329                    Packets::NewRequestResponse(pak) => {
330                        res = Some(RequestStage::NewRequestResponse(
331                            response::NewRequestResponse {
332                                connection: Box::new(self.clone()),
333                                from: pak.from.clone(),
334                                accept: pak.accepted,
335                                secret: pak.secret.clone(),
336                            },
337                        ));
338                        false
339                    }
340                    Packets::NewRequestFinal(pak) => {
341                        res = Some(RequestStage::NewRequestFinal(response::NewRequestFinal {
342                            connection: Box::new(self.clone()),
343                            from: pak.from.clone(),
344                            accept: pak.accepted,
345                        }));
346                        false
347                    }
348                    Packets::ConnectOn(pak) => {
349                        res = Some(RequestStage::ConnectOn(response::ConnectOn {
350                            connection: self.c(),
351                            adress: pak.adress.clone(),
352                            to: pak.to.clone(),
353                            port: pak.port,
354                            time: pak.time,
355                        }));
356                        false
357                    }
358                    _ => true,
359                }
360            } else {
361                true
362            }
363        });
364
365        res
366    }
367
368    fn c(&self) -> Box<dyn TConnection + Send> {
369        Box::new(self.clone())
370    }
371}
372
// Manually asserts that a `Connection` may be moved to another thread.
// NOTE(review): this is only sound if every field (`Socket`, `ConnectionInfo`,
// `SystemTime`, `Vec<Packets>`, `Vec<Adress>`) is itself safe to send. If they
// all already are `Send`, this impl is redundant; if `Packets` or `Adress` is
// not, it hides a real problem. TODO confirm and add a `// SAFETY:` justification.
unsafe impl Send for Connection {}
374
375// Search
376
377fn search_fn_has(conn: &Box<dyn TConnection>, _: &Packets) -> bool {
378    conn.step();
379    for pak in conn.read().unwrap().packets.iter() {
380        if let Packets::SearchResponse(_) = pak {
381            return true;
382        }
383    }
384    false
385}
386
387fn search_fn_get(conn: Box<dyn TConnection>, _: Packets) -> response::SearchResponse {
388    let mut res = None;
389
390    conn.write().unwrap().packets.retain(|pak| {
391        if let Packets::SearchResponse(pak) = pak {
392            if res.is_none() {
393                res = Some(response::SearchResponse {
394                    adresses: pak.adresses.clone(),
395                });
396                return false;
397            }
398        }
399        true
400    });
401
402    if let Some(res) = res {
403        res
404    } else {
405        panic!()
406    }
407}
408
409// End Search
410//
411// Info
412
413fn info_fn_has(conn: &Box<dyn TConnection>, packet: &Packets) -> bool {
414    conn.step();
415    if let Packets::InfoRequest(packet) = packet {
416        for pak in conn.read().unwrap().packets.iter() {
417            if let Packets::Info(pak) = pak {
418                if pak.adress == packet.adress {
419                    return true;
420                }
421            }
422        }
423    }
424    false
425}
426
427fn info_fn_get(conn: Box<dyn TConnection>, packet: Packets) -> Option<ConnectionInfo> {
428    let mut res = None;
429    if let Packets::InfoRequest(packet) = packet {
430        conn.write().unwrap().packets.retain(|pak| {
431            if let Packets::Info(pak) = pak {
432                if pak.adress == packet.adress && res.is_none() {
433                    if pak.has {
434                        res = Some(Some(ConnectionInfo {
435                            client: pak.client.clone(),
436                            name: pak.client.clone(),
437                            public: packet.adress.clone(),
438                            other: pak.other.clone(),
439                            privacy: false,
440                        }));
441                    } else {
442                        res = Some(None)
443                    }
444                    return false;
445                }
446            }
447            true
448        })
449    }
450
451    if let Some(res) = res {
452        res
453    } else {
454        panic!()
455    }
456}
457
458// End Info
459//
460// Request
461
462fn request_fn_has(conn: &Box<dyn TConnection>, packet: &Packets) -> bool {
463    conn.step();
464    if let Packets::Request(packet) = packet {
465        for pak in conn.read().unwrap().packets.iter() {
466            if let Packets::NewRequestResponse(pak) = pak {
467                if pak.from == packet.to {
468                    return true;
469                }
470            }
471        }
472    }
473    false
474}
475
476fn request_fn_get(conn: Box<dyn TConnection>, packet: Packets) -> response::NewRequestResponse {
477    let mut res = None;
478    if let Packets::Request(packet) = packet {
479        conn.write().unwrap().packets.retain(|pak| {
480            if let Packets::NewRequestResponse(pak) = pak {
481                if pak.from == packet.to && res.is_none() {
482                    res = Some(response::NewRequestResponse {
483                        connection: conn.c(),
484                        from: pak.from.clone(),
485                        accept: pak.accepted,
486                        secret: pak.secret.clone(),
487                    });
488                    return false;
489                }
490            }
491            true
492        })
493    }
494
495    if let Some(res) = res {
496        res
497    } else {
498        panic!()
499    }
500}
501
502// End Request
503//
504// RequestResponse
505
506fn request_response_fn_has(conn: &Box<dyn TConnection>, packet: &Packets) -> bool {
507    conn.step();
508    if let Packets::Request(packet) = packet {
509        for pak in conn.read().unwrap().packets.iter() {
510            if let Packets::NewRequestFinal(pak) = pak {
511                if pak.from == packet.to {
512                    return true;
513                }
514            }
515        }
516    }
517    false
518}
519
520fn request_response_fn_get(
521    conn: Box<dyn TConnection>,
522    packet: Packets,
523) -> response::NewRequestFinal {
524    let mut res = None;
525    if let Packets::Request(packet) = packet {
526        conn.write().unwrap().packets.retain(|pak| {
527            if let Packets::NewRequestFinal(pak) = pak {
528                if pak.from == packet.to && res.is_none() {
529                    res = Some(response::NewRequestFinal {
530                        connection: conn.c(),
531                        from: pak.from.clone(),
532                        accept: pak.accepted,
533                    });
534                    return false;
535                }
536            }
537            true
538        })
539    }
540
541    if let Some(res) = res {
542        res
543    } else {
544        panic!()
545    }
546}
547
548// End RequestResponse
549//
550// RequestFinal
551
552fn request_final_fn_has(conn: &Box<dyn TConnection>, packet: &Packets) -> bool {
553    conn.step();
554    if let Packets::RequestFinal(packet) = packet {
555        for pak in conn.read().unwrap().packets.iter() {
556            if let Packets::ConnectOn(pak) = pak {
557                if pak.adress == packet.to {
558                    return true;
559                }
560            }
561        }
562    }
563    false
564}
565
566fn request_final_fn_get(conn: Box<dyn TConnection>, packet: Packets) -> response::ConnectOn {
567    let mut res = None;
568    if let Packets::RequestFinal(packet) = packet {
569        conn.write().unwrap().packets.retain(|pak| {
570            if let Packets::ConnectOn(pak) = pak {
571                if pak.adress == packet.to && res.is_none() {
572                    res = Some(response::ConnectOn {
573                        connection: conn.c(),
574                        adress: pak.adress.clone(),
575                        to: pak.to.clone(),
576                        port: pak.port,
577                        time: pak.time,
578                    });
579                    return false;
580                }
581            }
582            true
583        })
584    }
585
586    if let Some(res) = res {
587        res
588    } else {
589        panic!()
590    }
591}