1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use crate::configuration::middleware_configuration::Configuration;
use crate::vv::communication::{handshake, reader};
use crate::vv::structs::messages::{ClientPeerMiddleware, StreamMsg};
use bincode::deserialize_from;
use crossbeam::Sender;
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Barrier};
use std::thread;
pub fn start(
local_id: usize,
local_port: usize,
peer_addresses: Vec<String>,
middleware_channel: Sender<ClientPeerMiddleware>,
configuration: Arc<Configuration>,
setup_end_barrier: Arc<Barrier>,
) {
let server = TcpListener::bind(format!("0.0.0.0:{}", local_port))
.expect("ERROR: Stream failed to connect");
server
.set_nonblocking(false)
.expect("ERROR: Failed to set stream non-blocking mode");
let mut connected_peers = 0;
loop {
match server.accept() {
Ok((stream, _)) => match deserialize_from::<_, StreamMsg>(&stream) {
Ok(decoded_msg_type) => match decoded_msg_type {
StreamMsg::HND { index } => {
let setup_end_barrier_clone = Arc::clone(&setup_end_barrier);
handle_new_connection(
local_id,
index,
&peer_addresses,
stream,
&middleware_channel,
&mut connected_peers,
&configuration,
setup_end_barrier_clone,
);
}
_ => {
panic!("ERROR: Unexpected message type");
}
},
Err(e) => {
println!("ERROR: {}", e);
break;
}
},
Err(e) => {
println!("ERROR: {}", e);
break;
}
}
}
}
fn handle_new_connection(
local_id: usize,
peer_id: usize,
peer_addresses: &Vec<String>,
stream: TcpStream,
middleware_channel: &Sender<ClientPeerMiddleware>,
connected_peers: &mut usize,
configuration: &Arc<Configuration>,
setup_end_barrier: Arc<Barrier>,
) {
handshake::send_handshake(&stream, local_id);
let middleware_channel_temp = middleware_channel.clone();
*connected_peers += 1;
let thread_name = format!("stream_reader_{}_{}", local_id, peer_id);
let builder = thread::Builder::new()
.name(thread_name)
.stack_size(configuration.thread_stack_size);
builder
.spawn(move || {
reader::start(
stream,
middleware_channel_temp,
local_id,
peer_id,
setup_end_barrier,
);
})
.unwrap();
if *connected_peers == peer_addresses.len() {
let setup = ClientPeerMiddleware::SETUP;
match middleware_channel.send(setup) {
Ok(_) => {}
Err(e) => {
println!(
"ERROR: Failed to send the SETUP message to client\n\t- {}",
e
);
}
}
}
}