1pub mod channel;
2pub mod connection;
3pub mod ffi;
4pub mod fragmentation;
5pub mod header;
6pub mod int_buffer;
7pub mod nack;
8pub mod network_address;
9pub mod pool;
10pub mod receive_result;
11pub mod receiver;
12pub mod send_buffer_manager;
13pub mod sequence;
14pub mod sequence_buffer;
15pub mod tachyon_socket;
16pub mod unreliable_sender;
17
18mod connection_impl;
19
20#[cfg(test)]
22pub mod tachyon_test;
23
24use std::time::Duration;
25use std::time::Instant;
26
27use rustc_hash::FxHashMap;
28
29use self::channel::*;
30use self::connection::*;
31use self::fragmentation::*;
32use self::header::*;
33use self::network_address::NetworkAddress;
34use self::receive_result::ReceiveResult;
35use self::receive_result::TachyonReceiveResult;
36use self::receive_result::RECEIVE_ERROR_CHANNEL;
37use self::receive_result::RECEIVE_ERROR_UNKNOWN;
38use self::receiver::RECEIVE_WINDOW_SIZE_DEFAULT;
39use self::tachyon_socket::*;
40use self::unreliable_sender::UnreliableSender;
41
/// Send failed because no socket is available.
pub const SEND_ERROR_SOCKET: u32 = 1;
/// Send failed because the channel id is 0 or no channel exists for the address/channel pair.
pub const SEND_ERROR_CHANNEL: u32 = 2;
/// Send failed while creating or looking up message fragments.
pub const SEND_ERROR_FRAGMENT: u32 = 3;
/// Send failed for an unspecified reason (e.g. no unreliable sender has been created).
pub const SEND_ERROR_UNKNOWN: u32 = 4;
/// Send failed because the payload length was rejected (e.g. zero-length reliable body).
pub const SEND_ERROR_LENGTH: u32 = 5;
/// Send refused because `can_send()` reported that sending is not currently allowed.
pub const SEND_ERROR_IDENTITY: u32 = 6;

/// Value used for `TachyonConfig::nack_redundancy` by `TachyonConfig::default()`.
const NACK_REDUNDANCY_DEFAULT: u32 = 1;

