catenis_api_client/notification/ws.rs
1use std::{
2 thread::{
3 self, JoinHandle,
4 },
5 sync::mpsc::{
6 self,
7 SyncSender, TryRecvError,
8 },
9 borrow::Cow
10};
11use reqwest::{
12 header::{
13 AUTHORIZATION, SEC_WEBSOCKET_PROTOCOL,
14 HeaderValue,
15 },
16};
17use tungstenite::{
18 self,
19 Message,
20 protocol::{
21 frame::coding::CloseCode,
22 },
23 client::{
24 IntoClientRequest,
25 },
26 stream::{
27 MaybeTlsStream,
28 },
29};
30use serde::{
31 Serialize,
32};
33
34use super::*;
35use crate::{
36 CatenisClient,
37 api::{
38 NotificationEvent,
39 },
40 Result, Error, X_BCOT_TIMESTAMP,
41 error::GenericError,
42};
43
44pub use tungstenite::protocol::CloseFrame;
45
46pub(crate) const NOTIFY_WS_PROTOCOL: &str = "notify.catenis.io";
47pub(crate) const NOTIFY_WS_CHANNEL_OPEN: &str = "NOTIFICATION_CHANNEL_OPEN";
48
49pub(crate) fn format_vec_limit<T>(v: Vec<T>, limit: usize) -> String
50 where
51 T: std::fmt::Debug
52{
53 let mut txt = format!("{:?}", &v);
54
55 if v.len() > limit {
56 txt = String::from(&txt[..txt.len() - 1]) + ", ...]";
57 }
58
59 txt
60}
61
62#[derive(Debug, Serialize)]
63#[serde(rename_all = "kebab-case")]
64pub(crate) struct WsNotifyChannelAuthentication {
65 pub(crate) x_bcot_timestamp: String,
66 pub(crate) authorization: String,
67}
68
69pub(crate) enum WsNotifyChannelCommand {
70 Close,
71 Drop,
72}
73
74pub(crate) enum NotifyEventHandlerMessage {
75 Drop,
76 NotifyEvent(WsNotifyChannelEvent),
77}
78
79/// Events to monitor on a WebSocket notification channel.
80#[derive(Debug)]
81pub enum WsNotifyChannelEvent {
82 /// An error took place in the WebSocket notification channel.
83 Error(Error),
84 /// The underlying WebSocket connection has been closed, and thus the notification channel
85 /// itself too. It may contain the returned close code and reason.
86 Close(Option<CloseFrame<'static>>),
87 /// WebSocket notification channel successfully open and ready to send notifications.
88 Open,
89 /// New incoming notification.
90 Notify(NotificationMessage)
91}
92
93/// Represents a Catenis WebSocket notification channel.
94///
95/// This is used to receive notifications from the Catenis system.
96///
97/// An instance of this object should be obtained from a [`CatenisClient`] object via its
98/// [`new_ws_notify_channel`](CatenisClient::new_ws_notify_channel) method.
99#[derive(Debug, Clone)]
100pub struct WsNotifyChannel{
101 pub(crate) api_client: CatenisClient,
102 pub(crate) event: NotificationEvent,
103 tx: Option<SyncSender<WsNotifyChannelCommand>>,
104}
105
106impl WsNotifyChannel {
107 pub(crate) fn new(api_client: &CatenisClient, event: NotificationEvent) -> Self {
108 WsNotifyChannel {
109 api_client: api_client.clone(),
110 event,
111 tx: None,
112 }
113 }
114
115 /// Open the WebSocket notification channel setting up a handler to monitor the activity on
116 /// that channel.
117 ///
118 /// > **Note**: this is a non-blocking operation. The provided handler function is run on its
119 /// own thread.
120 ///
121 /// # Example
122 ///
123 /// ```no_run
124 /// use std::sync::{Arc, Mutex};
125 /// use catenis_api_client::{
126 /// CatenisClient, ClientOptions, Environment, Result,
127 /// api::NotificationEvent,
128 /// notification::WsNotifyChannelEvent,
129 /// };
130 ///
131 /// # fn main() -> Result<()> {
132 /// let ctn_client = CatenisClient::new_with_options(
133 /// Some((
134 /// "drc3XdxNtzoucpw9xiRp",
135 /// concat!(
136 /// "4c1749c8e86f65e0a73e5fb19f2aa9e74a716bc22d7956bf3072b4bc3fbfe2a0",
137 /// "d138ad0d4bcfee251e4e5f54d6e92b8fd4eb36958a7aeaeeb51e8d2fcc4552c3"
138 /// ),
139 /// ).into()),
140 /// &[
141 /// ClientOptions::Environment(Environment::Sandbox),
142 /// ],
143 /// )?;
144 ///
145 /// // Instantiate WebSocket notification channel object for New Message Received
146 /// // notification event
147 /// let notify_channel = Arc::new(Mutex::new(
148 /// ctn_client.new_ws_notify_channel(NotificationEvent::NewMsgReceived)
149 /// ));
150 /// let notify_channel_2 = notify_channel.clone();
151 ///
152 /// let notify_thread = notify_channel.lock().unwrap()
153 /// // Open WebSocket notification channel and monitor events on it
154 /// .open(move |event: WsNotifyChannelEvent| {
155 /// let notify_channel = notify_channel_2.lock().unwrap();
156 ///
157 /// match event {
158 /// WsNotifyChannelEvent::Error(err) => {
159 /// println!("WebSocket notification channel error: {:?}", err);
160 /// },
161 /// WsNotifyChannelEvent::Open => {
162 /// println!("WebSocket notification channel open");
163 /// },
164 /// WsNotifyChannelEvent::Close(close_info) => {
165 /// println!("WebSocket notification channel closed: {:?}", close_info);
166 /// },
167 /// WsNotifyChannelEvent::Notify(notify_msg) => {
168 /// println!("Received notification (new message read): {:?}", notify_msg);
169 /// notify_channel.close();
170 /// },
171 /// }
172 /// })?;
173 /// # Ok(())
174 /// # }
175 /// ```
176 pub fn open<F>(&mut self, notify_event_handler: F) -> Result<JoinHandle<()>>
177 where
178 F: Fn(WsNotifyChannelEvent) + Send + 'static
179 {
180 // Prepare to connect to Catenis WebSocket notification service
181 // Note: this request is only used to assemble the URL for the notification service
182 // and generate the required data for authentication with the notification service.
183 // The actual request used to open a WebSocket connection is created below
184 // (from this request's URL).
185 let mut auth_req = self.api_client.get_ws_request(
186 "notify/ws/:event_name",
187 Some(&[("event_name", self.event.to_string().as_str())])
188 )?;
189
190 self.api_client.sign_request(&mut auth_req)?;
191
192 let ws_notify_auth_msg_json = serde_json::to_string(
193 &WsNotifyChannelAuthentication {
194 x_bcot_timestamp: auth_req.headers()
195 .get(X_BCOT_TIMESTAMP)
196 .unwrap_or(&HeaderValue::from_static(""))
197 .to_str()?
198 .into(),
199 authorization: auth_req.headers()
200 .get(AUTHORIZATION)
201 .unwrap_or(&HeaderValue::from_static(""))
202 .to_str()?
203 .into()
204 }
205 )?;
206
207 // Create request to open WebSocket connection
208 let mut req = auth_req.url().as_str().into_client_request()?;
209
210 // Add HTTP header specifying the expected WebSocket subprotocol
211 req.headers_mut().insert(SEC_WEBSOCKET_PROTOCOL, HeaderValue::from_static(NOTIFY_WS_PROTOCOL));
212
213 // Try to establish WebSocket connection
214 let (mut ws, _) = tungstenite::connect(req)
215 .map_err(|err| Error::new_client_error(
216 Some("Failed to establish WebSocket connection"),
217 Some(err)
218 ))?;
219
220 // Set read timeout for WebSocket connection
221 match ws.get_ref() {
222 MaybeTlsStream::Plain(stream) => stream,
223 MaybeTlsStream::NativeTls(tls_stream) => tls_stream.get_ref(),
224 &_ => panic!("Unexpected TLS stream type"),
225 }.set_read_timeout(Some(std::time::Duration::from_millis(500)))
226 .map_err(|err| Error::new_client_error(
227 Some("Failed to set read timeout for WebSocket connection"),
228 Some(err)
229 ))?;
230
231 // Prepare to create thread to run WebSocket connection
232 let (tx, rx) = mpsc::sync_channel(128);
233
234 // Save communication channel with WebSocket thread
235 self.tx = Some(tx);
236
237 Ok(thread::spawn(move || {
238 // Create notification event handler thread
239 let (h_tx, h_rx) = mpsc::channel();
240
241 thread::spawn(move || {
242 loop {
243 match h_rx.recv() {
244 Ok(msg) => {
245 match msg {
246 NotifyEventHandlerMessage::Drop => {
247 // Request to exit thread. So just do it
248 break;
249 },
250 NotifyEventHandlerMessage::NotifyEvent(event) => {
251 // Call handler passing notification event
252 notify_event_handler(event);
253 }
254 }
255 },
256 Err(_) => {
257 // Lost communication with parent thread. End this thread
258 break;
259 },
260 }
261 }
262 });
263
264 // Send authentication message
265 if let Err(err) = ws.write_message(Message::Text(ws_notify_auth_msg_json)) {
266 let ctn_error = if let tungstenite::error::Error::ConnectionClosed = err {
267 // WebSocket connection has been closed
268 Error::new_client_error(
269 Some("Failed to send WebSocket notification channel authentication message; WebSocket connection closed unexpectedly"),
270 None::<GenericError>
271 )
272 } else {
273 // Any other error
274 Error::new_client_error(
275 Some("Failed to send WebSocket notification channel authentication message"),
276 Some(err)
277 )
278 };
279
280 // Send error message to notification event handler thread...
281 h_tx.send(
282 NotifyEventHandlerMessage::NotifyEvent(
283 WsNotifyChannelEvent::Error(ctn_error)
284 )
285 ).unwrap_or(());
286
287 // and exit current thread (requesting child thread to exit too)
288 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
289 return;
290 }
291
292 loop {
293 // Receive data from WebSocket connection
294 match ws.read_message() {
295 Ok(msg) => {
296 match msg {
297 Message::Text(text) => {
298 // A text message was received
299 if text == NOTIFY_WS_CHANNEL_OPEN {
300 // WebSocket notification channel open and ready to send
301 // notification. Send open message to notification event
302 // handler thread
303 h_tx.send(
304 NotifyEventHandlerMessage::NotifyEvent(
305 WsNotifyChannelEvent::Open
306 )
307 ).unwrap_or(());
308 } else {
309 // Parse received message
310 match serde_json::from_str(text.as_str()) {
311 Ok(notify_message) => {
312 // Send notify message to notification event handler
313 // thread
314 h_tx.send(
315 NotifyEventHandlerMessage::NotifyEvent(
316 WsNotifyChannelEvent::Notify(notify_message)
317 )
318 ).unwrap_or(());
319 },
320 Err(_) => {
321 // Unexpected notification message. Force closing of
322 // WebSocket notification channel reporting error
323 // condition
324 if let Err(err) = ws.close(Some(CloseFrame {
325 code: CloseCode::Library(4000),
326 reason: Cow::from(format!("Unexpected notification message received: {}", text))
327 })) {
328 if let tungstenite::error::Error::ConnectionClosed = err {
329 // WebSocket connection has already been closed. Just exit
330 // current thread (requesting child thread to exit too)
331 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
332 return;
333 } else {
334 // Any other error. Send error message to notification
335 // event handler thread...
336 h_tx.send(
337 NotifyEventHandlerMessage::NotifyEvent(
338 WsNotifyChannelEvent::Error(
339 Error::new_client_error(
340 Some("Failed to close WebSocket connection"),
341 Some(err)
342 )
343 )
344 )
345 ).unwrap_or(());
346
347 // and exit current thread (requesting child thread to exit too)
348 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
349 return;
350 }
351 }
352 },
353 }
354 }
355 },
356 Message::Binary(bin) => {
357 // A binary message was received. This is unexpected, so
358 // force closing of WebSocket notification channel reporting
359 // the error condition
360 if let Err(err) = ws.close(Some(CloseFrame {
361 code: CloseCode::Unsupported,
362 reason: Cow::from(format!("Unexpected binary message received: {}", format_vec_limit(bin, 20)))
363 })) {
364 if let tungstenite::error::Error::ConnectionClosed = err {
365 // WebSocket connection has already been closed. Just exit
366 // current thread (requesting child thread to exit too)
367 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
368 return;
369 } else {
370 // Any other error. Send error message to notification
371 // event handler thread...
372 h_tx.send(
373 NotifyEventHandlerMessage::NotifyEvent(
374 WsNotifyChannelEvent::Error(
375 Error::new_client_error(
376 Some("Failed to close WebSocket connection"),
377 Some(err)
378 )
379 )
380 )
381 ).unwrap_or(());
382
383 // and exit current thread (requesting child thread to exit too)
384 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
385 return;
386 }
387 }
388 },
389 Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => (),
390 Message::Close(close_info) => {
391 // WebSocket connection is being closed. Send close message
392 // to notification event handler thread...
393 h_tx.send(
394 NotifyEventHandlerMessage::NotifyEvent(
395 WsNotifyChannelEvent::Close(close_info)
396 )
397 ).unwrap_or(());
398
399 // and continue precessing normally until receiving confirmation
400 // (via Error::ConnectionClosed) that WebSocket connection has
401 // been closed
402 }
403 }
404 },
405 Err(err) => {
406 let mut err_to_report = None;
407 let mut exit = false;
408
409 match &err {
410 tungstenite::error::Error::Io(io_err) => {
411 match io_err.kind() {
412 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
413 // Timeout reading data from WebSocket connection. Just
414 // continue processing
415 },
416 _ => {
417 // Any other I/O error. Indicate that error should be
418 // reported and thread exited
419 err_to_report = Some(err);
420 exit = true;
421 }
422 }
423 },
424 tungstenite::error::Error::ConnectionClosed => {
425 // WebSocket connection has been closed. Indicate that
426 // thread should be exited
427 exit = true;
428 },
429 _ => {
430 // Any other error. Indicate that error should be
431 // reported and thread exited
432 err_to_report = Some(err);
433 exit = true;
434 }
435 }
436
437 if let Some(err) = err_to_report {
438 // Send error message to notification event
439 // handler thread
440 h_tx.send(
441 NotifyEventHandlerMessage::NotifyEvent(
442 WsNotifyChannelEvent::Error(
443 Error::new_client_error(
444 Some("Failed to send WebSocket notification channel authentication message"),
445 Some(err)
446 )
447 )
448 )
449 ).unwrap_or(());
450 }
451
452 if exit {
453 // Exit current thread (requesting child thread to exit too)
454 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
455 return;
456 }
457 }
458 }
459
460 // Check for command from main thread
461 match rx.try_recv() {
462 Ok(msg) => {
463 match msg {
464 WsNotifyChannelCommand::Drop => {
465 // Exit current thread (requesting child thread to exit too)
466 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
467 return;
468 },
469 WsNotifyChannelCommand::Close => {
470 // Close WebSocket connection
471 if let Err(err) = ws.close(Some(CloseFrame {
472 code: CloseCode::Normal,
473 reason: Cow::from("")
474 })) {
475 if let tungstenite::error::Error::ConnectionClosed = err {
476 // WebSocket connection has already been closed. Just exit
477 // current thread (requesting child thread to exit too)
478 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
479 return;
480 } else {
481 // Any other error. Send error message to notification
482 // event handler thread...
483 h_tx.send(
484 NotifyEventHandlerMessage::NotifyEvent(
485 WsNotifyChannelEvent::Error(
486 Error::new_client_error(
487 Some("Failed to close WebSocket connection"),
488 Some(err)
489 )
490 )
491 )
492 ).unwrap_or(());
493
494 // and exit current thread (requesting child thread to exit too)
495 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
496 return;
497 }
498 }
499 },
500 }
501 },
502 Err(err) => {
503 match err {
504 TryRecvError::Disconnected => {
505 // Lost communication with main thread. Exit current thread
506 // (requesting child thread to exit too)
507 h_tx.send(NotifyEventHandlerMessage::Drop).unwrap_or(());
508 return;
509 },
510 TryRecvError::Empty => {
511 // No data to be received now. Just continue processing
512 }
513 }
514 },
515 }
516 }
517 }))
518 }
519
520 /// Close the WebSocket notification channel.
521 pub fn close(&self) {
522 if let Some(tx) = &self.tx {
523 // Send command to notification event handler thread to close WebSocket
524 // notification channel
525 tx.send(WsNotifyChannelCommand::Close).unwrap_or(());
526 }
527 }
528}
529
530impl Drop for WsNotifyChannel {
531 fn drop(&mut self) {
532 if let Some(tx) = &self.tx {
533 // Send command to notification event handler thread to stop it
534 tx.send(WsNotifyChannelCommand::Drop).unwrap_or(());
535 }
536 }
537}
538
539#[cfg(test)]
540mod tests {
541 use super::*;
542
543 #[test]
544 fn it_serialize_ws_notify_channel_authentication() {
545 let ws_notify_channel_authentication = WsNotifyChannelAuthentication {
546 x_bcot_timestamp: String::from("20201210T203848Z"),
547 authorization: String::from("CTN1-HMAC-SHA256 Credential=drc3XdxNtzoucpw9xiRp/20201210/ctn1_request, Signature=7c8a878788b0bf6ddcc38f47a590ed6b261cb18a0261fefb42f9db1ee2fcb866"),
548 };
549
550 let json = serde_json::to_string(&ws_notify_channel_authentication).unwrap();
551
552 assert_eq!(json, r#"{"x-bcot-timestamp":"20201210T203848Z","authorization":"CTN1-HMAC-SHA256 Credential=drc3XdxNtzoucpw9xiRp/20201210/ctn1_request, Signature=7c8a878788b0bf6ddcc38f47a590ed6b261cb18a0261fefb42f9db1ee2fcb866"}"#);
553 }
554
555 #[test]
556 fn it_process_ws_notify_channel_events() {
557 use std::sync::{Arc, Mutex};
558 use crate::*;
559
560 let ctn_client = CatenisClient::new_with_options(
561 Some((
562 "drc3XdxNtzoucpw9xiRp",
563 "4c1749c8e86f65e0a73e5fb19f2aa9e74a716bc22d7956bf3072b4bc3fbfe2a0d138ad0d4bcfee251e4e5f54d6e92b8fd4eb36958a7aeaeeb51e8d2fcc4552c3",
564 ).into()),
565 &[
566 ClientOptions::Host("localhost:3000"),
567 ClientOptions::Secure(false),
568 ClientOptions::UseCompression(false)
569 ],
570 ).unwrap();
571
572 // Open WebSocket notification channel closing it after first notify message is received
573 let notify_channel = Arc::new(Mutex::new(
574 ctn_client.new_ws_notify_channel(NotificationEvent::NewMsgReceived)
575 ));
576 let notify_channel_2 = notify_channel.clone();
577
578 let notify_thread = notify_channel.lock().unwrap()
579 // Note: we need to access a reference of notify_channel inside the notify_event_handler
580 // closure. That's why we need to wrap it around Arc<Mutex<>> (see above)
581 .open(move |event: WsNotifyChannelEvent| {
582 let notify_channel = notify_channel_2.lock().unwrap();
583
584 match event {
585 WsNotifyChannelEvent::Error(err) => {
586 println!(">>>>>> WebSocket Notification Channel: Error event: {:?}", err);
587 },
588 WsNotifyChannelEvent::Open => {
589 println!(">>>>>> WebSocket Notification Channel: Open event");
590 },
591 WsNotifyChannelEvent::Close(close_info) => {
592 println!(">>>>>> WebSocket Notification Channel: Close event: {:?}", close_info);
593 },
594 WsNotifyChannelEvent::Notify(notify_msg) => {
595 println!(">>>>>> WebSocket Notification Channel: Notify event: {:?}", notify_msg);
596 notify_channel.close();
597 },
598 }
599 }).unwrap();
600
601 // Set up timeout to close WebSocket notification channel if no notify message
602 // is received within a given period of time
603 let notify_channel_3 = notify_channel.clone();
604
605 thread::spawn(move || {
606 thread::sleep(std::time::Duration::from_secs(30));
607
608 notify_channel_3.lock().unwrap()
609 .close();
610 });
611
612 // Wait for notification thread to end
613 notify_thread.join().unwrap();
614 }
615}