pulsar_network/client/
messages.rs

1use crate::envelope::{Envelope, Kind};
2use crate::{Client, Message, Peer};
3use fides::{chacha20poly1305, x25519};
4use opis::Int;
5use std::collections::HashMap;
6use std::str;
7use std::sync::Arc;
8use std::sync::mpsc::{channel, Sender, Receiver};
9use std::thread;
10use std::time::Instant;
11
12impl Client {
13
14    pub fn messages(&self) -> Receiver<(Message, Peer)> {
15
16        let (sender, receiver): (Sender<(Message, Peer)>, Receiver<(Message, Peer)>) = channel();
17
18        let private_key = self.private_key;
19
20        let public_key = self.public_key;
21
22        let route = self.route.clone();
23
24        let peers_clone = Arc::clone(&self.peers);
25
26        let incoming_socket_clone = Arc::clone(&self.incoming_socket);
27
28        let seeders_clone = Arc::clone(&self.seeders);
29
30        let bootstrap: bool = self.bootstrap;
31
32        thread::spawn(move || {
33
34            let incoming_socket = incoming_socket_clone.lock().unwrap();
35        
36            let mut now = Instant::now();
37            
38            let seeders = seeders_clone.lock().unwrap().clone();
39
40            let join_request = Envelope::new(Kind::JoinRequest, &route.to_bytes(), &public_key, &route);
41            
42            if !bootstrap {
43
44                for seeder in &seeders {
45                    
46                    let _res = incoming_socket.send_to(&join_request.to_bytes(), seeder);
47
48                }
49            
50            }
51
52            loop {
53
54                if now.elapsed().as_secs() > 300 {
55
56                    let mut peers = peers_clone.lock().unwrap();
57
58                    let copy_of_peers = peers.clone();
59                    
60                    *peers = HashMap::new();
61
62                    drop(peers);
63
64                    let ping_request = Envelope::new(Kind::PingRequest, &[], &public_key, &route);
65
66                    for (_, list) in &copy_of_peers {
67
68                        for (_, peer) in list {
69
70                            incoming_socket.send_to(&ping_request.to_bytes(), peer.address).unwrap();
71            
72                        }
73
74                    }
75
76                    if !bootstrap && copy_of_peers.len() == 1 {
77
78                        for seeder in &seeders {
79                
80                            let _res = incoming_socket.send_to(&join_request.to_bytes(), seeder);
81            
82                        }
83                    }
84
85                    now = Instant::now();
86
87                } else {
88
89                    let mut buf = [0; 32000];
90
91                    let (amt, src) = incoming_socket.recv_from(&mut buf).unwrap();
92
93                    let buf = &mut buf[..amt];
94                            
95                    match Envelope::from_bytes(&buf.to_vec()) {
96
97                        Ok(e) => {
98
99                            if e.route == route {
100
101                                match e.kind {
102
103                                    Kind::JoinRequest => {
104                                        
105                                        let ping_request = Envelope::new(Kind::PingRequest, &[], &public_key, &route);
106
107                                        incoming_socket.send_to(&ping_request.to_bytes(), &src).unwrap();
108                                                    
109                                        let tables = peers_clone.lock().unwrap();
110
111                                        for (_, table) in tables.iter() {
112                                            
113                                            let mut peers = Vec::new();
114                                            
115                                            for (_, peer) in table {
116                                                peers.push(peer);
117                                            }
118
119                                            peers.sort_by_key(|k| Int::from_bytes(&public_key) ^ Int::from_bytes(&k.public_key));
120
121                                            let join_response = Envelope::new(Kind::JoinResponse, peers[0].address.to_string().as_bytes(), &public_key, &route);
122
123                                            incoming_socket.send_to(&join_response.to_bytes(), &src).unwrap();
124
125                                        }
126
127                                    },
128                    
129                                    Kind::JoinResponse => {
130
131                                        match str::from_utf8(&e.message) {
132                    
133                                            Ok(s) => {
134
135                                                let ping_request = Envelope::new(Kind::PingResponse, &[], &public_key, &route);
136                                        
137                                                incoming_socket.send_to(&ping_request.to_bytes(), s).unwrap();
138
139                                            },
140
141                                            Err(_) => ()
142
143                                        }
144
145                                    },
146                                    
147                                    Kind::PingRequest => {
148                                        
149                                        let mut peers = peers_clone.lock().unwrap();
150
151                                        let peer = Peer {
152                                            address: src,
153                                            public_key: e.sender,
154                                            shared_key: x25519::shared_key(&private_key, &e.sender)
155                                        };
156                                        
157                                        peer.add_peer(&mut peers, public_key);
158                                        
159                                        let ping_response = Envelope::new(Kind::PingResponse, &[], &public_key, &route);
160                                        
161                                        incoming_socket.send_to(&ping_response.to_bytes(), &src).unwrap();
162                                        
163                                    },
164                                    
165                                    Kind::PingResponse => {
166
167                                        let peer = Peer {
168                                            address: src,
169                                            public_key: e.sender,
170                                            shared_key: x25519::shared_key(&private_key, &e.sender)
171                                        };
172                                        
173                                        let mut peers = peers_clone.lock().unwrap();
174                                            
175                                        peer.add_peer(&mut peers, public_key);
176
177                                    },
178                                    
179                                    Kind::Encrypted => {
180
181                                        let shared_key = x25519::shared_key(&private_key, &e.sender);
182                                                
183                                        match chacha20poly1305::decrypt(&shared_key, &e.message) {
184
185                                            Ok(plain) => {
186                                                match Message::from_bytes(&plain) {
187
188                                                    Ok(message) => {
189                                                        
190                                                        let peer = Peer {
191                                                            address: src,
192                                                            public_key: e.sender,
193                                                            shared_key: shared_key
194                                                        };
195                                                        
196                                                        sender.send((message, peer)).unwrap()
197
198                                                    },
199                                                    Err(_) => ()
200                                                }
201
202                                            },
203
204                                            Err(_) => ()
205
206                                        }
207
208                                    }
209
210                                }
211                            
212                            }
213
214                        },
215
216                        _ => ()
217
218                    }
219
220                }
221
222            }
223
224        });
225
226        receiver
227
228    }
229}