/// C callback invoked by a client when it receives an identity-linked message.
pub type OnConnectedCallback = unsafe extern "C" fn();
52
53#[derive(Clone, Copy)]
54#[repr(C)]
55#[derive(Default, Debug)]
56pub struct TachyonStats {
57 pub channel_stats: ChannelStats,
58 pub packets_dropped: u64,
59 pub unreliable_sent: u64,
60 pub unreliable_received: u64,
61}
62
63impl std::fmt::Display for TachyonStats {
64 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65 write!(
66 f,
67 "channel_stats:{0} packets_dropped:{1} unreliable_sent:{2} unreliable_received:{3}\n",
68 self.channel_stats,
69 self.packets_dropped,
70 self.unreliable_sent,
71 self.unreliable_received
72 )
73 }
74}
75
/// Runtime configuration (C-compatible layout).
#[repr(C)]
#[derive(Clone, Copy)]
pub struct TachyonConfig {
    /// Identity handling is enabled when this equals 1.
    pub use_identity: u32,
    /// Forwarded to the socket's receive call.
    /// NOTE(review): presumably a simulated packet-loss setting — units not visible here; confirm.
    pub drop_packet_chance: u64,
    /// Forwarded to the socket's receive call as a boolean (`== 1`).
    pub drop_reliable_only: u32,
    /// Receive window size for new channels; 0 selects `RECEIVE_WINDOW_SIZE_DEFAULT`.
    pub receive_window_size: u16,
    /// Passed to every channel created from this configuration.
    pub nack_redundancy: u32,
}
85
86impl TachyonConfig {
87 pub fn default() -> Self {
88 let default = TachyonConfig {
89 use_identity: 0,
90 drop_packet_chance: 0,
91 drop_reliable_only: 0,
92 receive_window_size: RECEIVE_WINDOW_SIZE_DEFAULT,
93 nack_redundancy: NACK_REDUNDANCY_DEFAULT
94 };
95 return default;
96 }
97
98 pub fn get_receive_window_size(&self) -> u16 {
99 if self.receive_window_size > 0 {
100 return self.receive_window_size;
101 } else {
102 return RECEIVE_WINDOW_SIZE_DEFAULT;
103 }
104 }
105}
106
107#[derive(Clone, Copy)]
108#[repr(C)]
109#[derive(Default)]
110pub struct TachyonSendResult {
111 pub sent_len: u32,
112 pub error: u32,
113 pub header: Header,
114}
115
/// A single endpoint: owns the socket plus the per-peer connections and channels.
pub struct Tachyon {
    /// Caller-supplied identifier (0 when built through `create`).
    pub id: u16,
    /// Underlying socket wrapper used for all sends and receives.
    pub socket: TachyonSocket,
    /// Sender for unreliable messages; populated after a successful `bind` or `connect`.
    pub unreliable_sender: Option<UnreliableSender>,
    // NOTE(review): presumably maps identity id -> session id; not established in this file, confirm.
    pub identities: FxHashMap<u32, u32>,
    /// Known connections keyed by remote address.
    pub connections: FxHashMap<NetworkAddress, Connection>,
    /// Channels keyed by (remote address, channel id).
    pub channels: FxHashMap<(NetworkAddress, u8), Channel>,
    /// Channel id -> `ordered` flag; used to create the channels for an address.
    pub channel_config: FxHashMap<u8, bool>,
    /// Configuration supplied at construction time.
    pub config: TachyonConfig,
    /// Scratch buffer allocated with 4096 zeroed bytes at construction.
    pub nack_send_data: Vec<u8>,
    /// Instance-level counters (channel stats are merged in by `get_combined_stats`).
    pub stats: TachyonStats,
    /// Creation time; reference point for `time_since_start`.
    pub start_time: Instant,
    /// Time of the last identity link request; initialised 100 seconds in the past.
    pub last_identity_link_request: Instant,
    /// This endpoint's identity state (linked / unlinked).
    pub identity: Identity,
    /// Optional C callback invoked on a client when an identity-linked message arrives.
    pub on_connected_callback: Option<OnConnectedCallback>,
}
132
133impl Tachyon {
134 pub fn create(config: TachyonConfig) -> Self {
135 return Tachyon::create_with_id(config, 0);
136 }
137
138 pub fn create_with_id(config: TachyonConfig, id: u16) -> Self {
139 let socket = TachyonSocket::create();
140
141 let mut tachyon = Tachyon {
142 id,
143 identities: FxHashMap::default(),
144 connections: FxHashMap::default(),
145 channels: FxHashMap::default(),
146 channel_config: FxHashMap::default(),
147 socket: socket,
148 unreliable_sender: None,
149 config,
150 nack_send_data: vec![0; 4096],
151 stats: TachyonStats::default(),
152 start_time: Instant::now(),
153 last_identity_link_request: Instant::now() - Duration::new(100, 0),
154 identity: Identity::default(),
155 on_connected_callback: None,
156 };
157 tachyon.channel_config.insert(1, true);
158 tachyon.channel_config.insert(2, false);
159
160 return tachyon;
161 }
162
163
164 pub fn time_since_start(&self) -> u64 {
165 return Instant::now().duration_since(self.start_time).as_millis() as u64;
166 }
167
168 pub fn bind(&mut self, address: NetworkAddress) -> bool {
169 match self.socket.bind_socket(address) {
170 CreateConnectResult::Success => {
171 self.unreliable_sender = self.create_unreliable_sender();
172 return true;
173 }
174 CreateConnectResult::Error => {
175 return false;
176 }
177 }
178 }
179
180 pub fn connect(&mut self, address: NetworkAddress) -> bool {
181 match self.socket.connect_socket(address) {
182 CreateConnectResult::Success => {
183 let local_address = NetworkAddress::default();
184 self.try_create_connection(local_address);
185 self.create_configured_channels(local_address);
186 self.unreliable_sender = self.create_unreliable_sender();
187 return true;
188 }
189 CreateConnectResult::Error => {
190 return false;
191 }
192 }
193 }
194
195 pub fn create_unreliable_sender(&self) -> Option<UnreliableSender> {
196 let socket = self.socket.clone_socket();
197 if !socket.is_some() {
198 return None;
199 }
200 let sender = UnreliableSender { socket: socket };
201 return Some(sender);
202 }
203
204 pub fn get_channel(&mut self, address: NetworkAddress, channel_id: u8) -> Option<&mut Channel> {
205 match self.channels.get_mut(&(address, channel_id)) {
206 Some(channel) => {
207 return Some(channel);
208 }
209 None => {
210 return None;
211 }
212 }
213 }
214
215 fn create_configured_channels(&mut self, address: NetworkAddress) {
216 for config in &self.channel_config {
217 let channel_id = *config.0;
218 let ordered = *config.1;
219 match self.channels.get_mut(&(address, channel_id)) {
220 Some(_) => {}
221 None => {
222 let channel = Channel::create(channel_id, ordered, address, self.config.get_receive_window_size(), self.config.nack_redundancy);
223 self.channels.insert((address, channel_id), channel);
224 }
225 }
226 }
227 }
228
229 pub fn get_channel_count(&mut self, address: NetworkAddress) -> u32 {
230 let mut count = 0;
231 for config in &self.channel_config {
232 let channel_id = *config.0;
233 if self.channels.contains_key(&(address, channel_id)) {
234 count += 1;
235 }
236 }
237 return count;
238 }
239
240 fn remove_configured_channels(&mut self, address: NetworkAddress) {
241 for config in &self.channel_config {
242 let channel_id = *config.0;
243 self.channels.remove(&(address, channel_id));
244 }
245 }
246
247 pub fn configure_channel(&mut self, channel_id: u8, ordered: bool) -> bool {
248 if channel_id < 3 {
249 return false;
250 }
251 self.channel_config.insert(channel_id, ordered);
252 return true;
253 }
254
255 pub fn get_combined_stats(&mut self) -> TachyonStats {
256 let mut channel_stats = ChannelStats::default();
257 for channel in self.channels.values_mut() {
258 channel.update_stats();
259 channel_stats.add_from(&channel.stats);
260 }
261 let mut stats = self.stats.clone();
262 stats.channel_stats = channel_stats;
263 return stats;
264 }
265
266 pub fn update(&mut self) {
267 self.client_identity_update();
268
269 for channel in self.channels.values_mut() {
270 channel.update(&self.socket);
271 }
272 }
273
274 fn receive_published_channel_id(&mut self, receive_buffer: &mut [u8], address: NetworkAddress, channel_id: u8) -> u32 {
275 match self.channels.get_mut(&(address, channel_id)) {
276 Some(channel) => {
277 let res = channel.receive_published(receive_buffer);
278 return res.0;
279 }
280 None => {
281 return 0;
282 }
283 }
284 }
285
286 fn receive_published_all_channels(&mut self, receive_buffer: &mut [u8]) -> TachyonReceiveResult {
287 let mut result = TachyonReceiveResult::default();
288
289 for channel in self.channels.values_mut() {
290 let res = channel.receive_published(receive_buffer);
291 if res.0 > 0 {
292 result.length = res.0;
293 result.address = res.1;
294 result.channel = channel.id as u16;
295 return result;
296 }
297 }
298 return result;
299 }
300
301 pub fn receive_loop(&mut self, receive_buffer: &mut [u8]) -> TachyonReceiveResult {
302 let mut result = TachyonReceiveResult::default();
303
304 for _ in 0..100 {
305 let receive_result = self.receive_from_socket(receive_buffer);
306 match receive_result {
307 ReceiveResult::Reliable {
308 network_address: socket_addr,
309 channel_id,
310 } => {
311 let published = self.receive_published_channel_id(receive_buffer, socket_addr, channel_id);
312 if published > 0 {
313 result.channel = channel_id as u16;
314 result.length = published;
315 result.address = socket_addr;
316 return result;
317 }
318 }
319 ReceiveResult::UnReliable {
320 received_len,
321 network_address: socket_addr,
322 } => {
323 result.length = received_len as u32;
324 result.address = socket_addr;
325 return result;
326 }
327 ReceiveResult::Empty => {
328 break;
329 }
330 ReceiveResult::Retry => {}
331 ReceiveResult::Error => {
332 result.error = RECEIVE_ERROR_UNKNOWN;
333 return result;
334 }
335 ReceiveResult::ChannelError => {
336 result.error = RECEIVE_ERROR_CHANNEL;
337 return result;
338 }
339 }
340 }
341 return self.receive_published_all_channels(receive_buffer);
342 }
343
    /// Reads at most one datagram from the socket and dispatches it.
    ///
    /// Returns:
    /// - `Reliable` / `UnReliable` when a message is ready for the caller,
    /// - `Retry` when the packet was consumed internally (dropped, NONE, NACK,
    ///   fragment, or a reliable packet the receiver did not accept),
    /// - `Empty` when the socket had nothing, or an identity message / identity
    ///   check ended processing of this packet,
    /// - `ChannelError` when no channel exists for the sender and header channel,
    /// - `Error` on socket error or an unrecognised message type.
    fn receive_from_socket(&mut self, receive_buffer: &mut [u8]) -> ReceiveResult {
        let address: NetworkAddress;
        let received_len: usize;
        let header: Header;

        let socket_result = self.socket.receive(receive_buffer,self.config.drop_packet_chance,self.config.drop_reliable_only == 1);
        match socket_result {
            SocketReceiveResult::Success {bytes_received, network_address} => {
                received_len = bytes_received;
                address = network_address;

                // NOTE(review): the header is parsed without comparing bytes_received to the
                // header size — confirm Header::read is safe for short datagrams.
                header = Header::read(receive_buffer);

                if self.socket.is_server {
                    if self.config.use_identity == 1 {
                        let connection_header: ConnectionHeader;

                        // Identity link/unlink requests are handled here and never reach a channel.
                        // NOTE(review): returning Empty makes receive_loop stop reading for this
                        // call even if more datagrams are queued — confirm that is intended.
                        if header.message_type == MESSAGE_TYPE_LINK_IDENTITY {
                            connection_header = ConnectionHeader::read(receive_buffer);
                            self.try_link_identity(address, connection_header.id, connection_header.session_id);
                            return ReceiveResult::Empty;
                        } else if header.message_type == MESSAGE_TYPE_UNLINK_IDENTITY {
                            connection_header = ConnectionHeader::read(receive_buffer);
                            self.try_unlink_identity(address, connection_header.id, connection_header.session_id);
                            return ReceiveResult::Empty;
                        } else {
                            // Any other message is discarded unless the sender passes the linked-connection check.
                            if !self.validate_and_update_linked_connection(address) {
                                return ReceiveResult::Empty;
                            }
                        }
                    } else {
                        self.on_receive_connection_update(address);
                    }
                } else {
                    if self.config.use_identity == 1 {
                        if header.message_type == MESSAGE_TYPE_IDENTITY_LINKED {
                            self.identity.set_linked(1);
                            if let Some(callback) = self.on_connected_callback {
                                // SAFETY: relies on the registered callback being a valid
                                // `extern "C"` function pointer; where it is set is not
                                // visible here — TODO confirm.
                                unsafe {
                                    callback();
                                }
                            }
                            return ReceiveResult::Empty;
                        } else if header.message_type == MESSAGE_TYPE_IDENTITY_UNLINKED {
                            self.identity.set_linked(0);
                            return ReceiveResult::Empty;
                        }

                        // A client that is not linked discards everything else.
                        if !self.identity.is_linked() {
                            return ReceiveResult::Empty;
                        }
                    }
                }
            }
            SocketReceiveResult::Empty => {
                return ReceiveResult::Empty;
            }
            SocketReceiveResult::Error => {
                return ReceiveResult::Error;
            }
            SocketReceiveResult::Dropped => {
                self.stats.packets_dropped += 1;
                return ReceiveResult::Retry;
            }
        }

        // Unreliable messages bypass channels; the payload stays in receive_buffer.
        if header.message_type == MESSAGE_TYPE_UNRELIABLE {
            self.stats.unreliable_received += 1;
            return ReceiveResult::UnReliable {
                received_len: received_len,
                network_address: address,
            };
        }

        // Every remaining message type needs an existing channel for this sender.
        let channel = match self.channels.get_mut(&(address, header.channel)) {
            Some(c) => c,
            None => {
                return ReceiveResult::ChannelError;
            }
        };

        channel.stats.bytes_received += received_len as u64;

        if header.message_type == MESSAGE_TYPE_NONE {
            channel.stats.nones_received += 1;
            if channel.receiver.receive_packet(header.sequence, receive_buffer, received_len)
            {
                channel.stats.nones_accepted += 1;
            }
            return ReceiveResult::Retry;
        }

        if header.message_type == MESSAGE_TYPE_NACK {
            channel.process_nack_message(address, receive_buffer);
            return ReceiveResult::Retry;
        }

        if header.message_type == MESSAGE_TYPE_FRAGMENT {
            channel.process_fragment_message(header.sequence, receive_buffer, received_len);
            return ReceiveResult::Retry;
        }



        if header.message_type == MESSAGE_TYPE_RELIABLE || header.message_type == MESSAGE_TYPE_RELIABLE_WITH_NACK {

            // Process the nack carried by RELIABLE_WITH_NACK before the packet is handed to the receiver.
            if header.message_type == MESSAGE_TYPE_RELIABLE_WITH_NACK {
                channel.process_single_nack(address, receive_buffer);
            }

            if channel.receiver.receive_packet(header.sequence, receive_buffer, received_len) {
                channel.stats.received += 1;
                return ReceiveResult::Reliable {
                    network_address: address,
                    channel_id: header.channel,
                };
            } else {
                return ReceiveResult::Retry;
            }
        }

        // Unrecognised message type.
        return ReceiveResult::Error;
    }
467
468 pub fn send_unreliable(&mut self, address: NetworkAddress, data: &mut [u8], length: usize) -> TachyonSendResult {
469 if !self.can_send() {
470 let mut result = TachyonSendResult::default();
471 result.error = SEND_ERROR_IDENTITY;
472 return result;
473 }
474
475 match &self.unreliable_sender {
476 Some(sender) => {
477 let result = sender.send_unreliable(address, data, length);
478 if result.error == 0 {
479 self.stats.unreliable_sent += 1;
480 }
481 return result;
482 }
483 None => {
484 let mut result = TachyonSendResult::default();
485 result.error = SEND_ERROR_UNKNOWN;
486 return result;
487 }
488 }
489 }
490
491 pub fn send_reliable(&mut self, channel_id: u8, address: NetworkAddress, data: &mut [u8], body_len: usize) -> TachyonSendResult {
492 let mut result = TachyonSendResult::default();
493
494 if !self.can_send() {
495 result.error = SEND_ERROR_IDENTITY;
496 return result;
497 }
498
499 if body_len == 0 {
500 result.error = SEND_ERROR_LENGTH;
501 return result;
502 }
503
504 if channel_id == 0 {
505 result.error = SEND_ERROR_CHANNEL;
506 return result;
507 }
508
509 if !self.socket.socket.is_some() {
510 result.error = SEND_ERROR_SOCKET;
511 return result;
512 }
513
514 let channel = match self.channels.get_mut(&(address, channel_id)) {
515 Some(c) => c,
516 None => {
517 result.error = SEND_ERROR_CHANNEL;
518 return result;
519 }
520 };
521
522 if Fragmentation::should_fragment(body_len) {
523 let mut fragment_bytes_sent = 0;
524 let frag_sequences = channel.frag.create_fragments(&mut channel.send_buffers, channel.id, data, body_len);
525 if frag_sequences.len() == 0 {
526 result.error = SEND_ERROR_FRAGMENT;
527 return result;
528 }
529
530 for seq in frag_sequences {
531 match channel.send_buffers.get_send_buffer(seq) {
532 Some(fragment) => {
533 let sent =self.socket.send_to(address, &fragment.buffer, fragment.buffer.len());
534 fragment_bytes_sent += sent;
535
536 channel.stats.bytes_sent += sent as u64;
537 channel.stats.fragments_sent += 1;
538 }
539 None => {
540 result.error = SEND_ERROR_FRAGMENT;
541 return result;
542 }
543 }
544 }
545
546 result.header.message_type = MESSAGE_TYPE_FRAGMENT;
547 result.sent_len = fragment_bytes_sent as u32;
548
549 channel.stats.sent += 1;
550
551 return result;
552 }
553
554
555 result = channel.send_reliable(address, data, body_len, &self.socket);
556 return result;
557
558 }
603}
604
// Integration-style tests driven through the `TachyonTest` harness (a client and a
// server instance with shared send/receive buffers). They are marked `#[serial]`
// so they do not run concurrently with each other.
#[cfg(test)]
mod tests {

