pulsar_network/client/messages.rs
1use crate::envelope::{Envelope, Kind};
2use crate::{Client, Message, Peer};
3use fides::{chacha20poly1305, x25519};
4use opis::Int;
5use std::collections::HashMap;
6use std::str;
7use std::sync::Arc;
8use std::sync::mpsc::{channel, Sender, Receiver};
9use std::thread;
10use std::time::Instant;
11
12impl Client {
13
14 pub fn messages(&self) -> Receiver<(Message, Peer)> {
15
16 let (sender, receiver): (Sender<(Message, Peer)>, Receiver<(Message, Peer)>) = channel();
17
18 let private_key = self.private_key;
19
20 let public_key = self.public_key;
21
22 let route = self.route.clone();
23
24 let peers_clone = Arc::clone(&self.peers);
25
26 let incoming_socket_clone = Arc::clone(&self.incoming_socket);
27
28 let seeders_clone = Arc::clone(&self.seeders);
29
30 let bootstrap: bool = self.bootstrap;
31
32 thread::spawn(move || {
33
34 let incoming_socket = incoming_socket_clone.lock().unwrap();
35
36 let mut now = Instant::now();
37
38 let seeders = seeders_clone.lock().unwrap().clone();
39
40 let join_request = Envelope::new(Kind::JoinRequest, &route.to_bytes(), &public_key, &route);
41
42 if !bootstrap {
43
44 for seeder in &seeders {
45
46 let _res = incoming_socket.send_to(&join_request.to_bytes(), seeder);
47
48 }
49
50 }
51
52 loop {
53
54 if now.elapsed().as_secs() > 300 {
55
56 let mut peers = peers_clone.lock().unwrap();
57
58 let copy_of_peers = peers.clone();
59
60 *peers = HashMap::new();
61
62 drop(peers);
63
64 let ping_request = Envelope::new(Kind::PingRequest, &[], &public_key, &route);
65
66 for (_, list) in ©_of_peers {
67
68 for (_, peer) in list {
69
70 incoming_socket.send_to(&ping_request.to_bytes(), peer.address).unwrap();
71
72 }
73
74 }
75
76 if !bootstrap && copy_of_peers.len() == 1 {
77
78 for seeder in &seeders {
79
80 let _res = incoming_socket.send_to(&join_request.to_bytes(), seeder);
81
82 }
83 }
84
85 now = Instant::now();
86
87 } else {
88
89 let mut buf = [0; 32000];
90
91 let (amt, src) = incoming_socket.recv_from(&mut buf).unwrap();
92
93 let buf = &mut buf[..amt];
94
95 match Envelope::from_bytes(&buf.to_vec()) {
96
97 Ok(e) => {
98
99 if e.route == route {
100
101 match e.kind {
102
103 Kind::JoinRequest => {
104
105 let ping_request = Envelope::new(Kind::PingRequest, &[], &public_key, &route);
106
107 incoming_socket.send_to(&ping_request.to_bytes(), &src).unwrap();
108
109 let tables = peers_clone.lock().unwrap();
110
111 for (_, table) in tables.iter() {
112
113 let mut peers = Vec::new();
114
115 for (_, peer) in table {
116 peers.push(peer);
117 }
118
119 peers.sort_by_key(|k| Int::from_bytes(&public_key) ^ Int::from_bytes(&k.public_key));
120
121 let join_response = Envelope::new(Kind::JoinResponse, peers[0].address.to_string().as_bytes(), &public_key, &route);
122
123 incoming_socket.send_to(&join_response.to_bytes(), &src).unwrap();
124
125 }
126
127 },
128
129 Kind::JoinResponse => {
130
131 match str::from_utf8(&e.message) {
132
133 Ok(s) => {
134
135 let ping_request = Envelope::new(Kind::PingResponse, &[], &public_key, &route);
136
137 incoming_socket.send_to(&ping_request.to_bytes(), s).unwrap();
138
139 },
140
141 Err(_) => ()
142
143 }
144
145 },
146
147 Kind::PingRequest => {
148
149 let mut peers = peers_clone.lock().unwrap();
150
151 let peer = Peer {
152 address: src,
153 public_key: e.sender,
154 shared_key: x25519::shared_key(&private_key, &e.sender)
155 };
156
157 peer.add_peer(&mut peers, public_key);
158
159 let ping_response = Envelope::new(Kind::PingResponse, &[], &public_key, &route);
160
161 incoming_socket.send_to(&ping_response.to_bytes(), &src).unwrap();
162
163 },
164
165 Kind::PingResponse => {
166
167 let peer = Peer {
168 address: src,
169 public_key: e.sender,
170 shared_key: x25519::shared_key(&private_key, &e.sender)
171 };
172
173 let mut peers = peers_clone.lock().unwrap();
174
175 peer.add_peer(&mut peers, public_key);
176
177 },
178
179 Kind::Encrypted => {
180
181 let shared_key = x25519::shared_key(&private_key, &e.sender);
182
183 match chacha20poly1305::decrypt(&shared_key, &e.message) {
184
185 Ok(plain) => {
186 match Message::from_bytes(&plain) {
187
188 Ok(message) => {
189
190 let peer = Peer {
191 address: src,
192 public_key: e.sender,
193 shared_key: shared_key
194 };
195
196 sender.send((message, peer)).unwrap()
197
198 },
199 Err(_) => ()
200 }
201
202 },
203
204 Err(_) => ()
205
206 }
207
208 }
209
210 }
211
212 }
213
214 },
215
216 _ => ()
217
218 }
219
220 }
221
222 }
223
224 });
225
226 receiver
227
228 }
229}