rustypipes/client.rs
1//! ## Client
2//!
//! `client` is the module which takes care of managing the OctopipesClient struct and
//! all the functions the user needs to interface with an Octopipes Server
5
6//
7// RustyPipes
8// Developed by Christian Visintin
9//
10// MIT License
11// Copyright (c) 2019-2020 Christian Visintin
12// Permission is hereby granted, free of charge, to any person obtaining a copy
13// of this software and associated documentation files (the "Software"), to deal
14// in the Software without restriction, including without limitation the rights
15// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16// copies of the Software, and to permit persons to whom the Software is
17// furnished to do so, subject to the following conditions:
18// The above copyright notice and this permission notice shall be included in all
19// copies or substantial portions of the Software.
20// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26// SOFTWARE.
27//
28
29use super::OctopipesCapError;
30use super::OctopipesCapMessage;
31use super::OctopipesClient;
32use super::OctopipesError;
33use super::OctopipesMessage;
34use super::OctopipesOptions;
35use super::OctopipesProtocolVersion;
36use super::OctopipesState;
37
38use std::sync::{mpsc, Arc, Mutex};
39use std::thread;
40
41use super::cap;
42use super::pipes;
43use super::serializer;
44
45impl OctopipesClient {
46 /// ### OctopipesClient Constructor
47 ///
48 /// `new` is constructor for OctopipesClient. OctopipesClient is actually a wrapper for a Client
49 pub fn new(
50 client_id: String,
51 cap_pipe: String,
52 version: OctopipesProtocolVersion,
53 ) -> OctopipesClient {
54 OctopipesClient {
55 id: client_id,
56 version: version,
57 cap_pipe: cap_pipe,
58 tx_pipe: None,
59 rx_pipe: None,
60 state: Arc::new(Mutex::new(OctopipesState::Initialized)),
61 client_loop: None,
62 client_receiver: None,
63 on_received_fn: None,
64 on_sent_fn: None,
65 on_subscribed_fn: None,
66 on_unsubscribed_fn: None,
67 }
68 }
69
70 //Thread operations
71
72 /// ### loop_start
73 ///
74 /// `loop_start` starts the client loop thread which checks if new messages are available
75 pub fn loop_start(&mut self) -> Result<(), OctopipesError> {
76 let mut client_state = self.state.lock().unwrap();
77 match *client_state {
78 OctopipesState::Subscribed => {
79 //Create threaded client
80 if self.rx_pipe.is_none() || self.tx_pipe.is_none() {
81 return Err(OctopipesError::Uninitialized);
82 }
83 //Set state to running
84 *client_state = OctopipesState::Running;
85 let this_state_rc = Arc::clone(&self.state);
86 let rx_pipe: String = self.rx_pipe.as_ref().unwrap().clone();
87 let tx_pipe: String = self.tx_pipe.as_ref().unwrap().clone();
88 let version: OctopipesProtocolVersion = self.version;
89 let client_id: String = self.id.clone();
90 let (client_sender, client_receiver) = mpsc::channel();
91 self.client_receiver = Some(client_receiver);
92 self.client_loop = Some(thread::spawn(move || {
93 let mut terminate_thread: bool = false;
94 while !terminate_thread {
95 {
96 let current_state = this_state_rc.lock().unwrap();
97 if *current_state != OctopipesState::Running {
98 terminate_thread = true;
99 }
100 }
101 //Try to read (Read for 500 ms and sleep for 100ms)
102 match pipes::pipe_read(&rx_pipe, 500) {
103 Ok(data) => {
104 match data {
105 None => {
106 thread::sleep(std::time::Duration::from_millis(100));
107 continue; //Just go on
108 },
109 Some(data) => {
110 //Otherwise parse message and send to callback
111 match serializer::decode_message(data) {
112 Ok(message) => {
113 //If message has ACK, send ACK back
114 if message.options.intersects(OctopipesOptions::RCK) {
115 //if RCK is set, send ACK back
116 let message_origin: Option<String> =
117 match message.origin.as_ref() {
118 Some(origin) => Some(origin.clone()),
119 None => None,
120 };
121 //Prepare message
122 let mut message: OctopipesMessage =
123 OctopipesMessage::new(
124 &version,
125 &Some(client_id.clone()),
126 &message_origin,
127 message.ttl,
128 OctopipesOptions::ACK,
129 vec![],
130 );
131 //Encode message
132 match serializer::encode_message(&mut message) {
133 Ok(data_out) => {
134 //Write message to CAP
135 let _ =
136 pipes::pipe_write(&tx_pipe, 5000, data_out);
137 }
138 Err(..) => { /*Ignore error*/ }
139 }
140 }
141 //Send message
142 if let Err(_) = client_sender.send(Ok(message)) {
143 break; //Terminate thread
144 }
145 }
146 Err(err) => {
147 if let Err(_) = client_sender.send(Err(err)) {
148 break; //Terminate thread
149 }
150 }
151 }
152 }
153 }
154 }
155 Err(_) => {
156 if let Err(_) = client_sender.send(Err(OctopipesError::ReadFailed))
157 {
158 break; //Terminate thread
159 }
160 }
161 }
162 }
163 thread::sleep(std::time::Duration::from_millis(100));
164 //Exit
165 }));
166 Ok(())
167 }
168 OctopipesState::Running => Err(OctopipesError::ThreadAlreadyRunning),
169 _ => Err(OctopipesError::NotSubscribed),
170 }
171 }
172
173 /// ### loop_stop
174 ///
175 /// `loop_stop` stops the client loop thread
176 pub fn loop_stop(&mut self) -> Result<(), OctopipesError> {
177 let mut client_state = self.state.lock().unwrap();
178 match *client_state {
179 OctopipesState::Running => {
180 //Stop thread
181 *client_state = OctopipesState::Stopped;
182 drop(client_state); //Otherwise the other thread will never read the state
183
184 //Take joinable out of Option and then Join thread (NOTE: Using take prevents errors!)
185 self.client_loop.take().map(thread::JoinHandle::join);
186 Ok(())
187 }
188 _ => Ok(()),
189 }
190 }
191
192 //subscription functions
193
194 /// ### subscribe
195 ///
196 /// `subscribe` subscribe to Octopipes server; the client will subscribe to the groups described in the subscription_list
197
198 pub fn subscribe(
199 &mut self,
200 subscription_list: &Vec<String>,
201 ) -> Result<OctopipesCapError, OctopipesError> {
202 //Prepare subscribe message
203 let payload: Vec<u8> = cap::encode_subscription(subscription_list);
204 //Send message through the CAP
205 match self.send_cap(payload) {
206 Err(err) => Err(err),
207 Ok(..) => {
208 //Wait for ASSIGNMENT
209 match pipes::pipe_read(&self.cap_pipe, 5000) {
210 Err(..) => Err(OctopipesError::ReadFailed),
211 Ok(data_in) => {
212 match data_in {
213 None => return Err(OctopipesError::NoDataAvailable),
214 Some(data_in) => {
215 //Parse message
216 match serializer::decode_message(data_in) {
217 Err(err) => Err(err),
218 Ok(response) => {
219 //Check if message type is ASSIGNMENT
220 match cap::get_cap_message_type(&response.data) {
221 Ok(message_type) => {
222 match message_type {
223 OctopipesCapMessage::Assignment => {
224 //Ok, is an ASSIGNMENT
225 //Parse assignment params
226 match cap::decode_assignment(&response.data) {
227 Ok((cap_error, pipe_tx, pipe_rx)) => {
228 //Assign params
229 if cap_error != OctopipesCapError::NoError {
230 return Ok(cap_error);
231 }
232 self.tx_pipe = pipe_tx;
233 self.rx_pipe = pipe_rx;
234 let mut client_state =
235 self.state.lock().unwrap();
236 *client_state = OctopipesState::Subscribed;
237 Ok(OctopipesCapError::NoError)
238 }
239 Err(err) => Err(err),
240 }
241 }
242 _ => Err(OctopipesError::BadPacket),
243 }
244 }
245 Err(err) => Err(err),
246 }
247 }
248 }
249 }
250 }
251 }
252 }
253 }
254 }
255 }
256
257 /// ### unsubscribe
258 ///
259 /// `unsubscribe` unsubscribe from Octopipes server; if thread is running it will be stopped
260
261 pub fn unsubscribe(&mut self) -> Result<(), OctopipesError> {
262 {
263 let client_state = self.state.lock().unwrap();
264 if *client_state != OctopipesState::Subscribed
265 && *client_state != OctopipesState::Running
266 {
267 return Err(OctopipesError::NotSubscribed);
268 }
269 }
270 //Prepare message
271 let payload: Vec<u8> = cap::encode_unsubscription();
272 match self.send_cap(payload) {
273 Err(err) => return Err(err),
274 Ok(..) => {}
275 }
276 //Stop loop
277 match self.loop_stop() {
278 Ok(..) => {}
279 Err(err) => return Err(err),
280 }
281 //Call on unsubscribed
282 match self.on_unsubscribed_fn {
283 Some(on_unsub) => {
284 (on_unsub)();
285 }
286 None => {}
287 }
288 //Set state to UNSUBSCRIBED
289 let mut client_state = self.state.lock().unwrap();
290 *client_state = OctopipesState::Unsubscribed;
291 Ok(())
292 }
293
294 //Send message functions
295
296 /// ### send_cap
297 ///
298 /// `send_cap` sends a message to server through the CAP
299
300 fn send_cap(&self, payload: Vec<u8>) -> Result<(), OctopipesError> {
301 //Prepare message
302 let mut message: OctopipesMessage = OctopipesMessage::new(
303 &self.version,
304 &Some(self.id.clone()),
305 &None,
306 60,
307 OctopipesOptions::empty(),
308 payload,
309 );
310 //Encode message
311 match serializer::encode_message(&mut message) {
312 Ok(data_out) => {
313 //Write message to cap
314 match pipes::pipe_write(&self.cap_pipe, 5000, data_out) {
315 Ok(..) => Ok(()),
316 Err(..) => Err(OctopipesError::WriteFailed),
317 }
318 }
319 Err(err) => Err(err),
320 }
321 }
322
323 /// ### send
324 ///
325 /// `send` sends a message to a certain remote
326
327 pub fn send(&self, remote: &String, data: Vec<u8>) -> Result<(), OctopipesError> {
328 self.send_ex(remote, data, 0, OctopipesOptions::empty())
329 }
330
331 /// ### send_ex
332 ///
333 /// `send_ex` sends a message to a certain remote with extended options
334
335 pub fn send_ex(
336 &self,
337 remote: &String,
338 data: Vec<u8>,
339 ttl: u8,
340 options: OctopipesOptions,
341 ) -> Result<(), OctopipesError> {
342 {
343 let client_state = self.state.lock().unwrap();
344 if *client_state != OctopipesState::Running
345 && *client_state != OctopipesState::Subscribed
346 {
347 return Err(OctopipesError::NotSubscribed);
348 }
349 if self.tx_pipe.is_none() {
350 return Err(OctopipesError::NotSubscribed);
351 }
352 }
353 //Prepare message
354 let mut message: OctopipesMessage = OctopipesMessage::new(
355 &self.version,
356 &Some(self.id.clone()),
357 &Some(remote.clone()),
358 ttl,
359 options,
360 data,
361 );
362 //Encode message
363 match serializer::encode_message(&mut message) {
364 Ok(data_out) => {
365 //Write message to cap
366 match pipes::pipe_write(&self.tx_pipe.as_ref().unwrap(), 5000, data_out) {
367 Ok(..) => {
368 //If on sent callback is set, call on sent
369 {
370 if self.on_sent_fn.as_ref().is_some() {
371 (self.on_sent_fn.as_ref().unwrap())(&message);
372 }
373 }
374 Ok(())
375 }
376 Err(..) => Err(OctopipesError::WriteFailed),
377 }
378 }
379 Err(err) => Err(err),
380 }
381 }
382
383 //@! Message readers
384 /// ### get_next_message
385 ///
386 /// `get_next_message` Gets the next available message on the receiver
387 /// If a message is available Ok(message) is returned
388 /// If no message is available Ok(None) is returned
389 /// If there was an error while reading inbox, Err(OctopipesError) is returned
390 pub fn get_next_message(&self) -> Result<Option<OctopipesMessage>, OctopipesError> {
391 {
392 //Check if thread is running
393 let current_state = self.state.lock().unwrap();
394 if *current_state != OctopipesState::Running {
395 return Err(OctopipesError::Uninitialized);
396 }
397 }
398 //Try receive
399 match self.client_receiver.as_ref() {
400 None => Err(OctopipesError::Uninitialized),
401 Some(receiver) => match receiver.try_recv() {
402 Ok(payload) => match payload {
403 Ok(message) => Ok(Some(message)),
404 Err(error) => Err(error),
405 },
406 Err(error) => match error {
407 mpsc::TryRecvError::Empty => Ok(None),
408 _ => Err(OctopipesError::ThreadError),
409 },
410 },
411 }
412 }
413
414 /// ### get_all_message
415 ///
416 /// `get_all_message` Gets all the available messages on the receiver
417 /// If there was no error while reading the inbox a vector with all the messages is returned (could have length 0)
418 /// If there was an error while reading inbox, Err(OctopipesError) is returned
419 pub fn get_all_message(&self) -> Result<Vec<OctopipesMessage>, OctopipesError> {
420 let mut inbox: Vec<OctopipesMessage> = Vec::new();
421 loop {
422 match self.get_next_message() {
423 Err(error) => return Err(error),
424 Ok(ret) => match ret {
425 Some(message) => inbox.push(message),
426 None => break,
427 },
428 }
429 }
430 Ok(inbox)
431 }
432
433 //Callbacks setters
434
435 /// ### set_on_received_callback
436 ///
437 /// `set_on_received_callback` sets the function to call on message received
438 pub fn set_on_received_callback(
439 &mut self,
440 callback: fn(Result<&OctopipesMessage, &OctopipesError>),
441 ) {
442 self.on_received_fn = Some(callback);
443 }
444
445 /// ### set_on_sent_callbacl
446 ///
447 /// `set_on_sent_callbacl` sets the function to call when a message is sent
448 pub fn set_on_sent_callback(&mut self, callback: fn(&OctopipesMessage)) {
449 self.on_sent_fn = Some(callback);
450 }
451
452 /// ### set_on_subscribed
453 ///
454 /// `set_on_subscribed` sets the function to call on a successful subscription to the Octopipes Server
455 pub fn set_on_subscribed(&mut self, callback: fn()) {
456 self.on_subscribed_fn = Some(callback);
457 }
458
459 /// ### set_on_unsubscribed
460 ///
461 /// `set_on_unsubscribed` sets the function to call on a successful unsubscription from Octopipes server
462 pub fn set_on_unsubscribed(&mut self, callback: fn()) {
463 self.on_unsubscribed_fn = Some(callback);
464 }
465}
466
467impl Drop for OctopipesClient {
468 fn drop(&mut self) {
469 //Stop thread
470 match self.loop_stop() {
471 Ok(_) => drop(self),
472 Err(error) => panic!(error), //Don't worry, it won't panic
473 }
474 }
475}