    use serial_test::serial;

    use crate::tachyon::tachyon_test::TachyonTest;

    use super::*;

    // Only prints the value of `4 % 5`; makes no assertions.
    #[test]
    fn test_nack_rotation() {
        println!("{0}", 4 % 5);
    }

    // Reliable round trips with payloads of 2, 33 and 3497 bytes.
    #[test]
    #[serial]
    fn test_reliable() {
        let mut test = TachyonTest::default();
        test.connect();

        test.send_buffer[0] = 4;
        // NOTE(review): arguments look like (channel id, body length) — confirm against TachyonTest.
        let sent = test.client_send_reliable(1, 2);
        // The reported sent length includes the header.
        assert_eq!(2 + TACHYON_HEADER_SIZE, sent.sent_len as usize);

        let res = test.server_receive();
        assert_eq!(2, res.length);
        assert_eq!(4, test.receive_buffer[0]);

        test.client_send_reliable(2, 33);
        let res = test.server_receive();
        assert_eq!(33, res.length);

        // NOTE(review): 3497 is presumably large enough to take the fragmentation path — confirm.
        test.client_send_reliable(2, 3497);
        let res = test.server_receive();
        assert_eq!(3497, res.length);
    }

    // Channel 3 is configured on the client only: the send succeeds but the server
    // reports RECEIVE_ERROR_CHANNEL.
    #[test]
    #[serial]
    fn test_unconfigured_channel_fails() {
        let mut test = TachyonTest::default();
        test.client.configure_channel(3, true);
        test.connect();

        let sent = test.client_send_reliable(3, 2);
        assert_eq!(2 + TACHYON_HEADER_SIZE, sent.sent_len as usize);
        assert_eq!(0, sent.error);

        let res = test.server_receive();
        assert_eq!(0, res.length);
        assert_eq!(RECEIVE_ERROR_CHANNEL, res.error);
    }

