tcb/graph/communication/
connector.rs1use super::sender;
2use crate::configuration::middleware_configuration::Configuration;
3use crossbeam::crossbeam_channel::unbounded;
4use crossbeam::Sender;
5use std::net::TcpStream;
6use std::sync::{Arc, Barrier};
7use std::thread;
8
9pub fn start(
22 local_id: usize,
23 peer_addresses: &Vec<String>,
24 configuration: &Arc<Configuration>,
25) -> Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>> {
26 let mut peers_channels_to_sockets_threads = Vec::new();
27 let mut channels_thread_spawn = Vec::new();
28
29 for i in 0..peer_addresses.len() {
31 let peer_id: usize;
32
33 if i < local_id {
34 peer_id = i;
35 } else {
36 peer_id = i + 1;
37 }
38
39 let temp_peer_port = peer_addresses[i].clone();
40 let temp_configuration = Arc::clone(configuration);
41
42 channels_thread_spawn.push(thread::spawn(move || {
43 connect_to_single_peer(local_id, peer_id, temp_peer_port, temp_configuration)
44 }));
45 }
46
47 for channel_spawn_result in channels_thread_spawn {
48 match channel_spawn_result.join() {
49 Ok(channel) => {
50 peers_channels_to_sockets_threads.push(channel);
51 }
52 Err(_) => {
53 println!("ERROR: There were problems when joining the peer channels");
54 }
55 }
56 }
57
58 peers_channels_to_sockets_threads
59}
60
61fn connect_to_single_peer(
66 local_index: usize,
67 peer_index: usize,
68 peer_address: String,
69 configuration: Arc<Configuration>,
70) -> Sender<(Arc<Barrier>, Arc<Vec<u8>>)> {
71 let out: Sender<(Arc<Barrier>, Arc<Vec<u8>>)>;
72
73 loop {
74 let connect = TcpStream::connect(&peer_address);
75 match connect {
76 Ok(stream) => {
77 stream
78 .set_nonblocking(false)
79 .expect("ERROR: Failed to set stream non-blocking mode");
80
81 let (socket_thread_send, socket_thread_recv) =
82 unbounded::<(Arc<Barrier>, Arc<Vec<u8>>)>();
83
84 out = socket_thread_send;
85
86 let temp_config_arc = Arc::clone(&configuration);
87
88 let thread_name = format!("sender_thread_{}_{}", local_index, peer_index);
89 let builder = thread::Builder::new()
90 .name(thread_name)
91 .stack_size(configuration.thread_stack_size);
92
93 builder
94 .spawn(move || {
95 sender::start(stream, socket_thread_recv, local_index, temp_config_arc);
96 })
97 .unwrap();
98
99 return out;
100 }
101 Err(_) => {}
102 }
103 }
104}