1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use super::sender;
use crate::configuration::middleware_configuration::Configuration;
use crossbeam::crossbeam_channel::unbounded;
use crossbeam::Sender;
use std::net::TcpStream;
use std::sync::{Arc, Barrier};
use std::thread;
pub fn start(
local_id: usize,
peer_addresses: &Vec<String>,
configuration: &Arc<Configuration>,
) -> Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>> {
let mut peers_channels_to_sockets_threads = Vec::new();
let mut channels_thread_spawn = Vec::new();
for i in 0..peer_addresses.len() {
let peer_id: usize;
if i < local_id {
peer_id = i;
} else {
peer_id = i + 1;
}
let temp_peer_port = peer_addresses[i].clone();
let temp_configuration = Arc::clone(configuration);
channels_thread_spawn.push(thread::spawn(move || {
connect_to_single_peer(local_id, peer_id, temp_peer_port, temp_configuration)
}));
}
for channel_spawn_result in channels_thread_spawn {
match channel_spawn_result.join() {
Ok(channel) => {
peers_channels_to_sockets_threads.push(channel);
}
Err(_) => {
println!("ERROR: There were problems when joining the peer channels");
}
}
}
peers_channels_to_sockets_threads
}
fn connect_to_single_peer(
local_index: usize,
peer_index: usize,
peer_address: String,
configuration: Arc<Configuration>,
) -> Sender<(Arc<Barrier>, Arc<Vec<u8>>)> {
let out: Sender<(Arc<Barrier>, Arc<Vec<u8>>)>;
loop {
let connect = TcpStream::connect(&peer_address);
match connect {
Ok(stream) => {
stream
.set_nonblocking(false)
.expect("ERROR: Failed to set stream non-blocking mode");
let (socket_thread_send, socket_thread_recv) =
unbounded::<(Arc<Barrier>, Arc<Vec<u8>>)>();
out = socket_thread_send;
let temp_config_arc = Arc::clone(&configuration);
let thread_name = format!("sender_thread_{}_{}", local_index, peer_index);
let builder = thread::Builder::new()
.name(thread_name)
.stack_size(configuration.thread_stack_size);
builder
.spawn(move || {
sender::start(stream, socket_thread_recv, local_index, temp_config_arc);
})
.unwrap();
return out;
}
Err(_) => {}
}
}
}