relay_man/server/
mod.rs

1mod connect;
2mod on_info;
3mod on_request;
4mod on_request_final;
5mod on_request_response;
6mod on_search;
7
8use bytes_kman::TBytes;
9use polling::{Event, Poller};
10use rand::random;
11use socket2::{Domain, Protocol, SockAddr, Socket, Type};
12
13// RelayServer allways should be on this port
14pub const PORT: u16 = 2120;
15
16use crate::common::{adress::Adress, packets::*, FromRawSock, IntoRawSock, RawSock};
17use std::{
18    mem::MaybeUninit,
19    net::ToSocketAddrs,
20    time::{Duration, SystemTime},
21};
22
23#[derive(PartialEq, Clone, Debug)]
24pub enum Connecting {
25    Start(usize),
26    Finishing(usize, u128),
27}
28impl Connecting {
29    pub fn session(&self) -> usize {
30        match self {
31            Connecting::Start(s) => *s,
32            Connecting::Finishing(s, _) => *s,
33        }
34    }
35}
36
37#[derive(Debug, Clone, PartialEq)]
38pub enum ClientStage {
39    NotRegistered,
40    Registered(RegisteredClient),
41}
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct RegisteredClient {
45    pub name: String,
46    pub client: String,
47    pub other: Vec<u8>,
48    pub adress: Adress,
49    pub ports: Vec<u16>,
50    pub to_connect: Vec<Connecting>,
51    pub privacy: bool,
52    pub private_adress: String,
53}
54
55#[derive(Debug)]
56pub struct Client {
57    pub session: usize,
58    pub conn: Socket,
59    pub fd: RawSock,
60    pub from: SockAddr,
61    pub stage: ClientStage,
62    pub last_message: SystemTime,
63    pub buffer: Vec<MaybeUninit<u8>>,
64}
65
66impl PartialEq for Client {
67    fn eq(&self, other: &Self) -> bool {
68        self.session == other.session && self.stage == other.stage
69    }
70}
71
72#[derive(Debug)]
73pub struct RelayServer {
74    pub clients: Vec<Client>,
75    pub poller: Poller,
76    pub conn: Socket,
77    pub fd: RawSock,
78    pub buffer: Vec<MaybeUninit<u8>>,
79    pub client_timeout: Duration,
80}
81
82#[derive(Debug)]
83pub enum RelayServerError {
84    CannotCreatePoller,
85}
86
87impl RelayServer {
88    pub fn new(ip: impl Into<String>, client_timeout: Duration) -> Result<Self, RelayServerError> {
89        let adress = format!("{}:{}", ip.into(), PORT);
90        let adress = adress.to_socket_addrs().unwrap().next().unwrap();
91        let adress_sock = SockAddr::from(adress);
92        let conn = Socket::new(
93            Domain::for_address(adress),
94            Type::STREAM,
95            Some(Protocol::TCP),
96        )
97        .unwrap();
98
99        conn.set_nonblocking(true).unwrap();
100        conn.bind(&adress_sock).unwrap();
101        conn.listen(128).unwrap();
102
103        let Ok(poller) = Poller::new() else{
104            println!("Cannot create poller!");
105            return Err(RelayServerError::CannotCreatePoller)
106        };
107
108        let fd = conn.into_raw();
109        poller.add(fd, Event::readable(0)).unwrap();
110        let conn = Socket::from_raw(fd);
111
112        let mut buffer = Vec::new();
113        buffer.resize(1024, MaybeUninit::new(0));
114
115        Ok(Self {
116            clients: Vec::new(),
117            poller,
118            buffer,
119            fd,
120            client_timeout,
121            conn,
122        })
123    }
124
125    pub fn avalibile_adress(&self, adress: &Adress) -> bool {
126        for client in self.clients.iter() {
127            if let ClientStage::Registered(client) = &client.stage {
128                if client.adress == *adress {
129                    return false;
130                }
131            }
132        }
133        true
134    }
135
136    pub fn create_session(&self) -> usize {
137        let mut session = random();
138
139        'l: loop {
140            if session == 0 {
141                session = random();
142                continue 'l;
143            }
144
145            for client in self.clients.iter() {
146                if client.session == session {
147                    session = random();
148                    continue 'l;
149                }
150            }
151            break;
152        }
153
154        session
155    }
156
157    pub fn listen(&mut self) {
158        let mut events = Vec::new();
159        let Ok(_) = self.poller.wait(&mut events, None) else {return};
160
161        for event in events {
162            if event.key == 0 {
163                self.accept_new();
164                self.poller.modify(self.fd, Event::readable(0)).unwrap();
165            } else if let Some(fd) = self.process_client(event.key) {
166                self.poller.modify(fd, Event::readable(event.key)).unwrap();
167            }
168        }
169    }
170
171    pub fn accept_new(&mut self) {
172        if let Ok((conn, from)) = self.conn.accept() {
173            let _ = conn.set_nonblocking(true);
174            let fd = conn.into_raw();
175            let session = self.create_session();
176            self.poller.add(fd, Event::readable(session)).unwrap();
177            let conn = Socket::from_raw(fd);
178            let _ = conn.set_recv_buffer_size(1024);
179            let _ = conn.set_send_buffer_size(1024);
180
181            let client = Client {
182                session,
183                fd,
184                conn,
185                from,
186                stage: ClientStage::NotRegistered,
187                last_message: SystemTime::now(),
188                buffer: vec![MaybeUninit::new(0); 1024],
189            };
190
191            self.clients.push(client);
192        }
193    }
194
195    pub fn process_client(&mut self, session: usize) -> Option<RawSock> {
196        let mut to_search = Vec::new();
197        let mut to_info = Vec::new();
198        let mut to_request = Vec::new();
199        let mut to_request_response = Vec::new();
200        let mut to_request_final = Vec::new();
201
202        let mut used_adresses = Vec::new();
203        let mut index = None;
204        let mut fd = None;
205        for (i, client) in self.clients.iter().enumerate() {
206            if let ClientStage::Registered(rclient) = &client.stage {
207                used_adresses.push(rclient.adress.clone())
208            }
209
210            if client.session == session {
211                index = Some(i);
212                fd = Some(client.fd)
213            }
214        }
215
216        let Some(index) = index else{return fd};
217
218        if let Some(client) = self.clients.get_mut(index) {
219            let Ok(len) = client.conn.recv(&mut client.buffer)else {
220                client.last_message = SystemTime::UNIX_EPOCH;
221                return None};
222            if len == 0 {
223                client.last_message = SystemTime::UNIX_EPOCH;
224                return None;
225            }
226
227            let buffer: &[u8] = unsafe { std::mem::transmute(&client.buffer[0..len]) };
228            let mut buffer = buffer.to_owned();
229            while !buffer.is_empty() {
230                let Some(packet) = Packets::from_bytes(&mut buffer)else{return fd};
231                match packet {
232                    Packets::Register(register) => {
233                        if used_adresses.contains(&register.public) {
234                            let pak = Packets::RegisterResponse(RegisterResponse {
235                                accepted: false,
236                                session: 0,
237                            });
238                            let mut bytes = pak.to_bytes();
239                            bytes.reverse();
240
241                            let _ = client.conn.send(&bytes);
242                            return fd;
243                        }
244
245                        // Adress is valid
246
247                        client.stage = ClientStage::Registered(RegisteredClient {
248                            name: register.name,
249                            client: register.client,
250                            other: register.other,
251                            adress: register.public,
252                            ports: vec![],
253                            to_connect: vec![],
254                            privacy: register.privacy,
255                            private_adress: register.private_adress,
256                        });
257
258                        let pak = Packets::RegisterResponse(RegisterResponse {
259                            accepted: true,
260                            session: client.session,
261                        });
262
263                        let mut bytes = pak.to_bytes();
264                        bytes.reverse();
265
266                        let _ = client.conn.send(&bytes);
267                    }
268                    Packets::UnRegister(session) => {
269                        if client.session == session.session {
270                            client.last_message = std::time::UNIX_EPOCH;
271                        }
272                    }
273                    Packets::Search(search) => {
274                        if search.session == client.session {
275                            to_search.push(search);
276                            client.last_message = SystemTime::now();
277                        }
278                    }
279                    Packets::InfoRequest(info) => {
280                        if info.session == client.session {
281                            to_info.push(info);
282                            client.last_message = SystemTime::now();
283                        }
284                    }
285                    Packets::Request(request) => {
286                        if request.session == client.session {
287                            to_request.push(request);
288                            client.last_message = SystemTime::now();
289                        }
290                    }
291                    Packets::RequestResponse(request_response) => {
292                        if request_response.session == client.session {
293                            to_request_response.push(request_response);
294                            client.last_message = SystemTime::now();
295                        }
296                    }
297                    Packets::Avalibile(avalibile) => {
298                        if avalibile.session == client.session {
299                            if let ClientStage::Registered(client) = &mut client.stage {
300                                client.ports.push(avalibile.port);
301                            }
302                            client.last_message = SystemTime::now();
303                        }
304                    }
305                    Packets::RequestFinal(request_final) => {
306                        if request_final.session == client.session {
307                            to_request_final.push(request_final);
308                            client.last_message = SystemTime::now();
309                        }
310                    }
311                    Packets::Tick { session } => {
312                        if client.session == session {
313                            client.last_message = SystemTime::now();
314                        }
315                    }
316
317                    _ => {}
318                }
319            }
320        }
321
322        for search in to_search {
323            self.on_search(index, search)
324        }
325
326        for info in to_info {
327            self.on_info(index, info)
328        }
329
330        for request in to_request {
331            self.on_request(index, request)
332        }
333
334        for request_response in to_request_response {
335            self.on_request_response(index, request_response)
336        }
337
338        for request_final in to_request_final {
339            self.on_request_final(index, request_final)
340        }
341
342        fd
343    }
344
345    pub fn step(&mut self) {
346        self.listen();
347        self.clients.retain(|client| {
348            if client.last_message.elapsed().unwrap() < self.client_timeout {
349                true
350            } else {
351                let _ = self.poller.delete(client.fd);
352                false
353            }
354        });
355
356        self.connect();
357    }
358}