tachyon/
tachyon.rs

1pub mod channel;
2pub mod connection;
3pub mod ffi;
4pub mod fragmentation;
5pub mod header;
6pub mod int_buffer;
7pub mod nack;
8pub mod network_address;
9pub mod pool;
10pub mod receive_result;
11pub mod receiver;
12pub mod send_buffer_manager;
13pub mod sequence;
14pub mod sequence_buffer;
15pub mod tachyon_socket;
16pub mod unreliable_sender;
17
18mod connection_impl;
19
20// additional stress/scale testing
21#[cfg(test)]
22pub mod tachyon_test;
23
24use std::time::Duration;
25use std::time::Instant;
26
27use rustc_hash::FxHashMap;
28
29use self::channel::*;
30use self::connection::*;
31use self::fragmentation::*;
32use self::header::*;
33use self::network_address::NetworkAddress;
34use self::receive_result::ReceiveResult;
35use self::receive_result::TachyonReceiveResult;
36use self::receive_result::RECEIVE_ERROR_CHANNEL;
37use self::receive_result::RECEIVE_ERROR_UNKNOWN;
38use self::receiver::RECEIVE_WINDOW_SIZE_DEFAULT;
39use self::tachyon_socket::*;
40use self::unreliable_sender::UnreliableSender;
41
42pub const SEND_ERROR_CHANNEL: u32 = 2;
43pub const SEND_ERROR_SOCKET: u32 = 1;
44pub const SEND_ERROR_FRAGMENT: u32 = 3;
45pub const SEND_ERROR_UNKNOWN: u32 = 4;
46pub const SEND_ERROR_LENGTH: u32 = 5;
47pub const SEND_ERROR_IDENTITY: u32 = 6;
48
49const NACK_REDUNDANCY_DEFAULT: u32 = 1;
50
51pub type OnConnectedCallback = unsafe extern "C" fn();
52
53#[derive(Clone, Copy)]
54#[repr(C)]
55#[derive(Default, Debug)]
56pub struct TachyonStats {
57    pub channel_stats: ChannelStats,
58    pub packets_dropped: u64,
59    pub unreliable_sent: u64,
60    pub unreliable_received: u64,
61}
62
63impl std::fmt::Display for TachyonStats {
64    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
65        write!(
66            f,
67            "channel_stats:{0} packets_dropped:{1} unreliable_sent:{2} unreliable_received:{3}\n",
68            self.channel_stats,
69            self.packets_dropped,
70            self.unreliable_sent,
71            self.unreliable_received
72        )
73    }
74}
75
76#[derive(Clone, Copy)]
77#[repr(C)]
78pub struct TachyonConfig {
79    pub use_identity: u32,
80    pub drop_packet_chance: u64,
81    pub drop_reliable_only: u32,
82    pub receive_window_size: u16,
83    pub nack_redundancy: u32
84}
85
86impl TachyonConfig {
87    pub fn default() -> Self {
88        let default = TachyonConfig {
89            use_identity: 0,
90            drop_packet_chance: 0,
91            drop_reliable_only: 0,
92            receive_window_size: RECEIVE_WINDOW_SIZE_DEFAULT,
93            nack_redundancy: NACK_REDUNDANCY_DEFAULT
94        };
95        return default;
96    }
97
98    pub fn get_receive_window_size(&self) -> u16 {
99        if self.receive_window_size > 0 {
100            return self.receive_window_size;
101        } else {
102            return RECEIVE_WINDOW_SIZE_DEFAULT;
103        }
104    }
105}
106
107#[derive(Clone, Copy)]
108#[repr(C)]
109#[derive(Default)]
110pub struct TachyonSendResult {
111    pub sent_len: u32,
112    pub error: u32,
113    pub header: Header,
114}
115
116pub struct Tachyon {
117    pub id: u16,
118    pub socket: TachyonSocket,
119    pub unreliable_sender: Option<UnreliableSender>,
120    pub identities: FxHashMap<u32, u32>,
121    pub connections: FxHashMap<NetworkAddress, Connection>,
122    pub channels: FxHashMap<(NetworkAddress, u8), Channel>,
123    pub channel_config: FxHashMap<u8, bool>,
124    pub config: TachyonConfig,
125    pub nack_send_data: Vec<u8>,
126    pub stats: TachyonStats,
127    pub start_time: Instant,
128    pub last_identity_link_request: Instant,
129    pub identity: Identity,
130    pub on_connected_callback: Option<OnConnectedCallback>,
131}
132
133impl Tachyon {
134    pub fn create(config: TachyonConfig) -> Self {
135        return Tachyon::create_with_id(config, 0);
136    }
137
138    pub fn create_with_id(config: TachyonConfig, id: u16) -> Self {
139        let socket = TachyonSocket::create();
140
141        let mut tachyon = Tachyon {
142            id,
143            identities: FxHashMap::default(),
144            connections: FxHashMap::default(),
145            channels: FxHashMap::default(),
146            channel_config: FxHashMap::default(),
147            socket: socket,
148            unreliable_sender: None,
149            config,
150            nack_send_data: vec![0; 4096],
151            stats: TachyonStats::default(),
152            start_time: Instant::now(),
153            last_identity_link_request: Instant::now() - Duration::new(100, 0),
154            identity: Identity::default(),
155            on_connected_callback: None,
156        };
157        tachyon.channel_config.insert(1, true);
158        tachyon.channel_config.insert(2, false);
159
160        return tachyon;
161    }
162
163
164    pub fn time_since_start(&self) -> u64 {
165        return Instant::now().duration_since(self.start_time).as_millis() as u64;
166    }
167
168    pub fn bind(&mut self, address: NetworkAddress) -> bool {
169        match self.socket.bind_socket(address) {
170            CreateConnectResult::Success => {
171                self.unreliable_sender = self.create_unreliable_sender();
172                return true;
173            }
174            CreateConnectResult::Error => {
175                return false;
176            }
177        }
178    }
179
180    pub fn connect(&mut self, address: NetworkAddress) -> bool {
181        match self.socket.connect_socket(address) {
182            CreateConnectResult::Success => {
183                let local_address = NetworkAddress::default();
184                self.try_create_connection(local_address);
185                self.create_configured_channels(local_address);
186                self.unreliable_sender = self.create_unreliable_sender();
187                return true;
188            }
189            CreateConnectResult::Error => {
190                return false;
191            }
192        }
193    }
194
195    pub fn create_unreliable_sender(&self) -> Option<UnreliableSender> {
196        let socket = self.socket.clone_socket();
197        if !socket.is_some() {
198            return None;
199        }
200        let sender = UnreliableSender { socket: socket };
201        return Some(sender);
202    }
203
204    pub fn get_channel(&mut self, address: NetworkAddress, channel_id: u8) -> Option<&mut Channel> {
205        match self.channels.get_mut(&(address, channel_id)) {
206            Some(channel) => {
207                return Some(channel);
208            }
209            None => {
210                return None;
211            }
212        }
213    }
214
215    fn create_configured_channels(&mut self, address: NetworkAddress) {
216        for config in &self.channel_config {
217            let channel_id = *config.0;
218            let ordered = *config.1;
219            match self.channels.get_mut(&(address, channel_id)) {
220                Some(_) => {}
221                None => {
222                    let channel = Channel::create(channel_id, ordered, address, self.config.get_receive_window_size(), self.config.nack_redundancy);
223                    self.channels.insert((address, channel_id), channel);
224                }
225            }
226        }
227    }
228
229    pub fn get_channel_count(&mut self, address: NetworkAddress) -> u32 {
230        let mut count = 0;
231        for config in &self.channel_config {
232            let channel_id = *config.0;
233            if self.channels.contains_key(&(address, channel_id)) {
234                count += 1;
235            }
236        }
237        return count;
238    }
239
240    fn remove_configured_channels(&mut self, address: NetworkAddress) {
241        for config in &self.channel_config {
242            let channel_id = *config.0;
243            self.channels.remove(&(address, channel_id));
244        }
245    }
246
247    pub fn configure_channel(&mut self, channel_id: u8, ordered: bool) -> bool {
248        if channel_id < 3 {
249            return false;
250        }
251        self.channel_config.insert(channel_id, ordered);
252        return true;
253    }
254
255    pub fn get_combined_stats(&mut self) -> TachyonStats {
256        let mut channel_stats = ChannelStats::default();
257        for channel in self.channels.values_mut() {
258            channel.update_stats();
259            channel_stats.add_from(&channel.stats);
260        }
261        let mut stats = self.stats.clone();
262        stats.channel_stats = channel_stats;
263        return stats;
264    }
265
266    pub fn update(&mut self) {
267        self.client_identity_update();
268
269        for channel in self.channels.values_mut() {
270            channel.update(&self.socket);
271        }
272    }
273
274    fn receive_published_channel_id(&mut self,  receive_buffer: &mut [u8], address: NetworkAddress, channel_id: u8) -> u32 {
275        match self.channels.get_mut(&(address, channel_id)) {
276            Some(channel) => {
277                let res = channel.receive_published(receive_buffer);
278                return res.0;
279            }
280            None => {
281                return 0;
282            }
283        }
284    }
285
286    fn receive_published_all_channels(&mut self, receive_buffer: &mut [u8]) -> TachyonReceiveResult {
287        let mut result = TachyonReceiveResult::default();
288
289        for channel in self.channels.values_mut() {
290            let res = channel.receive_published(receive_buffer);
291            if res.0 > 0 {
292                result.length = res.0;
293                result.address = res.1;
294                result.channel = channel.id as u16;
295                return result;
296            }
297        }
298        return result;
299    }
300
301    pub fn receive_loop(&mut self, receive_buffer: &mut [u8]) -> TachyonReceiveResult {
302        let mut result = TachyonReceiveResult::default();
303
304        for _ in 0..100 {
305            let receive_result = self.receive_from_socket(receive_buffer);
306            match receive_result {
307                ReceiveResult::Reliable {
308                    network_address: socket_addr,
309                    channel_id,
310                } => {
311                    let published = self.receive_published_channel_id(receive_buffer, socket_addr, channel_id);
312                    if published > 0 {
313                        result.channel = channel_id as u16;
314                        result.length = published;
315                        result.address = socket_addr;
316                        return result;
317                    }
318                }
319                ReceiveResult::UnReliable {
320                    received_len,
321                    network_address: socket_addr,
322                } => {
323                    result.length = received_len as u32;
324                    result.address = socket_addr;
325                    return result;
326                }
327                ReceiveResult::Empty => {
328                    break;
329                }
330                ReceiveResult::Retry => {}
331                ReceiveResult::Error => {
332                    result.error = RECEIVE_ERROR_UNKNOWN;
333                    return result;
334                }
335                ReceiveResult::ChannelError => {
336                    result.error = RECEIVE_ERROR_CHANNEL;
337                    return result;
338                }
339            }
340        }
341        return self.receive_published_all_channels(receive_buffer);
342    }
343
344    fn receive_from_socket(&mut self, receive_buffer: &mut [u8]) -> ReceiveResult {
345        let address: NetworkAddress;
346        let received_len: usize;
347        let header: Header;
348
349        let socket_result = self.socket.receive(receive_buffer,self.config.drop_packet_chance,self.config.drop_reliable_only == 1);
350        match socket_result {
351            SocketReceiveResult::Success {bytes_received, network_address} => {
352                received_len = bytes_received;
353                address = network_address;
354
355                header = Header::read(receive_buffer);
356
357                if self.socket.is_server {
358                    if self.config.use_identity == 1 {
359                        let connection_header: ConnectionHeader;
360
361                        if header.message_type == MESSAGE_TYPE_LINK_IDENTITY {
362                            connection_header = ConnectionHeader::read(receive_buffer);
363                            self.try_link_identity(address, connection_header.id, connection_header.session_id);
364                            return ReceiveResult::Empty;
365                        } else if header.message_type == MESSAGE_TYPE_UNLINK_IDENTITY {
366                            connection_header = ConnectionHeader::read(receive_buffer);
367                            self.try_unlink_identity(address, connection_header.id, connection_header.session_id);
368                            return ReceiveResult::Empty;
369                        } else {
370                            if !self.validate_and_update_linked_connection(address) {
371                                return ReceiveResult::Empty;
372                            }
373                        }
374                    } else {
375                        self.on_receive_connection_update(address);
376                    }
377                } else {
378                    if self.config.use_identity == 1 {
379                        if header.message_type == MESSAGE_TYPE_IDENTITY_LINKED {
380                            self.identity.set_linked(1);
381                            if let Some(callback) = self.on_connected_callback {
382                                unsafe {
383                                    callback();
384                                }
385                            }
386                            return ReceiveResult::Empty;
387                        } else if header.message_type == MESSAGE_TYPE_IDENTITY_UNLINKED {
388                            self.identity.set_linked(0);
389                            return ReceiveResult::Empty;
390                        }
391
392                        if !self.identity.is_linked() {
393                            return ReceiveResult::Empty;
394                        }
395                    }
396                }
397            }
398            SocketReceiveResult::Empty => {
399                return ReceiveResult::Empty;
400            }
401            SocketReceiveResult::Error => {
402                return ReceiveResult::Error;
403            }
404            SocketReceiveResult::Dropped => {
405                self.stats.packets_dropped += 1;
406                return ReceiveResult::Retry;
407            }
408        }
409
410        if header.message_type == MESSAGE_TYPE_UNRELIABLE {
411            self.stats.unreliable_received += 1;
412            return ReceiveResult::UnReliable {
413                received_len: received_len,
414                network_address: address,
415            };
416        }
417
418        let channel = match self.channels.get_mut(&(address, header.channel)) {
419            Some(c) => c,
420            None => {
421                return ReceiveResult::ChannelError;
422            }
423        };
424
425        channel.stats.bytes_received += received_len as u64;
426
427        if header.message_type == MESSAGE_TYPE_NONE {
428            channel.stats.nones_received += 1;
429            if channel.receiver.receive_packet(header.sequence, receive_buffer, received_len)
430            {
431                channel.stats.nones_accepted += 1;
432            }
433            return ReceiveResult::Retry;
434        }
435
436        if header.message_type == MESSAGE_TYPE_NACK {
437            channel.process_nack_message(address, receive_buffer);
438            return ReceiveResult::Retry;
439        }
440
441        if header.message_type == MESSAGE_TYPE_FRAGMENT {
442            channel.process_fragment_message(header.sequence, receive_buffer, received_len);
443            return ReceiveResult::Retry;
444        }
445
446        
447
448        if header.message_type == MESSAGE_TYPE_RELIABLE || header.message_type == MESSAGE_TYPE_RELIABLE_WITH_NACK {
449
450            if header.message_type == MESSAGE_TYPE_RELIABLE_WITH_NACK {
451                channel.process_single_nack(address, receive_buffer);
452            }
453
454            if channel.receiver.receive_packet(header.sequence, receive_buffer, received_len) {
455                channel.stats.received += 1;
456                return ReceiveResult::Reliable {
457                    network_address: address,
458                    channel_id: header.channel,
459                };
460            } else {
461                return ReceiveResult::Retry;
462            }
463        }
464
465        return ReceiveResult::Error;
466    }
467
468    pub fn send_unreliable(&mut self, address: NetworkAddress, data: &mut [u8], length: usize) -> TachyonSendResult {
469        if !self.can_send() {
470            let mut result = TachyonSendResult::default();
471            result.error = SEND_ERROR_IDENTITY;
472            return result;
473        }
474
475        match &self.unreliable_sender {
476            Some(sender) => {
477                let result = sender.send_unreliable(address, data, length);
478                if result.error == 0 {
479                    self.stats.unreliable_sent += 1;
480                }
481                return result;
482            }
483            None => {
484                let mut result = TachyonSendResult::default();
485                result.error = SEND_ERROR_UNKNOWN;
486                return result;
487            }
488        }
489    }
490
491    pub fn send_reliable(&mut self, channel_id: u8, address: NetworkAddress, data: &mut [u8], body_len: usize) -> TachyonSendResult {
492        let mut result = TachyonSendResult::default();
493
494        if !self.can_send() {
495            result.error = SEND_ERROR_IDENTITY;
496            return result;
497        }
498
499        if body_len == 0 {
500            result.error = SEND_ERROR_LENGTH;
501            return result;
502        }
503
504        if channel_id == 0 {
505            result.error = SEND_ERROR_CHANNEL;
506            return result;
507        }
508
509        if !self.socket.socket.is_some() {
510            result.error = SEND_ERROR_SOCKET;
511            return result;
512        }
513
514        let channel = match self.channels.get_mut(&(address, channel_id)) {
515            Some(c) => c,
516            None => {
517                result.error = SEND_ERROR_CHANNEL;
518                return result;
519            }
520        };
521
522        if Fragmentation::should_fragment(body_len) {
523            let mut fragment_bytes_sent = 0;
524            let frag_sequences = channel.frag.create_fragments(&mut channel.send_buffers, channel.id, data, body_len);
525            if frag_sequences.len() == 0 {
526                result.error = SEND_ERROR_FRAGMENT;
527                return result;
528            }
529
530            for seq in frag_sequences {
531                match channel.send_buffers.get_send_buffer(seq) {
532                    Some(fragment) => {
533                        let sent =self.socket.send_to(address, &fragment.buffer, fragment.buffer.len());
534                        fragment_bytes_sent += sent;
535
536                        channel.stats.bytes_sent += sent as u64;
537                        channel.stats.fragments_sent += 1;
538                    }
539                    None => {
540                        result.error = SEND_ERROR_FRAGMENT;
541                        return result;
542                    }
543                }
544            }
545
546            result.header.message_type = MESSAGE_TYPE_FRAGMENT;
547            result.sent_len = fragment_bytes_sent as u32;
548
549            channel.stats.sent += 1;
550
551            return result;
552        }
553
554        
555        result = channel.send_reliable(address, data, body_len, &self.socket);
556        return result;
557
558        // let nack_option = channel.receiver.nack_queue.pop_front();
559        // let header_size: usize;
560
561        // if nack_option.is_some() {
562        //     header_size = TACHYON_NACKED_HEADER_SIZE;
563            
564        // } else {
565        //     header_size = TACHYON_HEADER_SIZE;
566        // }
567        // let send_buffer_len = length + header_size;
568
569        // match channel.send_buffers.create_send_buffer(send_buffer_len) {
570        //     Some(send_buffer) => {
571        //         let sequence = send_buffer.sequence;
572        //         let buffer = &mut send_buffer.buffer;
573        //         buffer[header_size..length + header_size].copy_from_slice(&data[0..length]);
574
575        //         let mut header = Header::default();
576        //         header.message_type = message_type;
577        //         header.channel = channel.id;
578        //         header.sequence = sequence;
579
580        //         if let Some(nack) = nack_option {
581        //             header.start_sequence = nack.start_sequence;
582        //             header.flags = nack.flags;
583        //             channel.receiver.nack_queue.push_back(nack);
584        //         }
585                
586        //         header.write(buffer);
587
588        //         let sent_len = self.socket.send_to(address, &buffer, send_buffer_len);
589        //         result.sent_len = sent_len as u32;
590        //         result.header = header;
591
592        //         channel.stats.bytes_sent += sent_len as u64;
593        //         channel.stats.sent += 1;
594
595        //         return result;
596        //     }
597        //     None => {
598        //         result.error = SEND_ERROR_UNKNOWN;
599        //         return result;
600        //     }
601        // }
602    }
603}
604
605#[cfg(test)]
606mod tests {
607
608    use serial_test::serial;
609
610    use crate::tachyon::tachyon_test::TachyonTest;
611
612    use super::*;
613
614    #[test]
615    fn test_nack_rotation() {
616        println!("{0}", 4 % 5);
617    }
618
619    #[test]
620    #[serial]
621    fn test_reliable() {
622        // reliable messages just work with message bodies, headers are all internal
623
624        let mut test = TachyonTest::default();
625        test.connect();
626
627        test.send_buffer[0] = 4;
628        let sent = test.client_send_reliable(1, 2);
629        // sent_len reports total including header.
630        assert_eq!(2 + TACHYON_HEADER_SIZE, sent.sent_len as usize);
631
632        let res = test.server_receive();
633        assert_eq!(2, res.length);
634        assert_eq!(4, test.receive_buffer[0]);
635
636        test.client_send_reliable(2, 33);
637        let res = test.server_receive();
638        assert_eq!(33, res.length);
639
640        // fragmented
641        test.client_send_reliable(2, 3497);
642        let res = test.server_receive();
643        assert_eq!(3497, res.length);
644    }
645
646    #[test]
647    #[serial]
648    fn test_unconfigured_channel_fails() {
649        let mut test = TachyonTest::default();
650        test.client.configure_channel(3, true);
651        test.connect();
652
653        let sent = test.client_send_reliable(3, 2);
654        assert_eq!(2 + TACHYON_HEADER_SIZE, sent.sent_len as usize);
655        assert_eq!(0, sent.error);
656
657        let res = test.server_receive();
658        assert_eq!(0, res.length);
659        assert_eq!(RECEIVE_ERROR_CHANNEL, res.error);
660    }
661
662    #[test]
663    #[serial]
664    fn test_configured_channel() {
665        let mut test = TachyonTest::default();
666        test.client.configure_channel(3, true);
667        test.server.configure_channel(3, true);
668        test.connect();
669
670        let sent = test.client_send_reliable(3, 2);
671        assert_eq!(2 + TACHYON_HEADER_SIZE, sent.sent_len as usize);
672        assert_eq!(0, sent.error);
673
674        let res = test.server_receive();
675        assert_eq!(2, res.length);
676        assert_eq!(0, res.error);
677    }
678
679    #[test]
680    #[serial]
681    fn test_unreliable() {
682        let mut test = TachyonTest::default();
683        test.connect();
684
685        // unreliable messages need to be body length + 1;
686        // send length error
687        let send = test.client_send_unreliable(0);
688        assert_eq!(SEND_ERROR_LENGTH, send.error);
689
690        let res = test.server_receive();
691        assert_eq!(0, res.length);
692
693        test.receive_buffer[0] = 1;
694        test.send_buffer[1] = 4;
695        test.send_buffer[2] = 5;
696        test.send_buffer[3] = 6;
697        let sent = test.client_send_unreliable(4);
698        assert_eq!(0, sent.error);
699        assert_eq!(4, sent.sent_len as usize);
700
701        let res = test.server_receive();
702        assert_eq!(4, res.length);
703        assert_eq!(0, test.receive_buffer[0]);
704        assert_eq!(4, test.receive_buffer[1]);
705        assert_eq!(5, test.receive_buffer[2]);
706        assert_eq!(6, test.receive_buffer[3]);
707    }
708}