tcb/vv/version_vector.rs
1use crate::broadcast::broadcast_trait::{GenericReturn, TCB};
2use crate::configuration::middleware_configuration::Configuration;
3use crate::vv::communication::{acceptor, connector};
4use crate::vv::middleware::middleware_thread;
5use crate::vv::structs::messages::{ClientPeerMiddleware, MiddlewareClient};
6use crate::vv::structs::version_vector::VersionVector;
7use crossbeam::crossbeam_channel::unbounded;
8use crossbeam::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TryRecvError};
9use std::sync::{Arc, Barrier};
10use std::time::Duration;
11use std::{thread, usize};
12
13/**
14 * Client side of the version vector based middleware service.
15 * Maintains the API and necessary state to send and deliver messages.
16 */
17#[allow(non_snake_case)]
18pub struct VV {
19 //Receiver end of the channel between the client and the middleware thread
20 receive_channel: Receiver<MiddlewareClient>,
21 //Sender end of the channel between the client and the middleware thread
22 middleware_channel: Sender<ClientPeerMiddleware>,
23 message_id: usize,
24 //Dot of the next sent message
25 V: VersionVector,
26 //Peer's id
27 local_id: usize,
28}
29
30impl VV {
31 /**
32 * Updates the next sent message's version vector upon a delivery.
33 *
34 * # Arguments
35 *
36 * `message` - Delivered or stable message.
37 */
38 fn handle_delivery(&mut self, message: MiddlewareClient) -> GenericReturn {
39 match message {
40 MiddlewareClient::DELIVER {
41 sender_id,
42 version_vector,
43 message,
44 } => {
45 self.V[sender_id] = version_vector[sender_id];
46
47 GenericReturn::Delivery(message.payload, sender_id, version_vector[sender_id])
48 }
49 MiddlewareClient::STABLE {
50 sender_id,
51 message_id,
52 ..
53 } => GenericReturn::Stable(sender_id, message_id),
54 _ => {
55 panic!("ERROR: Received a SETUP when it shouldn't!");
56 }
57 }
58 }
59
60 /**
61 * Starting method of the Middleware service. It creates and initializes
62 * the necessary variables, communication channels and threads.
63 *
64 * # Arguments
65 *
66 * `local_id` - Local peer's globally unique id.
67 *
68 * `local_port` - Port where the middleware will be listening for connections.
69 *
70 * `peer_addresses` - Addresses the middleware will connect to.
71 *
72 * `configuration` - Middleware's configuration file.
73 */
74 fn start_service(
75 local_id: usize,
76 local_port: usize,
77 peer_addresses: Vec<String>,
78 configuration: Arc<Configuration>,
79 ) -> (Sender<ClientPeerMiddleware>, Receiver<MiddlewareClient>) {
80 //Creating the clone of the middleware configuration arc
81 let configuration_clone = Arc::clone(&configuration);
82
83 //Creating the channel where the middleware writes to
84 //and the client reads from
85 let (middleware_send_channel, peer_receive_channel) = unbounded::<MiddlewareClient>();
86
87 //Creating the channel where the main middleware thread reads from
88 //and the peer threads and client write to
89 let (peer_reader_send_channel, middleware_receive_channel) =
90 unbounded::<ClientPeerMiddleware>();
91
92 let peer_reader_send_channel_clone = peer_reader_send_channel.clone();
93
94 //Cloning the peer addresses for the acceptor thread
95 let acceptor_thread_peer_addresses = peer_addresses.clone();
96
97 //Formatting the peer's acceptor thread name
98 let thread_name = format!("acceptor_thread_{}", local_id);
99 let builder = thread::Builder::new()
100 .name(thread_name)
101 .stack_size(configuration.thread_stack_size);
102
103 //Cloning the channel to the logging service
104 let setup_end_barrier = Arc::new(Barrier::new(peer_addresses.len() + 1));
105 let setup_end_barrier_clone = Arc::clone(&setup_end_barrier);
106
107 //Spawning the acceptor thread
108 builder
109 .spawn(move || {
110 acceptor::start(
111 local_id,
112 local_port,
113 acceptor_thread_peer_addresses,
114 peer_reader_send_channel_clone,
115 configuration,
116 setup_end_barrier_clone,
117 );
118 })
119 .unwrap();
120
121 //Connecting to the peers' ports and getting the channels sender ends
122 //between the middleware and the sender thread
123 let channels_to_socket_threads: Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>> =
124 connector::start(local_id, &peer_addresses, &configuration_clone);
125
126 //Formatting the peer's middlware thread name
127 let thread_name = format!("middleware_thread_{}", local_id);
128 let builder = thread::Builder::new()
129 .name(thread_name)
130 .stack_size(configuration_clone.middleware_thread_stack_size);
131
132 //Spawning the main middleware thread
133 builder
134 .spawn(move || {
135 middleware_thread::start(
136 local_id,
137 peer_addresses,
138 middleware_receive_channel,
139 middleware_send_channel,
140 channels_to_socket_threads,
141 configuration_clone,
142 )
143 })
144 .unwrap();
145
146 setup_end_barrier.wait();
147 //Return the channels the peer writes and reads from to the middleware
148 (peer_reader_send_channel, peer_receive_channel)
149 }
150}
151
152#[allow(non_snake_case)]
153impl TCB for VV {
154 /**
155 * Type of the return from a send call, which is an empty value or an error.
156 */
157 type SendCallReturn = Result<(), SendError<ClientPeerMiddleware>>;
158
159 /**
160 * Creates a new middleware instance. This function only returns after the middleware
161 * has a connection to every other peer in both directions.
162 *
163 * # Arguments
164 *
165 * `local_id` - Peer's globally unique id in the group.
166 *
167 * `local_port` - Port where the middleware will be listening for connections.
168 *
169 * `peer_addresses` - Addresses the middleware will connect to.
170 *
171 * `configuration` - Middleware's configuration file.
172 */
173 fn new(
174 local_id: usize,
175 local_port: usize,
176 peer_addresses: Vec<String>,
177 configuration: Configuration,
178 ) -> Self {
179 let configuration = Arc::new(configuration);
180 let client_number = peer_addresses.len() + 1;
181
182 let (middleware_channel, receive_channel) =
183 Self::start_service(local_id, local_port, peer_addresses, configuration);
184
185 //Initializing the version vector
186 let V = VersionVector::new(client_number);
187
188 VV {
189 receive_channel,
190 middleware_channel,
191 message_id: 0,
192 V,
193 local_id,
194 }
195 }
196
197 /**
198 * Broadcasts a message to every peer in the group.
199 * Returns the sent message context if successfull.
200 *
201 * # Arguments
202 *
203 * `message` - Serialized message to be broadcast
204 */
205 fn send(&mut self, message: Vec<u8>) -> Self::SendCallReturn {
206 self.message_id += 1;
207 self.V[self.local_id] = self.message_id;
208
209 let msg = ClientPeerMiddleware::CLIENT {
210 msg_id: self.message_id,
211 payload: message,
212 version_vector: self.V.clone(),
213 };
214
215 self.middleware_channel.send(msg)?;
216
217 Ok(())
218 }
219
220 /**
221 * Signals and waits for the middleware to terminate.
222 */
223 fn end(&self) {
224 let end_message = ClientPeerMiddleware::END;
225 self.middleware_channel.send(end_message).unwrap();
226
227 loop {
228 match self.receive_channel.recv() {
229 Ok(msg) => match msg {
230 MiddlewareClient::SETUP => {
231 break;
232 }
233 _ => {}
234 },
235 Err(_) => {}
236 }
237 }
238 }
239
240 /**
241 * Delivers a message from the middleware. Blocks the calling thread
242 * until a message is delivered or the channel to the middleware is
243 * empty or disconnected.
244 */
245 fn recv(&mut self) -> Result<GenericReturn, RecvError> {
246 match self.receive_channel.recv() {
247 Ok(msg) => Ok(self.handle_delivery(msg)),
248 Err(e) => Err(e),
249 }
250 }
251
252 /**
253 * Attempts to deliver a message from the middleware without blocking
254 * the caller thread. Either a message is immeadiately delivered
255 * from the channel or an error is returned if the channel is empty.
256 */
257 fn try_recv(&mut self) -> Result<GenericReturn, TryRecvError> {
258 match self.receive_channel.try_recv() {
259 Ok(msg) => Ok(self.handle_delivery(msg)),
260 Err(e) => Err(e),
261 }
262 }
263
264 /**
265 * Waits for a message to be delivered from the middleware for a
266 * limited time. If the channel is empty and not disconnected, the
267 * caller thread is blocked until a message is received in the channel
268 * or the timeout ends. If there are no messages until the timeout ends or
269 * the channel becomes disconnected, an error is returned.
270 *
271 * # Arguments
272 *
273 * `duration` - Timeout duration
274 */
275 fn recv_timeout(&mut self, duration: Duration) -> Result<GenericReturn, RecvTimeoutError> {
276 match self.receive_channel.recv_timeout(duration) {
277 Ok(msg) => Ok(self.handle_delivery(msg)),
278 Err(e) => Err(e),
279 }
280 }
281
282 /**
283 * ACKS a stable message, but is not necessary to call in the VV approach.
284 *
285 * * # Arguments
286 *
287 * `id` - Stable dot id field
288 *
289 * `counter` - Stable dot counter field
290 */
291 fn tcbstable(&mut self, _: usize, _: usize) {
292 //Not implemented for VV
293 }
294}