1mod connect;
2mod on_info;
3mod on_request;
4mod on_request_final;
5mod on_request_response;
6mod on_search;
7
8use bytes_kman::TBytes;
9use polling::{Event, Poller};
10use rand::random;
11use socket2::{Domain, Protocol, SockAddr, Socket, Type};
12
13pub const PORT: u16 = 2120;
15
16use crate::common::{adress::Adress, packets::*, FromRawSock, IntoRawSock, RawSock};
17use std::{
18 mem::MaybeUninit,
19 net::ToSocketAddrs,
20 time::{Duration, SystemTime},
21};
22
23#[derive(PartialEq, Clone, Debug)]
24pub enum Connecting {
25 Start(usize),
26 Finishing(usize, u128),
27}
28impl Connecting {
29 pub fn session(&self) -> usize {
30 match self {
31 Connecting::Start(s) => *s,
32 Connecting::Finishing(s, _) => *s,
33 }
34 }
35}
36
37#[derive(Debug, Clone, PartialEq)]
38pub enum ClientStage {
39 NotRegistered,
40 Registered(RegisteredClient),
41}
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct RegisteredClient {
45 pub name: String,
46 pub client: String,
47 pub other: Vec<u8>,
48 pub adress: Adress,
49 pub ports: Vec<u16>,
50 pub to_connect: Vec<Connecting>,
51 pub privacy: bool,
52 pub private_adress: String,
53}
54
55#[derive(Debug)]
56pub struct Client {
57 pub session: usize,
58 pub conn: Socket,
59 pub fd: RawSock,
60 pub from: SockAddr,
61 pub stage: ClientStage,
62 pub last_message: SystemTime,
63 pub buffer: Vec<MaybeUninit<u8>>,
64}
65
66impl PartialEq for Client {
67 fn eq(&self, other: &Self) -> bool {
68 self.session == other.session && self.stage == other.stage
69 }
70}
71
72#[derive(Debug)]
73pub struct RelayServer {
74 pub clients: Vec<Client>,
75 pub poller: Poller,
76 pub conn: Socket,
77 pub fd: RawSock,
78 pub buffer: Vec<MaybeUninit<u8>>,
79 pub client_timeout: Duration,
80}
81
82#[derive(Debug)]
83pub enum RelayServerError {
84 CannotCreatePoller,
85}
86
87impl RelayServer {
88 pub fn new(ip: impl Into<String>, client_timeout: Duration) -> Result<Self, RelayServerError> {
89 let adress = format!("{}:{}", ip.into(), PORT);
90 let adress = adress.to_socket_addrs().unwrap().next().unwrap();
91 let adress_sock = SockAddr::from(adress);
92 let conn = Socket::new(
93 Domain::for_address(adress),
94 Type::STREAM,
95 Some(Protocol::TCP),
96 )
97 .unwrap();
98
99 conn.set_nonblocking(true).unwrap();
100 conn.bind(&adress_sock).unwrap();
101 conn.listen(128).unwrap();
102
103 let Ok(poller) = Poller::new() else{
104 println!("Cannot create poller!");
105 return Err(RelayServerError::CannotCreatePoller)
106 };
107
108 let fd = conn.into_raw();
109 poller.add(fd, Event::readable(0)).unwrap();
110 let conn = Socket::from_raw(fd);
111
112 let mut buffer = Vec::new();
113 buffer.resize(1024, MaybeUninit::new(0));
114
115 Ok(Self {
116 clients: Vec::new(),
117 poller,
118 buffer,
119 fd,
120 client_timeout,
121 conn,
122 })
123 }
124
125 pub fn avalibile_adress(&self, adress: &Adress) -> bool {
126 for client in self.clients.iter() {
127 if let ClientStage::Registered(client) = &client.stage {
128 if client.adress == *adress {
129 return false;
130 }
131 }
132 }
133 true
134 }
135
136 pub fn create_session(&self) -> usize {
137 let mut session = random();
138
139 'l: loop {
140 if session == 0 {
141 session = random();
142 continue 'l;
143 }
144
145 for client in self.clients.iter() {
146 if client.session == session {
147 session = random();
148 continue 'l;
149 }
150 }
151 break;
152 }
153
154 session
155 }
156
157 pub fn listen(&mut self) {
158 let mut events = Vec::new();
159 let Ok(_) = self.poller.wait(&mut events, None) else {return};
160
161 for event in events {
162 if event.key == 0 {
163 self.accept_new();
164 self.poller.modify(self.fd, Event::readable(0)).unwrap();
165 } else if let Some(fd) = self.process_client(event.key) {
166 self.poller.modify(fd, Event::readable(event.key)).unwrap();
167 }
168 }
169 }
170
171 pub fn accept_new(&mut self) {
172 if let Ok((conn, from)) = self.conn.accept() {
173 let _ = conn.set_nonblocking(true);
174 let fd = conn.into_raw();
175 let session = self.create_session();
176 self.poller.add(fd, Event::readable(session)).unwrap();
177 let conn = Socket::from_raw(fd);
178 let _ = conn.set_recv_buffer_size(1024);
179 let _ = conn.set_send_buffer_size(1024);
180
181 let client = Client {
182 session,
183 fd,
184 conn,
185 from,
186 stage: ClientStage::NotRegistered,
187 last_message: SystemTime::now(),
188 buffer: vec![MaybeUninit::new(0); 1024],
189 };
190
191 self.clients.push(client);
192 }
193 }
194
195 pub fn process_client(&mut self, session: usize) -> Option<RawSock> {
196 let mut to_search = Vec::new();
197 let mut to_info = Vec::new();
198 let mut to_request = Vec::new();
199 let mut to_request_response = Vec::new();
200 let mut to_request_final = Vec::new();
201
202 let mut used_adresses = Vec::new();
203 let mut index = None;
204 let mut fd = None;
205 for (i, client) in self.clients.iter().enumerate() {
206 if let ClientStage::Registered(rclient) = &client.stage {
207 used_adresses.push(rclient.adress.clone())
208 }
209
210 if client.session == session {
211 index = Some(i);
212 fd = Some(client.fd)
213 }
214 }
215
216 let Some(index) = index else{return fd};
217
218 if let Some(client) = self.clients.get_mut(index) {
219 let Ok(len) = client.conn.recv(&mut client.buffer)else {
220 client.last_message = SystemTime::UNIX_EPOCH;
221 return None};
222 if len == 0 {
223 client.last_message = SystemTime::UNIX_EPOCH;
224 return None;
225 }
226
227 let buffer: &[u8] = unsafe { std::mem::transmute(&client.buffer[0..len]) };
228 let mut buffer = buffer.to_owned();
229 while !buffer.is_empty() {
230 let Some(packet) = Packets::from_bytes(&mut buffer)else{return fd};
231 match packet {
232 Packets::Register(register) => {
233 if used_adresses.contains(®ister.public) {
234 let pak = Packets::RegisterResponse(RegisterResponse {
235 accepted: false,
236 session: 0,
237 });
238 let mut bytes = pak.to_bytes();
239 bytes.reverse();
240
241 let _ = client.conn.send(&bytes);
242 return fd;
243 }
244
245 client.stage = ClientStage::Registered(RegisteredClient {
248 name: register.name,
249 client: register.client,
250 other: register.other,
251 adress: register.public,
252 ports: vec![],
253 to_connect: vec![],
254 privacy: register.privacy,
255 private_adress: register.private_adress,
256 });
257
258 let pak = Packets::RegisterResponse(RegisterResponse {
259 accepted: true,
260 session: client.session,
261 });
262
263 let mut bytes = pak.to_bytes();
264 bytes.reverse();
265
266 let _ = client.conn.send(&bytes);
267 }
268 Packets::UnRegister(session) => {
269 if client.session == session.session {
270 client.last_message = std::time::UNIX_EPOCH;
271 }
272 }
273 Packets::Search(search) => {
274 if search.session == client.session {
275 to_search.push(search);
276 client.last_message = SystemTime::now();
277 }
278 }
279 Packets::InfoRequest(info) => {
280 if info.session == client.session {
281 to_info.push(info);
282 client.last_message = SystemTime::now();
283 }
284 }
285 Packets::Request(request) => {
286 if request.session == client.session {
287 to_request.push(request);
288 client.last_message = SystemTime::now();
289 }
290 }
291 Packets::RequestResponse(request_response) => {
292 if request_response.session == client.session {
293 to_request_response.push(request_response);
294 client.last_message = SystemTime::now();
295 }
296 }
297 Packets::Avalibile(avalibile) => {
298 if avalibile.session == client.session {
299 if let ClientStage::Registered(client) = &mut client.stage {
300 client.ports.push(avalibile.port);
301 }
302 client.last_message = SystemTime::now();
303 }
304 }
305 Packets::RequestFinal(request_final) => {
306 if request_final.session == client.session {
307 to_request_final.push(request_final);
308 client.last_message = SystemTime::now();
309 }
310 }
311 Packets::Tick { session } => {
312 if client.session == session {
313 client.last_message = SystemTime::now();
314 }
315 }
316
317 _ => {}
318 }
319 }
320 }
321
322 for search in to_search {
323 self.on_search(index, search)
324 }
325
326 for info in to_info {
327 self.on_info(index, info)
328 }
329
330 for request in to_request {
331 self.on_request(index, request)
332 }
333
334 for request_response in to_request_response {
335 self.on_request_response(index, request_response)
336 }
337
338 for request_final in to_request_final {
339 self.on_request_final(index, request_final)
340 }
341
342 fd
343 }
344
345 pub fn step(&mut self) {
346 self.listen();
347 self.clients.retain(|client| {
348 if client.last_message.elapsed().unwrap() < self.client_timeout {
349 true
350 } else {
351 let _ = self.poller.delete(client.fd);
352 false
353 }
354 });
355
356 self.connect();
357 }
358}