grapevine/
node.rs

1use crate::{
2    connection::{Message, NodeAddr, NodeMap},
3    error::Error,
4    Result,
5};
6use message_io::{
7    events::EventQueue,
8    network::{Endpoint, NetEvent, Network, Transport},
9};
10use std::{
11    net::SocketAddr,
12    sync::{Arc, Mutex},
13    time::Duration,
14};
15
16/// Structure of a node
17pub struct Node {
18    /// List of peers of the current node
19    pub connections: Arc<Mutex<NodeMap>>,
20    /// Public address of the node
21    pub node_addr: SocketAddr,
22    /// Sets the duration (in seconds) of
23    /// emitting messages to other peers
24    pub duration: u32,
25    /// Network of nodes
26    pub network: Arc<Mutex<Network>>,
27    /// Network events queue
28    pub event_queue: EventQueue<NetEvent>,
29    /// Sets the optional peer address to connect to
30    pub peer: Option<String>,
31}
32
33impl Node {
34    /// Creates a new `Node`
35    pub fn new(port: u32, duration: u32, peer: Option<String>) -> Result<Self> {
36        let (mut network, event_queue) = Network::split();
37
38        // Node's own listening address (localhost + port)
39        let listening_addr = format!("127.0.0.1:{}", port);
40        match network.listen(Transport::FramedTcp, &listening_addr) {
41            Ok((_, addr)) => {
42                log_my_address(&addr);
43
44                Ok(Self {
45                    connections: Arc::new(Mutex::new(NodeMap::new(addr))),
46                    node_addr: addr,
47                    duration,
48                    network: Arc::new(Mutex::new(network)),
49                    event_queue,
50                    peer,
51                })
52            }
53            Err(e) => Err(Error::NetworkListeningError(format!(
54                "{}: {}",
55                e, listening_addr
56            ))),
57        }
58    }
59
60    /// Executes the peer-to-peer process.
61    pub fn execute(mut self) -> Result<()> {
62        if let Some(addr) = &self.peer {
63            let mut network = self
64                .network
65                .lock()
66                .map_err(|e| Error::NetworkLockError(e.to_string()))?;
67
68            // Connection to the first peer
69            match network.connect(Transport::FramedTcp, addr) {
70                Ok((endpoint, _)) => {
71                    {
72                        let mut nodes = self
73                            .connections
74                            .lock()
75                            .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
76                        nodes.add_old_one(endpoint);
77                    }
78
79                    send_message(
80                        &mut network,
81                        endpoint,
82                        &Message::RetrievePubAddr(self.node_addr),
83                    )?;
84
85                    // Request a list of existing peers
86                    // Response will be in event queue
87                    send_message(&mut network, endpoint, &Message::RetrievePeerList)?;
88                }
89                Err(e) => {
90                    return Err(Error::NetworkConnectionError(format!("{}: {}", e, &addr)));
91                }
92            }
93        }
94
95        // spawn thread to send random messages to known peers
96        self.spawn_emit_loop()?;
97
98        // process messages
99        self.process_message()?;
100
101        Ok(())
102    }
103
104    fn spawn_emit_loop(&self) -> Result<()> {
105        let sleep_duration = Duration::from_secs(self.duration as u64);
106        let peers_mut = Arc::clone(&self.connections);
107        let network_mut = Arc::clone(&self.network);
108
109        std::thread::spawn(move || {
110            // sleeping and sending
111            loop {
112                std::thread::sleep(sleep_duration);
113
114                let peers = peers_mut.lock().expect("Unable to obtain mutex on peers");
115                let receivers = peers.fetch_receivers();
116
117                // if there are no receivers, skip
118                if receivers.is_empty() {
119                    continue;
120                }
121
122                let mut network = network_mut.lock().expect("Failed to lock network");
123
124                let msg_text = generate_random_message();
125                let msg = Message::RequestRandomInfo(msg_text.clone());
126
127                log_sending_message(
128                    &msg_text,
129                    &receivers
130                        .iter()
131                        .map(|NodeAddr { public, .. }| public)
132                        .collect(),
133                );
134
135                for NodeAddr { endpoint, .. } in receivers {
136                    send_message(&mut network, endpoint, &msg).expect("Failed to send message");
137                }
138            }
139        });
140        Ok(())
141    }
142
143    fn process_message(&mut self) -> Result<()> {
144        loop {
145            match self.event_queue.receive() {
146                // Waiting events
147                NetEvent::Message(message_sender, input_data) => {
148                    match bincode::deserialize(&input_data)? {
149                        Message::RetrievePubAddr(pub_addr) => {
150                            let mut peers = self
151                                .connections
152                                .lock()
153                                .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
154                            peers.add_new_one(message_sender, pub_addr);
155                        }
156                        Message::RetrievePeerList => {
157                            let list = {
158                                let peers = self
159                                    .connections
160                                    .lock()
161                                    .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
162                                peers.get_peers_list()
163                            };
164                            let msg = Message::RespondToListQuery(list);
165                            send_message(
166                                &mut self.network.lock().expect("Error in sending message"),
167                                message_sender,
168                                &msg,
169                            )?;
170                        }
171                        Message::RespondToListQuery(addrs) => {
172                            let filtered: Vec<&SocketAddr> =
173                                addrs.iter().filter(|x| *x != &self.node_addr).collect();
174
175                            log_connected_to_the_peers(&filtered);
176
177                            let mut network = self
178                                .network
179                                .lock()
180                                .map_err(|e| Error::NetworkLockError(e.to_string()))?;
181
182                            for peer in filtered {
183                                if peer == &message_sender.addr() {
184                                    continue;
185                                }
186
187                                // connecting to peer
188                                let (endpoint, _) = network
189                                    .connect(Transport::FramedTcp, *peer)
190                                    .map_err(|e| Error::NetworkConnectionError(e.to_string()))?;
191
192                                // sending public address
193                                let msg = Message::RetrievePubAddr(self.node_addr);
194                                send_message(&mut network, endpoint, &msg)?;
195
196                                // saving peer
197                                self.connections
198                                    .lock()
199                                    .map_err(|e| Error::AddPeerError(e.to_string()))?
200                                    .add_old_one(endpoint);
201                            }
202                        }
203                        Message::RequestRandomInfo(text) => {
204                            let pub_addr = self
205                                .connections
206                                .lock()
207                                .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?
208                                .get_pub_addr(&message_sender)
209                                .expect("Error in fetching public address");
210                            log_message_received(&pub_addr, &text);
211                        }
212                    }
213                }
214                NetEvent::Connected(_, _) => {}
215                NetEvent::Disconnected(endpoint) => {
216                    let mut peers = self
217                        .connections
218                        .lock()
219                        .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
220                    NodeMap::drop(&mut peers, endpoint);
221                }
222            }
223        }
224    }
225}
226
227fn send_message(network: &mut Network, to: Endpoint, msg: &Message) -> Result<()> {
228    let output_data = bincode::serialize(msg)?;
229    network.send(to, &output_data);
230    Ok(())
231}
232
233fn generate_random_message() -> String {
234    petname::Petnames::default().generate_one(2, "-")
235}
236
237trait ToSocketAddr {
238    fn get_addr(&self) -> SocketAddr;
239}
240
241impl ToSocketAddr for Endpoint {
242    fn get_addr(&self) -> SocketAddr {
243        self.addr()
244    }
245}
246
247impl ToSocketAddr for &Endpoint {
248    fn get_addr(&self) -> SocketAddr {
249        self.addr()
250    }
251}
252
253impl ToSocketAddr for SocketAddr {
254    fn get_addr(&self) -> SocketAddr {
255        *self
256    }
257}
258
259impl ToSocketAddr for &SocketAddr {
260    fn get_addr(&self) -> SocketAddr {
261        **self
262    }
263}
264
265fn format_list_of_addrs<T: ToSocketAddr>(items: &Vec<T>) -> String {
266    if items.is_empty() {
267        "[no one]".to_owned()
268    } else {
269        let joined = items
270            .iter()
271            .map(|x| format!("\"{}\"", ToSocketAddr::get_addr(x)))
272            .collect::<Vec<String>>()
273            .join(", ");
274
275        format!("[{}]", joined)
276    }
277}
278
279fn log_message_received<T: ToSocketAddr>(from: &T, text: &str) {
280    log::info!(
281        "Received message [{}] from \"{}\"",
282        text,
283        ToSocketAddr::get_addr(from)
284    );
285}
286
287fn log_my_address<T: ToSocketAddr>(addr: &T) {
288    log::info!("My address is \"{}\"", ToSocketAddr::get_addr(addr));
289}
290
291fn log_connected_to_the_peers<T: ToSocketAddr>(peers: &Vec<T>) {
292    log::info!("Connected to the peers at {}", format_list_of_addrs(peers));
293}
294
295fn log_sending_message<T: ToSocketAddr>(message: &str, receivers: &Vec<T>) {
296    log::info!(
297        "Sending message [{}] to {}",
298        message,
299        format_list_of_addrs(receivers)
300    );
301}