    // Channel 3 is configured on both sides: the message is delivered without error.
    #[test]
    #[serial]
    fn test_configured_channel() {
        let mut test = TachyonTest::default();
        test.client.configure_channel(3, true);
        test.server.configure_channel(3, true);
        test.connect();

        let sent = test.client_send_reliable(3, 2);
        assert_eq!(2 + TACHYON_HEADER_SIZE, sent.sent_len as usize);
        assert_eq!(0, sent.error);

        let res = test.server_receive();
        assert_eq!(2, res.length);
        assert_eq!(0, res.error);
    }

    // A zero-length unreliable send is rejected; a 4-byte one arrives intact.
    #[test]
    #[serial]
    fn test_unreliable() {
        let mut test = TachyonTest::default();
        test.connect();

        let send = test.client_send_unreliable(0);
        assert_eq!(SEND_ERROR_LENGTH, send.error);

        let res = test.server_receive();
        assert_eq!(0, res.length);

        // Pre-set byte 0 of the receive buffer so the assertion below shows it was overwritten.
        test.receive_buffer[0] = 1;
        test.send_buffer[1] = 4;
        test.send_buffer[2] = 5;
        test.send_buffer[3] = 6;
        let sent = test.client_send_unreliable(4);
        assert_eq!(0, sent.error);
        assert_eq!(4, sent.sent_len as usize);

        let res = test.server_receive();
        assert_eq!(4, res.length);
        assert_eq!(0, test.receive_buffer[0]);
        assert_eq!(4, test.receive_buffer[1]);
        assert_eq!(5, test.receive_buffer[2]);
        assert_eq!(6, test.receive_buffer[3]);
    }
}