Skip to main content

tcb/vv/
version_vector.rs

1use crate::broadcast::broadcast_trait::{GenericReturn, TCB};
2use crate::configuration::middleware_configuration::Configuration;
3use crate::vv::communication::{acceptor, connector};
4use crate::vv::middleware::middleware_thread;
5use crate::vv::structs::messages::{ClientPeerMiddleware, MiddlewareClient};
6use crate::vv::structs::version_vector::VersionVector;
7use crossbeam::crossbeam_channel::unbounded;
8use crossbeam::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TryRecvError};
9use std::sync::{Arc, Barrier};
10use std::time::Duration;
11use std::{thread, usize};
12
13/**
14 * Client side of the version vector based middleware service.
15 * Maintains the API and necessary state to send and deliver messages.
16 */
17#[allow(non_snake_case)]
18pub struct VV {
19    //Receiver end of the channel between the client and the middleware thread
20    receive_channel: Receiver<MiddlewareClient>,
21    //Sender end of the channel between the client and the middleware thread
22    middleware_channel: Sender<ClientPeerMiddleware>,
23    message_id: usize,
24    //Dot of the next sent message
25    V: VersionVector,
26    //Peer's id
27    local_id: usize,
28}
29
30impl VV {
31    /**
32     * Updates the next sent message's version vector upon a delivery.
33     *
34     * # Arguments
35     *
36     * `message` - Delivered or stable message.
37     */
38    fn handle_delivery(&mut self, message: MiddlewareClient) -> GenericReturn {
39        match message {
40            MiddlewareClient::DELIVER {
41                sender_id,
42                version_vector,
43                message,
44            } => {
45                self.V[sender_id] = version_vector[sender_id];
46
47                GenericReturn::Delivery(message.payload, sender_id, version_vector[sender_id])
48            }
49            MiddlewareClient::STABLE {
50                sender_id,
51                message_id,
52                ..
53            } => GenericReturn::Stable(sender_id, message_id),
54            _ => {
55                panic!("ERROR: Received a SETUP when it shouldn't!");
56            }
57        }
58    }
59
60    /**
61     * Starting method of the Middleware service. It creates and initializes
62     * the necessary variables, communication channels and threads.
63     *
64     * # Arguments
65     *
66     * `local_id` - Local peer's globally unique id.
67     *
68     * `local_port` - Port where the middleware will be listening for connections.
69     *
70     * `peer_addresses` - Addresses the middleware will connect to.
71     *
72     * `configuration` - Middleware's configuration file.
73     */
74    fn start_service(
75        local_id: usize,
76        local_port: usize,
77        peer_addresses: Vec<String>,
78        configuration: Arc<Configuration>,
79    ) -> (Sender<ClientPeerMiddleware>, Receiver<MiddlewareClient>) {
80        //Creating the clone of the middleware configuration arc
81        let configuration_clone = Arc::clone(&configuration);
82
83        //Creating the channel where the middleware writes to
84        //and the client reads from
85        let (middleware_send_channel, peer_receive_channel) = unbounded::<MiddlewareClient>();
86
87        //Creating the channel where the main middleware thread reads from
88        //and the peer threads and client write to
89        let (peer_reader_send_channel, middleware_receive_channel) =
90            unbounded::<ClientPeerMiddleware>();
91
92        let peer_reader_send_channel_clone = peer_reader_send_channel.clone();
93
94        //Cloning the peer addresses for the acceptor thread
95        let acceptor_thread_peer_addresses = peer_addresses.clone();
96
97        //Formatting the peer's acceptor thread name
98        let thread_name = format!("acceptor_thread_{}", local_id);
99        let builder = thread::Builder::new()
100            .name(thread_name)
101            .stack_size(configuration.thread_stack_size);
102
103        //Cloning the channel to the logging service
104        let setup_end_barrier = Arc::new(Barrier::new(peer_addresses.len() + 1));
105        let setup_end_barrier_clone = Arc::clone(&setup_end_barrier);
106
107        //Spawning the acceptor thread
108        builder
109            .spawn(move || {
110                acceptor::start(
111                    local_id,
112                    local_port,
113                    acceptor_thread_peer_addresses,
114                    peer_reader_send_channel_clone,
115                    configuration,
116                    setup_end_barrier_clone,
117                );
118            })
119            .unwrap();
120
121        //Connecting to the peers' ports and getting the channels sender ends
122        //between the middleware and the sender thread
123        let channels_to_socket_threads: Vec<Sender<(Arc<Barrier>, Arc<Vec<u8>>)>> =
124            connector::start(local_id, &peer_addresses, &configuration_clone);
125
126        //Formatting the peer's middlware thread name
127        let thread_name = format!("middleware_thread_{}", local_id);
128        let builder = thread::Builder::new()
129            .name(thread_name)
130            .stack_size(configuration_clone.middleware_thread_stack_size);
131
132        //Spawning the main middleware thread
133        builder
134            .spawn(move || {
135                middleware_thread::start(
136                    local_id,
137                    peer_addresses,
138                    middleware_receive_channel,
139                    middleware_send_channel,
140                    channels_to_socket_threads,
141                    configuration_clone,
142                )
143            })
144            .unwrap();
145
146        setup_end_barrier.wait();
147        //Return the channels the peer writes and reads from to the middleware
148        (peer_reader_send_channel, peer_receive_channel)
149    }
150}
151
152#[allow(non_snake_case)]
153impl TCB for VV {
154    /**
155     * Type of the return from a send call, which is an empty value or an error.
156     */
157    type SendCallReturn = Result<(), SendError<ClientPeerMiddleware>>;
158
159    /**
160     * Creates a new middleware instance. This function only returns after the middleware
161     * has a connection to every other peer in both directions.
162     *
163     * # Arguments
164     *
165     * `local_id` - Peer's globally unique id in the group.
166     *
167     * `local_port` - Port where the middleware will be listening for connections.
168     *
169     * `peer_addresses` - Addresses the middleware will connect to.
170     *
171     * `configuration` - Middleware's configuration file.
172     */
173    fn new(
174        local_id: usize,
175        local_port: usize,
176        peer_addresses: Vec<String>,
177        configuration: Configuration,
178    ) -> Self {
179        let configuration = Arc::new(configuration);
180        let client_number = peer_addresses.len() + 1;
181
182        let (middleware_channel, receive_channel) =
183            Self::start_service(local_id, local_port, peer_addresses, configuration);
184
185        //Initializing the version vector
186        let V = VersionVector::new(client_number);
187
188        VV {
189            receive_channel,
190            middleware_channel,
191            message_id: 0,
192            V,
193            local_id,
194        }
195    }
196
197    /**
198     * Broadcasts a message to every peer in the group.
199     * Returns the sent message context if successfull.
200     *
201     * # Arguments
202     *
203     * `message` - Serialized message to be broadcast
204     */
205    fn send(&mut self, message: Vec<u8>) -> Self::SendCallReturn {
206        self.message_id += 1;
207        self.V[self.local_id] = self.message_id;
208
209        let msg = ClientPeerMiddleware::CLIENT {
210            msg_id: self.message_id,
211            payload: message,
212            version_vector: self.V.clone(),
213        };
214
215        self.middleware_channel.send(msg)?;
216
217        Ok(())
218    }
219
220    /**
221     * Signals and waits for the middleware to terminate.
222     */
223    fn end(&self) {
224        let end_message = ClientPeerMiddleware::END;
225        self.middleware_channel.send(end_message).unwrap();
226
227        loop {
228            match self.receive_channel.recv() {
229                Ok(msg) => match msg {
230                    MiddlewareClient::SETUP => {
231                        break;
232                    }
233                    _ => {}
234                },
235                Err(_) => {}
236            }
237        }
238    }
239
240    /**
241     * Delivers a message from the middleware. Blocks the calling thread
242     * until a message is delivered or the channel to the middleware is
243     * empty or disconnected.
244     */
245    fn recv(&mut self) -> Result<GenericReturn, RecvError> {
246        match self.receive_channel.recv() {
247            Ok(msg) => Ok(self.handle_delivery(msg)),
248            Err(e) => Err(e),
249        }
250    }
251
252    /**
253     * Attempts to deliver a message from the middleware without blocking
254     * the caller thread. Either a message is immeadiately delivered
255     * from the channel or an error is returned if the channel is empty.
256     */
257    fn try_recv(&mut self) -> Result<GenericReturn, TryRecvError> {
258        match self.receive_channel.try_recv() {
259            Ok(msg) => Ok(self.handle_delivery(msg)),
260            Err(e) => Err(e),
261        }
262    }
263
264    /**
265     * Waits for a message to be delivered from the middleware for a
266     * limited time. If the channel is empty and not disconnected, the
267     * caller thread is blocked until a message is received in the channel
268     * or the timeout ends. If there are no messages until the timeout ends or
269     * the channel becomes disconnected, an error is returned.
270     *
271     * # Arguments
272     *
273     * `duration` - Timeout duration
274     */
275    fn recv_timeout(&mut self, duration: Duration) -> Result<GenericReturn, RecvTimeoutError> {
276        match self.receive_channel.recv_timeout(duration) {
277            Ok(msg) => Ok(self.handle_delivery(msg)),
278            Err(e) => Err(e),
279        }
280    }
281
282    /**
283     * ACKS a stable message, but is not necessary to call in the VV approach.
284     *
285     * * # Arguments
286     *
287     * `id` - Stable dot id field
288     *
289     * `counter` - Stable dot counter field
290     */
291    fn tcbstable(&mut self, _: usize, _: usize) {
292        //Not implemented for VV
293    }
294}