1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use std::{
collections::HashMap,
net::TcpStream,
sync::{Arc, Mutex},
thread::{self},
};
use colored::Colorize;
use flume::Receiver;
use crate::{
types::{
ComponentChannels,
MessageType::{self},
NetComponent,
},
Message,
};
pub struct ReliableDelivery {
peers: Arc<Mutex<HashMap<String, bool>>>,
pub component_channels: Option<ComponentChannels>,
pub delivered_messages: Arc<Mutex<HashMap<String, HashMap<i64, Message>>>>,
}
impl ReliableDelivery {
pub fn new(target: Vec<String>) -> ReliableDelivery {
let map: Arc<Mutex<HashMap<String, bool>>> =
Arc::new(Mutex::new(HashMap::with_capacity(target.len())));
let map2: Arc<Mutex<HashMap<String, HashMap<i64, Message>>>> =
Arc::new(Mutex::new(HashMap::with_capacity(target.len())));
for peer in &target {
map.lock().unwrap().insert(peer.clone(), true);
map2.lock().unwrap().insert(peer.clone(), HashMap::new());
}
ReliableDelivery {
peers: map,
component_channels: None,
delivered_messages: map2,
}
}
}
impl NetComponent for ReliableDelivery {
fn start(&mut self) {
let delivered_messages = self.delivered_messages.clone();
let receiver = self.component_channels.as_mut().unwrap().rc.clone();
thread::spawn(move || handle_requests(delivered_messages, receiver));
for subscirption in &self.component_channels.as_mut().unwrap().subscriptions {
let sub = subscirption.clone();
let peers = self.peers.clone();
let delivered_messages = self.delivered_messages.clone();
thread::spawn(move || loop {
let msg = sub
.recv()
.expect("FROM ReliableDelivery: Failled unwrapping msg from pub_sub");
match msg.message_type {
MessageType::FailledNode => {
println!("{}: received Failled Node Message and proceeding to redistribute its previous delivered_messages", "FROM RB".blue());
peers
.lock()
.expect("failled locking the peers map")
.insert(msg.source.clone(), false);
for map in delivered_messages
.lock()
.expect("failled locking node delivered_messages")
.get_mut(&msg.source.clone())
.iter_mut()
{
for message in map.iter_mut() {
for peer in peers.lock().unwrap().iter() {
if *peer.1 == true {
let stream = TcpStream::connect(peer.0).expect(
"From ReliableDelivery: error connecting to Peer",
);
serde_json::to_writer(stream, message.1)
.expect("From ReliableDelivery:failed to push value into stream");
}
}
}
}
}
_ => {}
}
});
}
}
fn add_component_channels(&mut self, cmp: ComponentChannels) {
self.component_channels = Some(cmp);
}
}
impl ReliableDelivery {
pub fn send(&self, msg: &Message) {
for peer in self.peers.lock().unwrap().iter_mut() {
let stream = TcpStream::connect(peer.0)
.expect("From BestEffortDelivery: error connecting to Peer");
serde_json::to_writer(stream, &msg)
.expect("From BestEffortDelivery:failed to push value into stream");
}
}
}
pub fn handle_requests(
delivered_messages: Arc<Mutex<HashMap<String, HashMap<i64, Message>>>>,
rc: Receiver<Message>,
) {
loop {
let msg = rc
.recv()
.expect("FROM Reliable Broadcast: Failled unwraping Message");
match msg.message_type {
MessageType::ReliableDelivery => {
println!("{}: Received a Message to be delivered", "FROM RB".blue());
if delivered_messages
.lock()
.unwrap()
.get_mut(&msg.source.clone())
.unwrap()
.contains_key(&msg.sn)
!= true
{
delivered_messages
.lock()
.unwrap()
.get_mut(&msg.source.clone())
.unwrap()
.insert(msg.sn, msg.clone());
println!(
"{}: Message with info:{} and SN:{} From node:{} delivered succesfully",
"FROM RB".green(),
msg.info,
msg.sn,
msg.source
);
} else {
println!("{}:Message Already exists", "FROM RB".green());
}
}
_ => {}
}
}
}