rust_net/
node.rs

1use crate::failure_detector::FailureDetector;
2use crate::reliable_delivery::ReliableDelivery;
3use crate::types::{ComponentChannels, ComponentTypes, Message, MessageType, NetComponent};
4use crate::uniform_reliable_delivery::UniformReliableDelivery;
5use colored::Colorize;
6use flume::Sender;
7use local_ip_address::local_ip;
8use pub_sub::PubSub;
9use serde_json;
10extern crate pub_sub;
11
12use std::borrow::BorrowMut;
13use std::collections::HashMap;
14use std::net::TcpListener;
15use std::net::{self, TcpStream};
16use std::sync::{Arc, Mutex};
17use std::thread;
18use std::thread::JoinHandle;
19use std::time::Duration;
20
21/// Acts as a mediator between Components
22/// All communications is done via channels
23/// Node will automatically choose the right component to do the right Task in Runtime
24/// Extern Messages are delivered to componnents via flume channels and internal Messages are delivered via pub_sub
25pub struct Node {
26    name: String,
27    pub addr: String,
28    peers: Vec<String>,
29    peers_threashold: usize,
30    listner: TcpListener,
31    ready: bool,
32    pub message_broker: Arc<Mutex<HashMap<MessageType, Vec<Sender<Message>>>>>,
33    pub publishers: HashMap<ComponentTypes, PubSub<Message>>, // used only for cloning to newer components
34    pub components: HashMap<ComponentTypes, Option<Box<dyn NetComponent>>>,
35    pub user_callbacks: Arc<Mutex<HashMap<MessageType, Vec<Box<dyn Fn(&Message) + Send>>>>>,
36}
37
38impl Node {
39    pub fn new(name: String, port: usize, peers_threashold: usize) -> Node {
40        let addr = format!("{}:{}", local_ip().unwrap().to_string(), port,);
41        println!("{}: addr is at :{}", "FROM NODE".blue(), addr);
42
43        return Node {
44            name,
45            addr: addr.clone(),
46            peers: Vec::with_capacity(peers_threashold),
47            peers_threashold,
48            listner: net::TcpListener::bind(addr.to_string()).unwrap(),
49            ready: false,
50            message_broker: Arc::new(Mutex::new(HashMap::new())),
51            publishers: HashMap::with_capacity(peers_threashold),
52            components: HashMap::new(),
53            user_callbacks: Arc::new(Mutex::new(HashMap::new())),
54        };
55    }
56
57    pub fn start(&mut self) -> JoinHandle<()> {
58        for component in self.components.iter_mut() {
59            let cfg = component.1;
60            match cfg.borrow_mut() {
61                Some(cmp) => {
62                    cmp.start();
63                }
64                None => {}
65            }
66        }
67        let listner = self.listner.try_clone().expect("Failled cloning Listner");
68        let addr = self.addr.clone();
69        let user_callbacks = self.user_callbacks.clone();
70        let broker = self.message_broker.clone();
71        let handle = thread::spawn(move || {
72            handle_requests(listner, addr, user_callbacks, broker);
73        });
74        self.ready = true;
75        return handle;
76    }
77
78    pub fn add_component(
79        &mut self,
80        mut component: Box<dyn NetComponent>,
81        cmp_type: ComponentTypes,
82        messages: Vec<MessageType>,
83        target_componets: Vec<ComponentTypes>,
84    ) {
85        let (_tx, _rc) = flume::unbounded();
86        let pub_sub_channel: PubSub<Message> = pub_sub::PubSub::new();
87        self.publishers
88            .insert(cmp_type.clone(), pub_sub_channel.clone());
89        let mut component_channels = ComponentChannels {
90            subscriptions: Vec::new(),
91            rc: _rc.clone(),
92            publisher: pub_sub_channel.clone(),
93        };
94
95        let mut message_broker = self.message_broker.lock().unwrap();
96        for msg_type in messages.iter() {
97            message_broker.entry(msg_type.clone()).or_insert(Vec::new());
98            message_broker.get_mut(msg_type).unwrap().push(_tx.clone());
99        }
100        // target components for which you want to subscribe messages to
101        for target_cmp in target_componets.iter() {
102            component_channels
103                .subscriptions
104                .push(self.publishers.get(target_cmp).unwrap().subscribe());
105        }
106        component.add_component_channels(component_channels);
107        self.components.insert(cmp_type, Some(component));
108    }
109
110    pub fn has_failure_detector(&mut self, timeout: Duration, delay: Duration) -> &mut Node {
111        let mut peers = Vec::new();
112        for peer in self.peers.iter() {
113            if *peer != self.addr {
114                peers.push(peer.clone());
115            }
116        }
117        let fd = FailureDetector::new(
118            self.peers_threashold,
119            timeout,
120            delay,
121            self.addr.clone(),
122            peers,
123        );
124        let messages = vec![MessageType::HeartBeat, MessageType::RequestHeartBeat];
125        self.add_component(
126            Box::new(fd),
127            ComponentTypes::FaillureDetector,
128            messages,
129            Vec::new(),
130        );
131        return self;
132    }
133
134    pub fn has_reliable_delivery(&mut self) -> &mut Node {
135        let rb = ReliableDelivery::new(self.peers.clone());
136        if self
137            .components
138            .contains_key(&ComponentTypes::FaillureDetector)
139        {
140            let messages = vec![MessageType::ReliableDelivery];
141            let target_components = vec![ComponentTypes::FaillureDetector];
142            self.add_component(
143                Box::new(rb),
144                ComponentTypes::ReliableDelivery,
145                messages,
146                target_components,
147            );
148            return self;
149        } else {
150            panic!("Reliable delivery requires Faillure detector consider adding has_faillure_detector method");
151        }
152    }
153
154    pub fn has_uniform_reliable_delivery(&mut self) -> &mut Node {
155        let urb = UniformReliableDelivery::new(self.addr.clone(), self.peers.clone());
156        if self
157            .components
158            .contains_key(&ComponentTypes::FaillureDetector)
159        {
160            let messages = vec![
161                MessageType::UniformReliableDelivery,
162                MessageType::AckDelivery,
163            ];
164            let target_components = vec![ComponentTypes::FaillureDetector];
165            self.add_component(
166                Box::new(urb),
167                ComponentTypes::UniformReliableDelivery,
168                messages,
169                target_components,
170            );
171            return self;
172        } else {
173            panic!("Reliable delivery requires Faillure detector consider adding has_faillure_detector method");
174        }
175    }
176
177    pub fn send(&self, value: &Message, addr: String) {
178        println!("sending request");
179        let stream = TcpStream::connect(addr).expect("From Node: error connecting to Peer");
180        serde_json::to_writer(stream, value).expect("From Node:failed to push value into stream");
181        println!("From Client: Message sent");
182    }
183    pub fn add_peer(&mut self, peer: String) -> &mut Node {
184        self.peers.push(peer);
185        return self;
186    }
187
188    pub fn add_peers(&mut self, peers: Vec<String>) -> &mut Node {
189        self.peers = peers;
190        return self;
191    }
192
193    pub fn broadcast(&mut self, value: &Message) {
194        for peer in self.peers.iter() {
195            let result = TcpStream::connect(peer.clone());
196            match result {
197                Ok(stream) => {
198                    serde_json::to_writer(stream, value)
199                        .expect("FROM Server:failed to push value into stream");
200                }
201                Err(e) => {
202                    println!("Failled Brodcasting Message");
203                    println!("{}", e);
204                }
205            }
206        }
207    }
208
209    pub fn on_receive_message(
210        &mut self,
211        msg: MessageType,
212        callback: Box<dyn Fn(&Message) + Send>,
213    ) -> &mut Node {
214        self.user_callbacks
215            .lock()
216            .unwrap()
217            .entry(msg.clone())
218            .or_insert(Vec::new());
219        self.user_callbacks
220            .lock()
221            .unwrap()
222            .get_mut(&msg)
223            .unwrap()
224            .push(callback);
225        return self;
226    }
227}
228//deserilize the message and treat each case diffrently
229pub fn handle_connection(stream: net::TcpStream) -> Message {
230    let result: Message =
231        serde_json::from_reader(stream).expect("FROM Node: Failed deserializing the Message");
232    return result;
233}
234
235pub fn handle_requests(
236    listner: TcpListener,
237    source: String,
238    _callbacks: Arc<Mutex<HashMap<MessageType, Vec<Box<dyn Fn(&Message) + Send>>>>>,
239    message_broker: Arc<Mutex<HashMap<MessageType, Vec<Sender<Message>>>>>,
240) {
241    println!(
242        "{}: server started succesfully and ready to receive peer requests",
243        "FROM NODE".blue()
244    );
245
246    for stream in listner.incoming() {
247        let stream = stream.expect("From Server:failed connecting to client request");
248        //let client_addr = stream.local_addr().unwrap().to_string(); // used for authentification
249        //match filter_request(client_addr)
250        //pass the handle_connection function to the thread pool
251        // need to optimize this
252        let user_callbacks = _callbacks.clone();
253        let broker = message_broker.clone();
254        thread::spawn(move || {
255            let msg = handle_connection(stream);
256            println!(
257                "{}:Received {:?} from node:{}",
258                "From Node".blue(),
259                msg.message_type,
260                msg.sender
261            );
262            if user_callbacks
263                .lock()
264                .unwrap()
265                .contains_key(&msg.message_type)
266            {
267                for callback in user_callbacks
268                    .lock()
269                    .unwrap()
270                    .get(&msg.message_type)
271                    .unwrap()
272                    .iter()
273                {
274                    callback(&msg);
275                }
276            }
277            for component in broker
278                .lock()
279                .unwrap()
280                .get(&msg.message_type)
281                .unwrap()
282                .iter()
283            {
284                component
285                    .send(msg.clone())
286                    .expect("failled forwarding message to apporpriate component");
287            }
288        });
289    }
290}