catenis_api_client/notification/
ws.rs

1use std::{
2    thread::{
3        self, JoinHandle,
4    },
5    sync::mpsc::{
6        self,
7        SyncSender, TryRecvError,
8    },
9    borrow::Cow
10};
11use reqwest::{
12    header::{
13        AUTHORIZATION, SEC_WEBSOCKET_PROTOCOL,
14        HeaderValue,
15    },
16};
17use tungstenite::{
18    self,
19    Message,
20    protocol::{
21        frame::coding::CloseCode,
22    },
23    client::{
24        IntoClientRequest,
25    },
26    stream::{
27        MaybeTlsStream,
28    },
29};
30use serde::{
31    Serialize,
32};
33
34use super::*;
35use crate::{
36    CatenisClient,
37    api::{
38        NotificationEvent,
39    },
40    Result, Error, X_BCOT_TIMESTAMP,
41    error::GenericError,
42};
43
44pub use tungstenite::protocol::CloseFrame;
45
46pub(crate) const NOTIFY_WS_PROTOCOL: &str = "notify.catenis.io";
47pub(crate) const NOTIFY_WS_CHANNEL_OPEN: &str = "NOTIFICATION_CHANNEL_OPEN";
48
49pub(crate) fn format_vec_limit<T>(v: Vec<T>, limit: usize) -> String
50    where
51        T: std::fmt::Debug
52{
53    let mut txt = format!("{:?}", &v);
54
55    if v.len() > limit {
56        txt = String::from(&txt[..txt.len() - 1]) + ", ...]";
57    }
58
59    txt
60}
61
62#[derive(Debug, Serialize)]
63#[serde(rename_all = "kebab-case")]
64pub(crate) struct WsNotifyChannelAuthentication {
65    pub(crate) x_bcot_timestamp: String,
66    pub(crate) authorization: String,
67}
68
69pub(crate) enum WsNotifyChannelCommand {
70    Close,
71    Drop,
72}
73
74pub(crate) enum NotifyEventHandlerMessage {
75    Drop,
76    NotifyEvent(WsNotifyChannelEvent),
77}
78
79/// Events to monitor on a WebSocket notification channel.
80#[derive(Debug)]
81pub enum WsNotifyChannelEvent {
82    /// An error took place in the WebSocket notification channel.
83    Error(Error),
84    /// The underlying WebSocket connection has been closed, and thus the notification channel
85    /// itself too. It may contain the returned close code and reason.
86    Close(Option<CloseFrame<'static>>),
87    /// WebSocket notification channel successfully open and ready to send notifications.
88    Open,
89    /// New incoming notification.
90    Notify(NotificationMessage)
91}
92
93/// Represents a Catenis WebSocket notification channel.
94///
95/// This is used to receive notifications from the Catenis system.
96///
97/// An instance of this object should be obtained from a [`CatenisClient`] object via its
98/// [`new_ws_notify_channel`](CatenisClient::new_ws_notify_channel) method.
99#[derive(Debug, Clone)]
100pub struct WsNotifyChannel{
101    pub(crate) api_client: CatenisClient,
102    pub(crate) event: NotificationEvent,
103    tx: Option<SyncSender<WsNotifyChannelCommand>>,
104}
105
106impl WsNotifyChannel {
107    pub(crate) fn new(api_client: &CatenisClient, event: NotificationEvent) -> Self {
108        WsNotifyChannel {
109            api_client: api_client.clone(),
110            event,
111            tx: None,
112        }
113    }
114
115    /// Open the WebSocket notification channel setting up a handler to monitor the activity on
116    /// that channel.
117    ///
118    /// > **Note**: this is a non-blocking operation. The provided handler function is run on its
119    /// own thread.
120    ///
121    /// # Example
122    ///
123    /// ```no_run
124    /// use std::sync::{Arc, Mutex};
125    /// use catenis_api_client::{
126    ///     CatenisClient, ClientOptions, Environment, Result,
127    ///     api::NotificationEvent,
128    ///     notification::WsNotifyChannelEvent,
129    /// };
130    ///
131    /// # fn main() -> Result<()> {
132    /// let ctn_client = CatenisClient::new_with_options(
133    ///     Some((
134    ///         "drc3XdxNtzoucpw9xiRp",
135    ///         concat!(
136    ///             "4c1749c8e86f65e0a73e5fb19f2aa9e74a716bc22d7956bf3072b4bc3fbfe2a0",
137    ///             "d138ad0d4bcfee251e4e5f54d6e92b8fd4eb36958a7aeaeeb51e8d2fcc4552c3"
138    ///         ),
139    ///     ).into()),
140    ///     &[
141    ///         ClientOptions::Environment(Environment::Sandbox),
142    ///     ],
143    /// )?;
144    ///
145    /// // Instantiate WebSocket notification channel object for New Message Received
146    /// //  notification event
147    /// let notify_channel = Arc::new(Mutex::new(
148    ///     ctn_client.new_ws_notify_channel(NotificationEvent::NewMsgReceived)
149    /// ));
150    /// let notify_channel_2 = notify_channel.clone();
151    ///
152    /// let notify_thread = notify_channel.lock().unwrap()
153    ///     // Open WebSocket notification channel and monitor events on it
154    ///     .open(move |event: WsNotifyChannelEvent| {
155    ///         let notify_channel = notify_channel_2.lock().unwrap();
156    ///
157    ///         match event {
158    ///             WsNotifyChannelEvent::Error(err) => {
159    ///                 println!("WebSocket notification channel error: {:?}", err);
160    ///             },
161    ///             WsNotifyChannelEvent::Open => {
162    ///                 println!("WebSocket notification channel open");
163    ///             },
164    ///             WsNotifyChannelEvent::Close(close_info) => {
165    ///                 println!("WebSocket notification channel closed: {:?}", close_info);
166    ///             },
167    ///             WsNotifyChannelEvent::Notify(notify_msg) => {
168    ///                 println!("Received notification (new message read): {:?}", notify_msg);
169    ///                 notify_channel.close();
170    ///             },
171    ///         }
172    ///     })?;
173    /// # Ok(())
174    /// # }
175    /// ```
176    pub fn open<F>(&mut self, notify_event_handler: F) -> Result<JoinHandle<()>>
177        where
178            F: Fn(WsNotifyChannelEvent) + Send + 'static
179    {
180        // Prepare to connect to Catenis WebSocket notification service
181        //  Note: this request is only used to assemble the URL for the notification service
182        //      and generate the required data for authentication with the notification service.
183        //      The actual request used to open a WebSocket connection is created below
184        //      (from this request's URL).
185        let mut auth_req = self.api_client.get_ws_request(
186            "notify/ws/:event_name",
187            Some(&[("event_name", self.event.to_string().as_str())])
188        )?;
189
190        self.api_client.sign_request(&mut auth_req)?;
191
192        let ws_notify_auth_msg_json = serde_json::to_string(
193            &WsNotifyChannelAuthentication {
194                x_bcot_timestamp: auth_req.headers()
195                    .get(X_BCOT_TIMESTAMP)
196                    .unwrap_or(&HeaderValue::from_static(""))
197                    .to_str()?
198                    .into(),
199                authorization: auth_req.headers()
200                    .get(AUTHORIZATION)
201                    .unwrap_or(&HeaderValue::from_static(""))
202                    .to_str()?
203                    .into()
204            }
205        )?;
206
207        // Create request to open WebSocket connection
208        let mut req = auth_req.url().as_str().into_client_request()?;
209
210        // Add HTTP header specifying the expected WebSocket subprotocol
211        req.headers_mut().insert(SEC_WEBSOCKET_PROTOCOL, HeaderValue::from_static(NOTIFY_WS_PROTOCOL));
212
213        // Try to establish WebSocket connection
214        let (mut ws, _) = tungstenite::connect(req)
215            .map_err(|err| Error::new_client_error(
216                Some("Failed to establish WebSocket connection"),
217                Some(err)
218            ))?;
219
220        // Set read timeout for WebSocket connection
221        match ws.get_ref() {
222            MaybeTlsStream::Plain(stream) =>  stream,
223            MaybeTlsStream::NativeTls(tls_stream) => tls_stream.get_ref(),
224            &_ => panic!("Unexpected TLS stream type"),
225        }.set_read_timeout(Some(std::time::Duration::from_millis(500)))
226            .map_err(|err| Error::new_client_error(
227                Some("Failed to set read timeout for WebSocket connection"),
228                Some(err)
229            ))?;
230
231        // Prepare to create thread to run WebSocket connection
232        let (tx, rx) = mpsc::sync_channel(128);
233
234        // Save communication channel with WebSocket thread
235        self.tx = Some(tx);
236
237        Ok(thread::spawn(move || {
238            // Create notification event handler thread
239            let (h_tx, h_rx) = mpsc::channel();
240
241            thread::spawn(move || {
242                loop {
243                    match h_rx.recv() {
244                        Ok(msg) => {
245                            match msg {
246                                NotifyEventHandlerMessage::Drop => {
247                                    // Request to exit thread. So just do it
248                                    break;
249                                },
250                                NotifyEventHandlerMessage::NotifyEvent(event) => {
251                                    // Call handler passing notification event
252                                    notify_event_handler(event);
253                                }
254                            }
255                        },
256                        Err(_) => {
257                            // Lost communication with parent thread. End this thread
258                            break;
259                        },
260                    }
261                }
262            });
263
264            // Send authentication message
265            if let Err(err) = ws.write_message(Message::Text(ws_notify_auth_msg_json)) {
266                let ctn_error = if let tungstenite::error::Error::ConnectionClosed = err {
267                    // WebSocket connection has been closed
268                    Error::new_client_error(
269                        Some("Failed to send WebSocket notification channel authentication message; WebSocket connection closed unexpectedly"),
270                        None::<GenericError>
271                    )
272                } else {
273                    // Any other error
274                    Error::new_client_error(
275                        Some("Failed to send WebSocket notification channel authentication message"),
276                        Some(err)
277                    )
278                };
279
280                // Send error message to notification event handler thread...
281                h_tx.send(
282                    NotifyEventHandlerMessage::NotifyEvent(
283                        WsNotifyChannelEvent::Error(ctn_error)
284                    )
285                ).unwrap_or(());
286
287                // and exit current thread (requesting child thread to exit too)
288                h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
289                return;
290            }
291
292            loop {
293                // Receive data from WebSocket connection
294                match ws.read_message() {
295                    Ok(msg) => {
296                        match msg {
297                            Message::Text(text) => {
298                                // A text message was received
299                                if text == NOTIFY_WS_CHANNEL_OPEN {
300                                    // WebSocket notification channel open and ready to send
301                                    //  notification. Send open message to notification event
302                                    //  handler thread
303                                    h_tx.send(
304                                        NotifyEventHandlerMessage::NotifyEvent(
305                                            WsNotifyChannelEvent::Open
306                                        )
307                                    ).unwrap_or(());
308                                } else {
309                                    // Parse received message
310                                    match serde_json::from_str(text.as_str()) {
311                                        Ok(notify_message) => {
312                                            // Send notify message to notification event handler
313                                            //  thread
314                                            h_tx.send(
315                                                NotifyEventHandlerMessage::NotifyEvent(
316                                                    WsNotifyChannelEvent::Notify(notify_message)
317                                                )
318                                            ).unwrap_or(());
319                                        },
320                                        Err(_) => {
321                                            // Unexpected notification message. Force closing of
322                                            //  WebSocket notification channel reporting error
323                                            //  condition
324                                            if let Err(err) = ws.close(Some(CloseFrame {
325                                                code: CloseCode::Library(4000),
326                                                reason: Cow::from(format!("Unexpected notification message received: {}", text))
327                                            })) {
328                                                if let tungstenite::error::Error::ConnectionClosed = err {
329                                                    // WebSocket connection has already been closed. Just exit
330                                                    //  current thread (requesting child thread to exit too)
331                                                    h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
332                                                    return;
333                                                } else {
334                                                    // Any other error. Send error message to notification
335                                                    //  event handler thread...
336                                                    h_tx.send(
337                                                        NotifyEventHandlerMessage::NotifyEvent(
338                                                            WsNotifyChannelEvent::Error(
339                                                                Error::new_client_error(
340                                                                    Some("Failed to close WebSocket connection"),
341                                                                    Some(err)
342                                                                )
343                                                            )
344                                                        )
345                                                    ).unwrap_or(());
346
347                                                    // and exit current thread (requesting child thread to exit too)
348                                                    h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
349                                                    return;
350                                                }
351                                            }
352                                        },
353                                    }
354                                }
355                            },
356                            Message::Binary(bin) => {
357                                // A binary message was received. This is unexpected, so
358                                //  force closing of WebSocket notification channel reporting
359                                //  the error condition
360                                if let Err(err) = ws.close(Some(CloseFrame {
361                                    code: CloseCode::Unsupported,
362                                    reason: Cow::from(format!("Unexpected binary message received: {}", format_vec_limit(bin, 20)))
363                                })) {
364                                    if let tungstenite::error::Error::ConnectionClosed = err {
365                                        // WebSocket connection has already been closed. Just exit
366                                        //  current thread (requesting child thread to exit too)
367                                        h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
368                                        return;
369                                    } else {
370                                        // Any other error. Send error message to notification
371                                        //  event handler thread...
372                                        h_tx.send(
373                                            NotifyEventHandlerMessage::NotifyEvent(
374                                                WsNotifyChannelEvent::Error(
375                                                    Error::new_client_error(
376                                                        Some("Failed to close WebSocket connection"),
377                                                        Some(err)
378                                                    )
379                                                )
380                                            )
381                                        ).unwrap_or(());
382
383                                        // and exit current thread (requesting child thread to exit too)
384                                        h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
385                                        return;
386                                    }
387                                }
388                            },
389                            Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => (),
390                            Message::Close(close_info) => {
391                                // WebSocket connection is being closed. Send close message
392                                //  to notification event handler thread...
393                                h_tx.send(
394                                    NotifyEventHandlerMessage::NotifyEvent(
395                                        WsNotifyChannelEvent::Close(close_info)
396                                    )
397                                ).unwrap_or(());
398
399                                // and continue precessing normally until receiving confirmation
400                                //  (via Error::ConnectionClosed) that WebSocket connection has
401                                //  been closed
402                            }
403                        }
404                    },
405                    Err(err) => {
406                        let mut err_to_report = None;
407                        let mut exit = false;
408
409                        match &err {
410                            tungstenite::error::Error::Io(io_err) => {
411                                match io_err.kind() {
412                                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
413                                        // Timeout reading data from WebSocket connection. Just
414                                        //  continue processing
415                                    },
416                                    _ => {
417                                        // Any other I/O error. Indicate that error should be
418                                        //  reported and thread exited
419                                        err_to_report = Some(err);
420                                        exit = true;
421                                    }
422                                }
423                            },
424                            tungstenite::error::Error::ConnectionClosed => {
425                                // WebSocket connection has been closed. Indicate that
426                                //  thread should be exited
427                                exit = true;
428                            },
429                            _ => {
430                                // Any other error. Indicate that error should be
431                                //  reported and thread exited
432                                err_to_report = Some(err);
433                                exit = true;
434                            }
435                        }
436
437                        if let Some(err) = err_to_report {
438                            // Send error message to notification event
439                            //  handler thread
440                            h_tx.send(
441                                NotifyEventHandlerMessage::NotifyEvent(
442                                    WsNotifyChannelEvent::Error(
443                                        Error::new_client_error(
444                                            Some("Failed to send WebSocket notification channel authentication message"),
445                                            Some(err)
446                                        )
447                                    )
448                                )
449                            ).unwrap_or(());
450                        }
451
452                        if exit {
453                            // Exit current thread (requesting child thread to exit too)
454                            h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
455                            return;
456                        }
457                    }
458                }
459
460                // Check for command from main thread
461                match rx.try_recv() {
462                    Ok(msg) => {
463                        match msg {
464                            WsNotifyChannelCommand::Drop => {
465                                // Exit current thread (requesting child thread to exit too)
466                                h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
467                                return;
468                            },
469                            WsNotifyChannelCommand::Close => {
470                                // Close WebSocket connection
471                                if let Err(err) = ws.close(Some(CloseFrame {
472                                    code: CloseCode::Normal,
473                                    reason: Cow::from("")
474                                })) {
475                                    if let tungstenite::error::Error::ConnectionClosed = err {
476                                        // WebSocket connection has already been closed. Just exit
477                                        //  current thread (requesting child thread to exit too)
478                                        h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
479                                        return;
480                                    } else {
481                                        // Any other error. Send error message to notification
482                                        //  event handler thread...
483                                        h_tx.send(
484                                            NotifyEventHandlerMessage::NotifyEvent(
485                                                WsNotifyChannelEvent::Error(
486                                                    Error::new_client_error(
487                                                        Some("Failed to close WebSocket connection"),
488                                                        Some(err)
489                                                    )
490                                                )
491                                            )
492                                        ).unwrap_or(());
493
494                                        // and exit current thread (requesting child thread to exit too)
495                                        h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
496                                        return;
497                                    }
498                                }
499                            },
500                        }
501                    },
502                    Err(err) => {
503                        match err {
504                            TryRecvError::Disconnected => {
505                                // Lost communication with main thread. Exit current thread
506                                //  (requesting child thread to exit too)
507                                h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
508                                return;
509                            },
510                            TryRecvError::Empty => {
511                                // No data to be received now. Just continue processing
512                            }
513                        }
514                    },
515                }
516            }
517        }))
518    }
519
520    /// Close the WebSocket notification channel.
521    pub fn close(&self) {
522        if let Some(tx) = &self.tx {
523            // Send command to notification event handler thread to close WebSocket
524            //  notification channel
525            tx.send(WsNotifyChannelCommand::Close).unwrap_or(());
526        }
527    }
528}
529
530impl Drop for WsNotifyChannel {
531    fn drop(&mut self) {
532        if let Some(tx) = &self.tx {
533            // Send command to notification event handler thread to stop it
534            tx.send(WsNotifyChannelCommand::Drop).unwrap_or(());
535        }
536    }
537}
538
539#[cfg(test)]
540mod tests {
541    use super::*;
542
543    #[test]
544    fn it_serialize_ws_notify_channel_authentication() {
545        let ws_notify_channel_authentication = WsNotifyChannelAuthentication {
546            x_bcot_timestamp: String::from("20201210T203848Z"),
547            authorization: String::from("CTN1-HMAC-SHA256 Credential=drc3XdxNtzoucpw9xiRp/20201210/ctn1_request, Signature=7c8a878788b0bf6ddcc38f47a590ed6b261cb18a0261fefb42f9db1ee2fcb866"),
548        };
549
550        let json = serde_json::to_string(&ws_notify_channel_authentication).unwrap();
551
552        assert_eq!(json, r#"{"x-bcot-timestamp":"20201210T203848Z","authorization":"CTN1-HMAC-SHA256 Credential=drc3XdxNtzoucpw9xiRp/20201210/ctn1_request, Signature=7c8a878788b0bf6ddcc38f47a590ed6b261cb18a0261fefb42f9db1ee2fcb866"}"#);
553    }
554
555    #[test]
556    fn it_process_ws_notify_channel_events() {
557        use std::sync::{Arc, Mutex};
558        use crate::*;
559
560        let ctn_client = CatenisClient::new_with_options(
561            Some((
562                "drc3XdxNtzoucpw9xiRp",
563                "4c1749c8e86f65e0a73e5fb19f2aa9e74a716bc22d7956bf3072b4bc3fbfe2a0d138ad0d4bcfee251e4e5f54d6e92b8fd4eb36958a7aeaeeb51e8d2fcc4552c3",
564            ).into()),
565            &[
566                ClientOptions::Host("localhost:3000"),
567                ClientOptions::Secure(false),
568                ClientOptions::UseCompression(false)
569            ],
570        ).unwrap();
571
572        // Open WebSocket notification channel closing it after first notify message is received
573        let notify_channel = Arc::new(Mutex::new(
574            ctn_client.new_ws_notify_channel(NotificationEvent::NewMsgReceived)
575        ));
576        let notify_channel_2 = notify_channel.clone();
577
578        let notify_thread = notify_channel.lock().unwrap()
579            // Note: we need to access a reference of notify_channel inside the notify_event_handler
580            //  closure. That's why we need to wrap it around Arc<Mutex<>> (see above)
581            .open(move |event: WsNotifyChannelEvent| {
582                let notify_channel = notify_channel_2.lock().unwrap();
583
584                match event {
585                    WsNotifyChannelEvent::Error(err) => {
586                        println!(">>>>>> WebSocket Notification Channel: Error event: {:?}", err);
587                    },
588                    WsNotifyChannelEvent::Open => {
589                        println!(">>>>>> WebSocket Notification Channel: Open event");
590                    },
591                    WsNotifyChannelEvent::Close(close_info) => {
592                        println!(">>>>>> WebSocket Notification Channel: Close event: {:?}", close_info);
593                    },
594                    WsNotifyChannelEvent::Notify(notify_msg) => {
595                        println!(">>>>>> WebSocket Notification Channel: Notify event: {:?}", notify_msg);
596                        notify_channel.close();
597                    },
598                }
599            }).unwrap();
600
601        // Set up timeout to close WebSocket notification channel if no notify message
602        //  is received within a given period of time
603        let notify_channel_3 = notify_channel.clone();
604
605        thread::spawn(move || {
606            thread::sleep(std::time::Duration::from_secs(30));
607
608            notify_channel_3.lock().unwrap()
609                .close();
610        });
611
612        // Wait for notification thread to end
613        notify_thread.join().unwrap();
614    }
615}