1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use super::version_vector::VV;
use crate::configuration::middleware_configuration::Configuration;
use crate::vv::structs::messages::{ClientPeerMiddleware, Message, MiddlewareClient};
use crate::vv::structs::version_vector::VersionVector;
use bincode::serialize;
use crossbeam::{Receiver, Sender};
use std::sync::{Arc, Barrier};
pub fn start(
local_id: usize,
peer_addresses: Vec<String>,
receive_channel: Receiver<ClientPeerMiddleware>,
client: Sender<MiddlewareClient>,
peer_channels: Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>>,
configuration: Arc<Configuration>,
) {
let mut vv = VV::new(
peer_addresses.len() + 1,
local_id,
client.clone(),
Arc::clone(&configuration),
);
loop {
match receive_channel.recv() {
Ok(ClientPeerMiddleware::CLIENT {
msg_id,
payload,
version_vector,
}) => {
handle_message_from_client(
&mut vv,
msg_id,
payload,
version_vector,
&peer_channels,
);
}
Ok(ClientPeerMiddleware::PEER { message, peer_id }) => {
vv.receive(peer_id, message);
}
Ok(ClientPeerMiddleware::SETUP) => {}
Ok(ClientPeerMiddleware::END) => {
handle_finished_setup(&client);
break;
}
Err(_) => {
break;
}
}
}
}
fn handle_message_from_client(
vv: &mut VV,
msg_id: usize,
payload: Vec<u8>,
version_vector: VersionVector,
channels: &Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>>,
) {
let message = Message::new(msg_id, payload, version_vector);
vv.dequeue(message.clone());
let encoded_message: Vec<u8> =
serialize(&message).expect("ERROR: Couldn't serialize the CLIENT message");
let arc_msg = Arc::new(encoded_message);
let stream_sender_barrier = Arc::new(Barrier::new(channels.len()));
for channel in channels {
match &channel.send((Arc::clone(&stream_sender_barrier), Arc::clone(&arc_msg))) {
Ok(_) => {}
Err(e) => {
println!("ERROR: Could not send message to sender threads\n\t- {}", e);
}
}
}
}
fn handle_finished_setup(client: &Sender<MiddlewareClient>) {
match client.send(MiddlewareClient::SETUP) {
Ok(_) => {}
Err(e) => {
println!(
"ERROR: Failed to send the finishing SETUP message to client\n\t- {}",
e
);
}
}
}