Skip to main content

rustypipes/
client.rs

1//! ## Client
2//!
3//! `client` is the module which takes care of managing the OctopipesClient struct and
4//! then all the functions useful for the user to interface with an Octopipes Server
5
6//
7//   RustyPipes
8//   Developed by Christian Visintin
9//
10// MIT License
11// Copyright (c) 2019-2020 Christian Visintin
12// Permission is hereby granted, free of charge, to any person obtaining a copy
13// of this software and associated documentation files (the "Software"), to deal
14// in the Software without restriction, including without limitation the rights
15// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16// copies of the Software, and to permit persons to whom the Software is
17// furnished to do so, subject to the following conditions:
18// The above copyright notice and this permission notice shall be included in all
19// copies or substantial portions of the Software.
20// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26// SOFTWARE.
27//
28
29use super::OctopipesCapError;
30use super::OctopipesCapMessage;
31use super::OctopipesClient;
32use super::OctopipesError;
33use super::OctopipesMessage;
34use super::OctopipesOptions;
35use super::OctopipesProtocolVersion;
36use super::OctopipesState;
37
38use std::sync::{mpsc, Arc, Mutex};
39use std::thread;
40
41use super::cap;
42use super::pipes;
43use super::serializer;
44
45impl OctopipesClient {
46    /// ### OctopipesClient Constructor
47    ///
48    /// `new` is constructor for OctopipesClient. OctopipesClient is actually a wrapper for a Client
49    pub fn new(
50        client_id: String,
51        cap_pipe: String,
52        version: OctopipesProtocolVersion,
53    ) -> OctopipesClient {
54        OctopipesClient {
55            id: client_id,
56            version: version,
57            cap_pipe: cap_pipe,
58            tx_pipe: None,
59            rx_pipe: None,
60            state: Arc::new(Mutex::new(OctopipesState::Initialized)),
61            client_loop: None,
62            client_receiver: None,
63            on_received_fn: None,
64            on_sent_fn: None,
65            on_subscribed_fn: None,
66            on_unsubscribed_fn: None,
67        }
68    }
69
70    //Thread operations
71
72    /// ###  loop_start
73    ///
74    /// `loop_start` starts the client loop thread which checks if new messages are available
75    pub fn loop_start(&mut self) -> Result<(), OctopipesError> {
76        let mut client_state = self.state.lock().unwrap();
77        match *client_state {
78            OctopipesState::Subscribed => {
79                //Create threaded client
80                if self.rx_pipe.is_none() || self.tx_pipe.is_none() {
81                    return Err(OctopipesError::Uninitialized);
82                }
83                //Set state to running
84                *client_state = OctopipesState::Running;
85                let this_state_rc = Arc::clone(&self.state);
86                let rx_pipe: String = self.rx_pipe.as_ref().unwrap().clone();
87                let tx_pipe: String = self.tx_pipe.as_ref().unwrap().clone();
88                let version: OctopipesProtocolVersion = self.version;
89                let client_id: String = self.id.clone();
90                let (client_sender, client_receiver) = mpsc::channel();
91                self.client_receiver = Some(client_receiver);
92                self.client_loop = Some(thread::spawn(move || {
93                    let mut terminate_thread: bool = false;
94                    while !terminate_thread {
95                        {
96                            let current_state = this_state_rc.lock().unwrap();
97                            if *current_state != OctopipesState::Running {
98                                terminate_thread = true;
99                            }
100                        }
101                        //Try to read (Read for 500 ms and sleep for 100ms)
102                        match pipes::pipe_read(&rx_pipe, 500) {
103                            Ok(data) => {
104                                match data {
105                                    None => {
106                                        thread::sleep(std::time::Duration::from_millis(100));
107                                        continue; //Just go on
108                                    },
109                                    Some(data) => {
110                                        //Otherwise parse message and send to callback
111                                        match serializer::decode_message(data) {
112                                            Ok(message) => {
113                                                //If message has ACK, send ACK back
114                                                if message.options.intersects(OctopipesOptions::RCK) {
115                                                    //if RCK is set, send ACK back
116                                                    let message_origin: Option<String> =
117                                                        match message.origin.as_ref() {
118                                                            Some(origin) => Some(origin.clone()),
119                                                            None => None,
120                                                        };
121                                                    //Prepare message
122                                                    let mut message: OctopipesMessage =
123                                                        OctopipesMessage::new(
124                                                            &version,
125                                                            &Some(client_id.clone()),
126                                                            &message_origin,
127                                                            message.ttl,
128                                                            OctopipesOptions::ACK,
129                                                            vec![],
130                                                        );
131                                                    //Encode message
132                                                    match serializer::encode_message(&mut message) {
133                                                        Ok(data_out) => {
134                                                            //Write message to CAP
135                                                            let _ =
136                                                                pipes::pipe_write(&tx_pipe, 5000, data_out);
137                                                        }
138                                                        Err(..) => { /*Ignore error*/ }
139                                                    }
140                                                }
141                                                //Send message
142                                                if let Err(_) = client_sender.send(Ok(message)) {
143                                                    break; //Terminate thread
144                                                }
145                                            }
146                                            Err(err) => {
147                                                if let Err(_) = client_sender.send(Err(err)) {
148                                                    break; //Terminate thread
149                                                }
150                                            }
151                                        }
152                                    }
153                                }
154                            }
155                            Err(_) => {
156                                if let Err(_) = client_sender.send(Err(OctopipesError::ReadFailed))
157                                {
158                                    break; //Terminate thread
159                                }
160                            }
161                        }
162                    }
163                    thread::sleep(std::time::Duration::from_millis(100));
164                    //Exit
165                }));
166                Ok(())
167            }
168            OctopipesState::Running => Err(OctopipesError::ThreadAlreadyRunning),
169            _ => Err(OctopipesError::NotSubscribed),
170        }
171    }
172
173    /// ###  loop_stop
174    ///
175    /// `loop_stop` stops the client loop thread
176    pub fn loop_stop(&mut self) -> Result<(), OctopipesError> {
177        let mut client_state = self.state.lock().unwrap();
178        match *client_state {
179            OctopipesState::Running => {
180                //Stop thread
181                *client_state = OctopipesState::Stopped;
182                drop(client_state); //Otherwise the other thread will never read the state
183                
184                //Take joinable out of Option and then Join thread (NOTE: Using take prevents errors!)
185                self.client_loop.take().map(thread::JoinHandle::join);
186                Ok(())
187            }
188            _ => Ok(()),
189        }
190    }
191
192    //subscription functions
193
194    /// ###  subscribe
195    ///
196    /// `subscribe` subscribe to Octopipes server; the client will subscribe to the groups described in the subscription_list
197
198    pub fn subscribe(
199        &mut self,
200        subscription_list: &Vec<String>,
201    ) -> Result<OctopipesCapError, OctopipesError> {
202        //Prepare subscribe message
203        let payload: Vec<u8> = cap::encode_subscription(subscription_list);
204        //Send message through the CAP
205        match self.send_cap(payload) {
206            Err(err) => Err(err),
207            Ok(..) => {
208                //Wait for ASSIGNMENT
209                match pipes::pipe_read(&self.cap_pipe, 5000) {
210                    Err(..) => Err(OctopipesError::ReadFailed),
211                    Ok(data_in) => {
212                        match data_in {
213                            None => return Err(OctopipesError::NoDataAvailable),
214                            Some(data_in) => {
215                                //Parse message
216                                match serializer::decode_message(data_in) {
217                                    Err(err) => Err(err),
218                                    Ok(response) => {
219                                        //Check if message type is ASSIGNMENT
220                                        match cap::get_cap_message_type(&response.data) {
221                                            Ok(message_type) => {
222                                                match message_type {
223                                                    OctopipesCapMessage::Assignment => {
224                                                        //Ok, is an ASSIGNMENT
225                                                        //Parse assignment params
226                                                        match cap::decode_assignment(&response.data) {
227                                                            Ok((cap_error, pipe_tx, pipe_rx)) => {
228                                                                //Assign params
229                                                                if cap_error != OctopipesCapError::NoError {
230                                                                    return Ok(cap_error);
231                                                                }
232                                                                self.tx_pipe = pipe_tx;
233                                                                self.rx_pipe = pipe_rx;
234                                                                let mut client_state =
235                                                                    self.state.lock().unwrap();
236                                                                *client_state = OctopipesState::Subscribed;
237                                                                Ok(OctopipesCapError::NoError)
238                                                            }
239                                                            Err(err) => Err(err),
240                                                        }
241                                                    }
242                                                    _ => Err(OctopipesError::BadPacket),
243                                                }
244                                            }
245                                            Err(err) => Err(err),
246                                        }
247                                    }
248                                }
249                            }
250                        }
251                    }
252                }
253            }
254        }
255    }
256
257    /// ###  unsubscribe
258    ///
259    /// `unsubscribe` unsubscribe from Octopipes server; if thread is running it will be stopped
260
261    pub fn unsubscribe(&mut self) -> Result<(), OctopipesError> {
262        {
263            let client_state = self.state.lock().unwrap();
264            if *client_state != OctopipesState::Subscribed
265                && *client_state != OctopipesState::Running
266            {
267                return Err(OctopipesError::NotSubscribed);
268            }
269        }
270        //Prepare message
271        let payload: Vec<u8> = cap::encode_unsubscription();
272        match self.send_cap(payload) {
273            Err(err) => return Err(err),
274            Ok(..) => {}
275        }
276        //Stop loop
277        match self.loop_stop() {
278            Ok(..) => {}
279            Err(err) => return Err(err),
280        }
281        //Call on unsubscribed
282        match self.on_unsubscribed_fn {
283            Some(on_unsub) => {
284                (on_unsub)();
285            }
286            None => {}
287        }
288        //Set state to UNSUBSCRIBED
289        let mut client_state = self.state.lock().unwrap();
290        *client_state = OctopipesState::Unsubscribed;
291        Ok(())
292    }
293
294    //Send message functions
295
296    /// ###  send_cap
297    ///
298    /// `send_cap` sends a message to server through the CAP
299
300    fn send_cap(&self, payload: Vec<u8>) -> Result<(), OctopipesError> {
301        //Prepare message
302        let mut message: OctopipesMessage = OctopipesMessage::new(
303            &self.version,
304            &Some(self.id.clone()),
305            &None,
306            60,
307            OctopipesOptions::empty(),
308            payload,
309        );
310        //Encode message
311        match serializer::encode_message(&mut message) {
312            Ok(data_out) => {
313                //Write message to cap
314                match pipes::pipe_write(&self.cap_pipe, 5000, data_out) {
315                    Ok(..) => Ok(()),
316                    Err(..) => Err(OctopipesError::WriteFailed),
317                }
318            }
319            Err(err) => Err(err),
320        }
321    }
322
323    /// ###  send
324    ///
325    /// `send` sends a message to a certain remote
326
327    pub fn send(&self, remote: &String, data: Vec<u8>) -> Result<(), OctopipesError> {
328        self.send_ex(remote, data, 0, OctopipesOptions::empty())
329    }
330
331    /// ###  send_ex
332    ///
333    /// `send_ex` sends a message to a certain remote with extended options
334
335    pub fn send_ex(
336        &self,
337        remote: &String,
338        data: Vec<u8>,
339        ttl: u8,
340        options: OctopipesOptions,
341    ) -> Result<(), OctopipesError> {
342        {
343            let client_state = self.state.lock().unwrap();
344            if *client_state != OctopipesState::Running
345                && *client_state != OctopipesState::Subscribed
346            {
347                return Err(OctopipesError::NotSubscribed);
348            }
349            if self.tx_pipe.is_none() {
350                return Err(OctopipesError::NotSubscribed);
351            }
352        }
353        //Prepare message
354        let mut message: OctopipesMessage = OctopipesMessage::new(
355            &self.version,
356            &Some(self.id.clone()),
357            &Some(remote.clone()),
358            ttl,
359            options,
360            data,
361        );
362        //Encode message
363        match serializer::encode_message(&mut message) {
364            Ok(data_out) => {
365                //Write message to cap
366                match pipes::pipe_write(&self.tx_pipe.as_ref().unwrap(), 5000, data_out) {
367                    Ok(..) => {
368                        //If on sent callback is set, call on sent
369                        {
370                            if self.on_sent_fn.as_ref().is_some() {
371                                (self.on_sent_fn.as_ref().unwrap())(&message);
372                            }
373                        }
374                        Ok(())
375                    }
376                    Err(..) => Err(OctopipesError::WriteFailed),
377                }
378            }
379            Err(err) => Err(err),
380        }
381    }
382
383    //@! Message readers
384    /// ###  get_next_message
385    ///
386    /// `get_next_message` Gets the next available message on the receiver
387    /// If a message is available Ok(message) is returned
388    /// If no message is available Ok(None) is returned
389    /// If there was an error while reading inbox, Err(OctopipesError) is returned
390    pub fn get_next_message(&self) -> Result<Option<OctopipesMessage>, OctopipesError> {
391        {
392            //Check if thread is running
393            let current_state = self.state.lock().unwrap();
394            if *current_state != OctopipesState::Running {
395                return Err(OctopipesError::Uninitialized);
396            }
397        }
398        //Try receive
399        match self.client_receiver.as_ref() {
400            None => Err(OctopipesError::Uninitialized),
401            Some(receiver) => match receiver.try_recv() {
402                Ok(payload) => match payload {
403                    Ok(message) => Ok(Some(message)),
404                    Err(error) => Err(error),
405                },
406                Err(error) => match error {
407                    mpsc::TryRecvError::Empty => Ok(None),
408                    _ => Err(OctopipesError::ThreadError),
409                },
410            },
411        }
412    }
413
414    /// ###  get_all_message
415    ///
416    /// `get_all_message` Gets all the available messages on the receiver
417    /// If there was no error while reading the inbox a vector with all the messages is returned (could have length 0)
418    /// If there was an error while reading inbox, Err(OctopipesError) is returned
419    pub fn get_all_message(&self) -> Result<Vec<OctopipesMessage>, OctopipesError> {
420        let mut inbox: Vec<OctopipesMessage> = Vec::new();
421        loop {
422            match self.get_next_message() {
423                Err(error) => return Err(error),
424                Ok(ret) => match ret {
425                    Some(message) => inbox.push(message),
426                    None => break,
427                },
428            }
429        }
430        Ok(inbox)
431    }
432
433    //Callbacks setters
434
435    /// ###  set_on_received_callback
436    ///
437    /// `set_on_received_callback` sets the function to call on message received
438    pub fn set_on_received_callback(
439        &mut self,
440        callback: fn(Result<&OctopipesMessage, &OctopipesError>),
441    ) {
442        self.on_received_fn = Some(callback);
443    }
444
445    /// ###  set_on_sent_callbacl
446    ///
447    /// `set_on_sent_callbacl` sets the function to call when a message is sent
448    pub fn set_on_sent_callback(&mut self, callback: fn(&OctopipesMessage)) {
449        self.on_sent_fn = Some(callback);
450    }
451
452    /// ###  set_on_subscribed
453    ///
454    /// `set_on_subscribed` sets the function to call on a successful subscription to the Octopipes Server
455    pub fn set_on_subscribed(&mut self, callback: fn()) {
456        self.on_subscribed_fn = Some(callback);
457    }
458
459    /// ###  set_on_unsubscribed
460    ///
461    /// `set_on_unsubscribed` sets the function to call on a successful unsubscription from Octopipes server
462    pub fn set_on_unsubscribed(&mut self, callback: fn()) {
463        self.on_unsubscribed_fn = Some(callback);
464    }
465}
466
467impl Drop for OctopipesClient {
468    fn drop(&mut self) {
469        //Stop thread
470        match self.loop_stop() {
471            Ok(_) => drop(self),
472            Err(error) => panic!(error), //Don't worry, it won't panic
473        }
474    }
475}