1use crate::failure_detector::FailureDetector;
2use crate::reliable_delivery::ReliableDelivery;
3use crate::types::{ComponentChannels, ComponentTypes, Message, MessageType, NetComponent};
4use crate::uniform_reliable_delivery::UniformReliableDelivery;
5use colored::Colorize;
6use flume::Sender;
7use local_ip_address::local_ip;
8use pub_sub::PubSub;
9use serde_json;
10extern crate pub_sub;
11
12use std::borrow::BorrowMut;
13use std::collections::HashMap;
14use std::net::TcpListener;
15use std::net::{self, TcpStream};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::thread::JoinHandle;
19use std::time::Duration;
20
21pub struct Node {
26 name: String,
27 pub addr: String,
28 peers: Vec<String>,
29 peers_threashold: usize,
30 listner: TcpListener,
31 ready: bool,
32 pub message_broker: Arc<Mutex<HashMap<MessageType, Vec<Sender<Message>>>>>,
33 pub publishers: HashMap<ComponentTypes, PubSub<Message>>, pub components: HashMap<ComponentTypes, Option<Box<dyn NetComponent>>>,
35 pub user_callbacks: Arc<Mutex<HashMap<MessageType, Vec<Box<dyn Fn(&Message) + Send>>>>>,
36}
37
38impl Node {
39 pub fn new(name: String, port: usize, peers_threashold: usize) -> Node {
40 let addr = format!("{}:{}", local_ip().unwrap().to_string(), port,);
41 println!("{}: addr is at :{}", "FROM NODE".blue(), addr);
42
43 return Node {
44 name,
45 addr: addr.clone(),
46 peers: Vec::with_capacity(peers_threashold),
47 peers_threashold,
48 listner: net::TcpListener::bind(addr.to_string()).unwrap(),
49 ready: false,
50 message_broker: Arc::new(Mutex::new(HashMap::new())),
51 publishers: HashMap::with_capacity(peers_threashold),
52 components: HashMap::new(),
53 user_callbacks: Arc::new(Mutex::new(HashMap::new())),
54 };
55 }
56
57 pub fn start(&mut self) -> JoinHandle<()> {
58 for component in self.components.iter_mut() {
59 let cfg = component.1;
60 match cfg.borrow_mut() {
61 Some(cmp) => {
62 cmp.start();
63 }
64 None => {}
65 }
66 }
67 let listner = self.listner.try_clone().expect("Failled cloning Listner");
68 let addr = self.addr.clone();
69 let user_callbacks = self.user_callbacks.clone();
70 let broker = self.message_broker.clone();
71 let handle = thread::spawn(move || {
72 handle_requests(listner, addr, user_callbacks, broker);
73 });
74 self.ready = true;
75 return handle;
76 }
77
78 pub fn add_component(
79 &mut self,
80 mut component: Box<dyn NetComponent>,
81 cmp_type: ComponentTypes,
82 messages: Vec<MessageType>,
83 target_componets: Vec<ComponentTypes>,
84 ) {
85 let (_tx, _rc) = flume::unbounded();
86 let pub_sub_channel: PubSub<Message> = pub_sub::PubSub::new();
87 self.publishers
88 .insert(cmp_type.clone(), pub_sub_channel.clone());
89 let mut component_channels = ComponentChannels {
90 subscriptions: Vec::new(),
91 rc: _rc.clone(),
92 publisher: pub_sub_channel.clone(),
93 };
94
95 let mut message_broker = self.message_broker.lock().unwrap();
96 for msg_type in messages.iter() {
97 message_broker.entry(msg_type.clone()).or_insert(Vec::new());
98 message_broker.get_mut(msg_type).unwrap().push(_tx.clone());
99 }
100 for target_cmp in target_componets.iter() {
102 component_channels
103 .subscriptions
104 .push(self.publishers.get(target_cmp).unwrap().subscribe());
105 }
106 component.add_component_channels(component_channels);
107 self.components.insert(cmp_type, Some(component));
108 }
109
110 pub fn has_failure_detector(&mut self, timeout: Duration, delay: Duration) -> &mut Node {
111 let mut peers = Vec::new();
112 for peer in self.peers.iter() {
113 if *peer != self.addr {
114 peers.push(peer.clone());
115 }
116 }
117 let fd = FailureDetector::new(
118 self.peers_threashold,
119 timeout,
120 delay,
121 self.addr.clone(),
122 peers,
123 );
124 let messages = vec![MessageType::HeartBeat, MessageType::RequestHeartBeat];
125 self.add_component(
126 Box::new(fd),
127 ComponentTypes::FaillureDetector,
128 messages,
129 Vec::new(),
130 );
131 return self;
132 }
133
134 pub fn has_reliable_delivery(&mut self) -> &mut Node {
135 let rb = ReliableDelivery::new(self.peers.clone());
136 if self
137 .components
138 .contains_key(&ComponentTypes::FaillureDetector)
139 {
140 let messages = vec![MessageType::ReliableDelivery];
141 let target_components = vec![ComponentTypes::FaillureDetector];
142 self.add_component(
143 Box::new(rb),
144 ComponentTypes::ReliableDelivery,
145 messages,
146 target_components,
147 );
148 return self;
149 } else {
150 panic!("Reliable delivery requires Faillure detector consider adding has_faillure_detector method");
151 }
152 }
153
154 pub fn has_uniform_reliable_delivery(&mut self) -> &mut Node {
155 let urb = UniformReliableDelivery::new(self.addr.clone(), self.peers.clone());
156 if self
157 .components
158 .contains_key(&ComponentTypes::FaillureDetector)
159 {
160 let messages = vec![
161 MessageType::UniformReliableDelivery,
162 MessageType::AckDelivery,
163 ];
164 let target_components = vec![ComponentTypes::FaillureDetector];
165 self.add_component(
166 Box::new(urb),
167 ComponentTypes::UniformReliableDelivery,
168 messages,
169 target_components,
170 );
171 return self;
172 } else {
173 panic!("Reliable delivery requires Faillure detector consider adding has_faillure_detector method");
174 }
175 }
176
177 pub fn send(&self, value: &Message, addr: String) {
178 println!("sending request");
179 let stream = TcpStream::connect(addr).expect("From Node: error connecting to Peer");
180 serde_json::to_writer(stream, value).expect("From Node:failed to push value into stream");
181 println!("From Client: Message sent");
182 }
183 pub fn add_peer(&mut self, peer: String) -> &mut Node {
184 self.peers.push(peer);
185 return self;
186 }
187
188 pub fn add_peers(&mut self, peers: Vec<String>) -> &mut Node {
189 self.peers = peers;
190 return self;
191 }
192
193 pub fn broadcast(&mut self, value: &Message) {
194 for peer in self.peers.iter() {
195 let result = TcpStream::connect(peer.clone());
196 match result {
197 Ok(stream) => {
198 serde_json::to_writer(stream, value)
199 .expect("FROM Server:failed to push value into stream");
200 }
201 Err(e) => {
202 println!("Failled Brodcasting Message");
203 println!("{}", e);
204 }
205 }
206 }
207 }
208
209 pub fn on_receive_message(
210 &mut self,
211 msg: MessageType,
212 callback: Box<dyn Fn(&Message) + Send>,
213 ) -> &mut Node {
214 self.user_callbacks
215 .lock()
216 .unwrap()
217 .entry(msg.clone())
218 .or_insert(Vec::new());
219 self.user_callbacks
220 .lock()
221 .unwrap()
222 .get_mut(&msg)
223 .unwrap()
224 .push(callback);
225 return self;
226 }
227}
228pub fn handle_connection(stream: net::TcpStream) -> Message {
230 let result: Message =
231 serde_json::from_reader(stream).expect("FROM Node: Failed deserializing the Message");
232 return result;
233}
234
235pub fn handle_requests(
236 listner: TcpListener,
237 source: String,
238 _callbacks: Arc<Mutex<HashMap<MessageType, Vec<Box<dyn Fn(&Message) + Send>>>>>,
239 message_broker: Arc<Mutex<HashMap<MessageType, Vec<Sender<Message>>>>>,
240) {
241 println!(
242 "{}: server started succesfully and ready to receive peer requests",
243 "FROM NODE".blue()
244 );
245
246 for stream in listner.incoming() {
247 let stream = stream.expect("From Server:failed connecting to client request");
248 let user_callbacks = _callbacks.clone();
253 let broker = message_broker.clone();
254 thread::spawn(move || {
255 let msg = handle_connection(stream);
256 println!(
257 "{}:Received {:?} from node:{}",
258 "From Node".blue(),
259 msg.message_type,
260 msg.sender
261 );
262 if user_callbacks
263 .lock()
264 .unwrap()
265 .contains_key(&msg.message_type)
266 {
267 for callback in user_callbacks
268 .lock()
269 .unwrap()
270 .get(&msg.message_type)
271 .unwrap()
272 .iter()
273 {
274 callback(&msg);
275 }
276 }
277 for component in broker
278 .lock()
279 .unwrap()
280 .get(&msg.message_type)
281 .unwrap()
282 .iter()
283 {
284 component
285 .send(msg.clone())
286 .expect("failled forwarding message to apporpriate component");
287 }
288 });
289 }
290}