Skip to main content

tcb/graph/communication/
connector.rs

1use super::sender;
2use crate::configuration::middleware_configuration::Configuration;
3use crossbeam::crossbeam_channel::unbounded;
4use crossbeam::Sender;
5use std::net::TcpStream;
6use std::sync::{Arc, Barrier};
7use std::thread;
8
9/**
10 * Starts the Connector thread that connects to every peer in the group and ends when
11 * successfully connected to all of them.
12 *
13 * # Arguments
14 *
15 * `local_id` - Local peer's globally unique id.
16 *
17 * `peer_addresses` - Addresses the middleware will connect to.
18 *
19 * `configuration` - Middleware's configuration file.
20 */
21pub fn start(
22    local_id: usize,
23    peer_addresses: &Vec<String>,
24    configuration: &Arc<Configuration>,
25) -> Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>> {
26    let mut peers_channels_to_sockets_threads = Vec::new();
27    let mut channels_thread_spawn = Vec::new();
28
29    //The connections to the peers will be concurrent
30    for i in 0..peer_addresses.len() {
31        let peer_id: usize;
32
33        if i < local_id {
34            peer_id = i;
35        } else {
36            peer_id = i + 1;
37        }
38
39        let temp_peer_port = peer_addresses[i].clone();
40        let temp_configuration = Arc::clone(configuration);
41
42        channels_thread_spawn.push(thread::spawn(move || {
43            connect_to_single_peer(local_id, peer_id, temp_peer_port, temp_configuration)
44        }));
45    }
46
47    for channel_spawn_result in channels_thread_spawn {
48        match channel_spawn_result.join() {
49            Ok(channel) => {
50                peers_channels_to_sockets_threads.push(channel);
51            }
52            Err(_) => {
53                println!("ERROR: There were problems when joining the peer channels");
54            }
55        }
56    }
57
58    peers_channels_to_sockets_threads
59}
60
61/**
62 * Connects to a single peer. The call to this will only end when the
63 * connection to the peer is successfull.
64 */
65fn connect_to_single_peer(
66    local_index: usize,
67    peer_index: usize,
68    peer_address: String,
69    configuration: Arc<Configuration>,
70) -> Sender<(Arc<Barrier>, Arc<Vec<u8>>)> {
71    let out: Sender<(Arc<Barrier>, Arc<Vec<u8>>)>;
72
73    loop {
74        let connect = TcpStream::connect(&peer_address);
75        match connect {
76            Ok(stream) => {
77                stream
78                    .set_nonblocking(false)
79                    .expect("ERROR: Failed to set stream non-blocking mode");
80
81                let (socket_thread_send, socket_thread_recv) =
82                    unbounded::<(Arc<Barrier>, Arc<Vec<u8>>)>();
83
84                out = socket_thread_send;
85
86                let temp_config_arc = Arc::clone(&configuration);
87
88                let thread_name = format!("sender_thread_{}_{}", local_index, peer_index);
89                let builder = thread::Builder::new()
90                    .name(thread_name)
91                    .stack_size(configuration.thread_stack_size);
92
93                builder
94                    .spawn(move || {
95                        sender::start(stream, socket_thread_recv, local_index, temp_config_arc);
96                    })
97                    .unwrap();
98
99                return out;
100            }
101            Err(_) => {}
102        }
103    }
104}