ddmw_client/msg/
recv.rs

1//! Functions for receiving messages.
2
3use std::future::Future;
4use std::path::PathBuf;
5
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use tokio_stream::StreamExt;
9
10use tokio_util::codec::Framed;
11
12use async_trait::async_trait;
13
14use num::NumCast;
15
16use bytes::{Bytes, BytesMut};
17
18use blather::{codec, KVLines, Params, Telegram};
19
20use crate::err::Error;
21
22
23/// Application channel subscription identifier.
24pub enum SubCh {
25  Num(u8),
26  Name(String)
27}
28
29/// Channel subscription information context.
30pub struct SubInfo {
31  pub ch: SubCh
32}
33
34
35/// Subscribe to an application message channel on a connection to a
36/// subscription interface.
37pub async fn subscribe<C>(
38  conn: &mut Framed<C, blather::Codec>,
39  subinfo: SubInfo
40) -> Result<(), Error>
41where
42  C: AsyncRead + AsyncWrite + Unpin
43{
44  let mut tg = Telegram::new();
45  tg.set_topic("Sub")?;
46  match subinfo.ch {
47    SubCh::Num(ch) => {
48      tg.add_param("Ch", ch)?;
49    }
50    SubCh::Name(nm) => {
51      tg.add_param("Ch", nm)?;
52    }
53  }
54  crate::sendrecv(conn, &tg).await?;
55
56  Ok(())
57}
58
59
60/// Storage type used to request how a messsage's metadata and/or payload
61/// should be stored/parsed.
62pub enum StoreType {
63  /// Don't store
64  None,
65
66  /// Store it as a [`bytes::Bytes`]
67  Bytes,
68
69  /// Store it as a [`bytes::BytesMut`]
70  BytesMut,
71
72  /// Parse and store it in a [`blather::Params`] buffer
73  Params,
74
75  /// Parse and store it as a [`blather::KVLines`] buffer
76  KVLines,
77
78  /// Store it in a file
79  File(PathBuf)
80}
81
82
83/// Storage buffer for metadata and payload.
84pub enum Storage {
85  /// Return data as a [`bytes::Bytes`] buffer.
86  Bytes(Bytes),
87
88  /// Return data as a [`bytes::BytesMut`] buffer.
89  BytesMut(BytesMut),
90
91  /// Return data as a parsed [`Params`] buffer.
92  Params(Params),
93
94  /// Return data as a parsed [`KVLines`] buffer.
95  KVLines(KVLines),
96
97  /// A file whose location was requested by the application.
98  File(PathBuf),
99
100  /// A file whose location was specified by DDMW.  It is the responsibility
101  /// of the application to move the file from its current location to an
102  /// application specific storage, or delete the file if it is not relevant.
103  LocalFile(PathBuf)
104}
105
106
107/// Representation of a received message with its optional metadata and
108/// payload.
109pub struct Msg {
110  pub cmd: u32,
111  pub meta: Option<Storage>,
112  pub payload: Option<Storage>
113}
114
115
116/// Message information buffer passed to storate request callback closures.
117pub struct MsgInfo {
118  /// Message command number.
119  pub cmd: u32,
120
121  /// Length of message metadata.
122  pub metalen: u32,
123
124  /// Length of message payload.
125  pub payloadlen: u64
126}
127
128
129/// Receive a single message, allowing a closure to select how to store
130/// metadata and payload based on the message header.
131///
132/// The `storeq` is a closure that, if there's metadata and/or payload, can be
133/// called to allow the application to request how the metadata and payload
134/// will be stored.  It's only argument is a reference to a [`Params`] buffer
135/// which was extracted from the incoming `Msg` `Telegram`.
136///
137/// It is up to the closure to return a tuple of two `StorageType` enum values,
138/// where the first one denotes the requested storage type for the metadata,
139/// and the second one denotes the requested storage type for the message
140/// payload.
141///
142/// # Notes
143/// - The `storeq` closure is not called if there's neither metadata nor
144///   payload associated with the incoming message.
145/// - `storeq` is referred to as a _request_ because the function does not
146///   necessarily need to respect the exact choice.  Specifically, there are
147///   two special cases:
148///   - If the size of metadata or payload (but not both) is zero, then its
149///     respective member in [`Msg`] will be None.  (For instance, specifying a
150///     file will not yield an empty file).
151///   - If the received content was stored as a local file that was stored by
152///     the DDMW server, then the library will always return a
153///     [`Storage::LocalFile`] for this content.
154///
155/// # Example
156///
157/// ```no_run
158/// use std::path::PathBuf;
159/// use tokio::net::TcpStream;
160/// use tokio_util::codec::Framed;
161/// use ddmw_client::{
162///   conn,
163///   msg::{
164///     self,
165///     recv::{StoreType, Msg}
166///   }
167/// };
168///
169/// // Enter an loop which keeps receiving messages until the connection is
170/// // dropped.
171/// async fn get_message(conn: &mut Framed<TcpStream, blather::Codec>) -> Msg {
172///   msg::recv_c(
173///     conn,
174///     |_mi| {
175///       // A new message is arriving; give the application the opportunity to
176///       // choose how to store the message metadata and payload.
177///       // Request file storage.
178///       // Note:  This request may be overridden.
179///       let metafile = PathBuf::from("msg.meta");
180///       let payloadfile = PathBuf::from("msg.payload");
181///       Ok((StoreType::File(metafile), StoreType::File(payloadfile)))
182///     },
183///   ).await.unwrap()
184/// }
185/// ```
186pub async fn recv_c<C, S>(
187  conn: &mut Framed<C, blather::Codec>,
188  storeq: S
189) -> Result<Msg, Error>
190where
191  C: AsyncRead + AsyncWrite + Unpin,
192  S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>
193{
194  // Wait for the next frame, and exiect it to be a Telegram.
195  if let Some(o) = conn.next().await {
196    let o = o?;
197    match o {
198      codec::Input::Telegram(tg) => {
199        // Got the expetected Telegram -- make sure that it's has a "Msg"
200        // topic.
201        if let Some(topic) = tg.get_topic() {
202          if topic == "Msg" {
203            // Convert to a Params buffer, since we no longer need the topic
204            let mp = tg.into_params();
205
206            return proc_inbound_msg(conn, mp, storeq).await;
207          } else if topic == "Fail" {
208            return Err(Error::ServerError(tg.into_params()));
209          }
210        }
211      }
212      _ => {
213        return Err(Error::BadState(
214          "Unexpected codec input type.".to_string()
215        ));
216      }
217    }
218    return Err(Error::bad_state("Unexpected reply from server."));
219  }
220
221  Err(Error::Disconnected)
222}
223
224
225/// Enter a loop which will keep receiving messages until connection is closed
226/// or a killswitch is triggered, giving the application the choice of
227/// metadata/payload storage though a closure.
228///
229/// Returns `Ok()` if the loop was terminated by the killswitch.
230///
231/// # Example
232/// The following example illustrates how to write a function that will keep
233/// receiving messages.
234///
235/// ```no_run
236/// use std::path::PathBuf;
237/// use tokio::net::TcpStream;
238/// use tokio_util::codec::Framed;
239/// use ddmw_client::{
240///   conn,
241///   msg::{self, recv::StoreType}
242/// };
243///
244/// // Enter an loop which keeps receiving messages until the connection is
245/// // dropped.
246/// async fn get_messages(conn: &mut Framed<TcpStream, blather::Codec>) {
247///   let mut idx = 0;
248///
249///   msg::recvloop_c(
250///     conn,
251///     None,
252///     |mi| {
253///       // A new message is arriving; give the application the opportunity to
254///       // choose how to store the message metadata and payload.
255///
256///       // Choose what to do with message metadata
257///       let meta_store = if mi.metalen > 1024*1024 {
258///         // Too big
259///         StoreType::None
260///       } else {
261///         // Store it in a memory buffer
262///         StoreType::Bytes
263///       };
264///
265///       // Choose what to do with message payload
266///       let payload_store = if mi.payloadlen > 16*1024*1024 {
267///         // Bigger than 16MB; too big, just ignore it
268///         StoreType::None
269///       } else if mi.payloadlen > 256*1024 {
270///         // It's bigger than 256K -- store it in a file
271///         let payloadfile = format!("{:x}.payload", idx);
272///         idx += 1;
273///         StoreType::File(PathBuf::from(payloadfile))
274///       } else {
275///         // It's small enough to store in a memory buffer
276///         StoreType::Bytes
277///       };
278///
279///       Ok((meta_store, payload_store))
280///     },
281///     |msg| {
282///       // Process message
283///       Ok(())
284///     }
285///   ).await.unwrap();
286/// }
287/// ```
288// ToDo: yield, when it becomes available
289pub async fn recvloop_c<C, S, P>(
290  conn: &mut Framed<C, blather::Codec>,
291  kill: Option<killswitch::Shutdown>,
292  mut storeq: S,
293  procmsg: P
294) -> Result<(), Error>
295where
296  C: AsyncRead + AsyncWrite + Unpin,
297  S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>,
298  P: Fn(Msg) -> Result<(), Error>
299{
300  if let Some(kill) = kill {
301    loop {
302      tokio::select! {
303        msg = recv_c(conn, &mut storeq) => {
304          let msg = msg?;
305          procmsg(msg)?;
306        }
307        _ = kill.wait() => {
308          // An external termination request was received, so break out of loop
309          break;
310        }
311      }
312    }
313  } else {
314    // No killswitch supplied -- just keep running until disconnection
315    loop {
316      let msg = recv_c(conn, &mut storeq).await?;
317      procmsg(msg)?;
318    }
319  }
320
321  Ok(())
322}
323
324
325/// This is the same as [`recvloop_c()`], but it assumes the message
326/// processing closure returns a [`Future`].
327///
328/// # Example
329///
330/// ```no_run
331/// use std::path::PathBuf;
332/// use tokio::net::TcpStream;
333/// use tokio_util::codec::Framed;
334/// use ddmw_client::{
335///   conn,
336///   msg::{self, recv::StoreType}
337/// };
338///
339/// // Enter an loop which keeps receiving messages until the connection is
340/// // dropped.
341/// async fn get_messages(conn: &mut Framed<TcpStream, blather::Codec>) {
342///   let mut idx = 0;
343///
344///   msg::recvloop_ca(
345///     conn,
346///     None,
347///     |mi| {
348///       Ok((StoreType::Bytes, StoreType::Bytes))
349///     },
350///     |msg| {
351///       async {
352///         // Process message
353///         Ok(())
354///       }
355///     }
356///   ).await.unwrap();
357/// }
358/// ```
359pub async fn recvloop_ca<C, S, F, P>(
360  conn: &mut Framed<C, blather::Codec>,
361  kill: Option<killswitch::Shutdown>,
362  mut storeq: S,
363  procmsg: P
364) -> Result<(), Error>
365where
366  C: AsyncRead + AsyncWrite + Unpin,
367  S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>,
368  F: Future<Output = Result<(), Error>>,
369  P: Fn(Msg) -> F
370{
371  if let Some(kill) = kill {
372    loop {
373      tokio::select! {
374        msg = recv_c(conn, &mut storeq) => {
375          let msg = msg?;
376          procmsg(msg).await?;
377        }
378        _ = kill.wait() => {
379          // An external termination request was received, so break out of loop
380          break;
381        }
382      }
383    }
384  } else {
385    // No killswitch supplied -- just keep running until disconnection
386    loop {
387      let msg = recv_c(conn, &mut storeq).await?;
388      procmsg(msg).await?;
389    }
390  }
391
392  Ok(())
393}
394
395
396async fn proc_inbound_msg<C, S>(
397  conn: &mut Framed<C, blather::Codec>,
398  mp: Params,
399  mut storeq: S
400) -> Result<Msg, Error>
401where
402  C: AsyncRead + AsyncWrite + Unpin,
403  S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>
404{
405  let (cmd, metalen, payloadlen) = parse_header(&mp)?;
406
407  // ToDo: Parse mp and check if metadata and/or payload is passed from the
408  //       server using a local file path.  If it is, then return it to the
409  //       application using Storage::LocalFile(PathBuf).
410
411  // If the Params contains either a Len or a MetaLen keyword, then
412  // call the application callback to determine how it wants the data
413  // stored.
414  // ToDo: - If metadata and payload are stored as "local files", then don't
415  //         call application; force to Storage::LocalFile
416  let (meta_store, payload_store) = if metalen != 0 || payloadlen != 0 {
417    // Call the application callback, passing a few Msg parameters, to ask it
418    // in what form it would like the metadata and payload.
419
420    let mi = MsgInfo {
421      cmd,
422      metalen,
423      payloadlen
424    };
425
426    let (ms, ps) = storeq(&mi)?;
427    let ms = if metalen != 0 { Some(ms) } else { None };
428    let ps = if payloadlen != 0 { Some(ps) } else { None };
429    (ms, ps)
430  } else {
431    (None, None)
432  };
433
434  //
435  // At this point, if meta_store is None, it means there was no message
436  // metadata, and we'll skip this altogether and return None to the
437  // application for the metadata.
438  //
439  // If meta_store is Some, then request the appropriate type from the
440  // blather's Codec.
441  //
442  let meta = get_content_to(conn, metalen, meta_store).await?;
443
444  let payload = get_content_to(conn, payloadlen, payload_store).await?;
445
446
447  Ok(Msg { cmd, meta, payload })
448}
449
450
451/// Get content of a given size to a chosen storage type.
452///
453/// Returns `Ok(None)` if the size is zero or if the storage type is
454/// `StorageType::None`.
455async fn get_content_to<C, S>(
456  conn: &mut Framed<C, blather::Codec>,
457  size: S,
458  store_type: Option<StoreType>
459) -> Result<Option<Storage>, Error>
460where
461  C: AsyncRead + AsyncWrite + Unpin,
462  S: NumCast
463{
464  if let Some(store_type) = store_type {
465    match store_type {
466      StoreType::None => {
467        // This happens if the Len is non-zero, but the callback says it
468        // doesn't want the data.
469        conn.codec_mut().skip(num::cast(size).unwrap())?;
470      }
471      StoreType::Bytes => {
472        conn.codec_mut().expect_bytes(num::cast(size).unwrap())?;
473      }
474      StoreType::BytesMut => {
475        conn.codec_mut().expect_bytesmut(num::cast(size).unwrap())?;
476      }
477      StoreType::Params => {
478        conn.codec_mut().expect_params();
479      }
480      StoreType::KVLines => {
481        conn.codec_mut().expect_kvlines();
482      }
483      StoreType::File(ref fname) => {
484        conn
485          .codec_mut()
486          .expect_file(fname, num::cast(size).unwrap())?;
487      }
488    }
489
490    get_content(conn).await
491  } else {
492    Ok(None)
493  }
494}
495
496
497/// Translate an incoming frame from the [`blather::Codec`] into a [`Storage`]
498/// type.
499async fn get_content<C>(
500  conn: &mut Framed<C, blather::Codec>
501) -> Result<Option<Storage>, Error>
502where
503  C: AsyncRead + AsyncWrite + Unpin
504{
505  if let Some(o) = conn.next().await {
506    let o = o?;
507    match o {
508      codec::Input::SkipDone => Ok(None),
509      codec::Input::Bytes(bytes) => Ok(Some(Storage::Bytes(bytes))),
510      codec::Input::BytesMut(bytes) => Ok(Some(Storage::BytesMut(bytes))),
511      codec::Input::Params(params) => Ok(Some(Storage::Params(params))),
512      codec::Input::KVLines(kvlines) => Ok(Some(Storage::KVLines(kvlines))),
513      codec::Input::File(fname) => Ok(Some(Storage::File(fname))),
514      _ => Err(Error::bad_state("Unexpected codec input type."))
515    }
516  } else {
517    Err(Error::Disconnected)
518  }
519}
520
521
522/// Implements callback methods for incoming message header and payloads.
523///
524/// The [`recv_h`] function and the [`recvloop_h`] require an object which
525/// implements this trait.
526#[async_trait]
527pub trait Handler {
528  /// Called once a message header has been received to allow the application
529  /// to choose how to store the meta and payload content.
530  async fn on_header(
531    &mut self,
532    mi: &MsgInfo
533  ) -> Result<(StoreType, StoreType), Error>;
534
535  /// Called once the metadata and payload have been received.
536  async fn on_data(&mut self, msg: Msg) -> Result<(), Error>;
537}
538
539
540/// Receive a single message, allowing a [`Handler`] method to select how to
541/// store metadata and payload based on the message header.
542///
543/// Once the message header has been received, parse it and call the supplied
544/// [`Handler`]'s [`on_header()`](Handler::on_header) method to determine how
545/// to store the message's metadata and payload content.
546pub async fn recv_h<C>(
547  conn: &mut Framed<C, blather::Codec>,
548  handler: &mut Box<dyn Handler + Send + Sync>
549) -> Result<Msg, Error>
550where
551  C: AsyncRead + AsyncWrite + Unpin
552{
553  // Wait for the next frame, and exiect it to be a Telegram.
554  if let Some(o) = conn.next().await {
555    let o = o?;
556    match o {
557      codec::Input::Telegram(tg) => {
558        // Got the expetected Telegram -- make sure that it's has a "Msg"
559        // topic.
560        if let Some(topic) = tg.get_topic() {
561          if topic == "Msg" {
562            // Convert to a Params buffer, since we no longer need the topic
563            let mp = tg.into_params();
564
565            return proc_inbound_msg_h(conn, mp, handler).await;
566          } else if topic == "Fail" {
567            return Err(Error::ServerError(tg.into_params()));
568          }
569        }
570      }
571      _ => {
572        return Err(Error::BadState(
573          "Unexpected codec input type.".to_string()
574        ));
575      }
576    }
577    return Err(Error::bad_state("Unexpected reply from server."));
578  }
579
580  Err(Error::Disconnected)
581}
582
583
584/// Given a `Params` buffer of a received `Msg` telegram, return a tuple
585/// containing `(command id, metadata length, payload length)`.
586fn parse_header(mp: &Params) -> Result<(u32, u32, u64), Error> {
587  let cmd = if mp.have("Cmd") {
588    mp.get_param::<u32>("Cmd")?
589  } else {
590    0
591  };
592
593  let metalen = if mp.have("MetaLen") {
594    mp.get_param::<u32>("MetaLen")?
595  } else {
596    0u32
597  };
598
599  let payloadlen = if mp.have("Len") {
600    mp.get_param::<u64>("Len")?
601  } else {
602    0u64
603  };
604
605  Ok((cmd, metalen, payloadlen))
606}
607
608
609/// Get a single message, given a parsed Msg header.
610///
611/// `mp` is the message parameters extracted from a Msg telegram.
612async fn proc_inbound_msg_h<C>(
613  conn: &mut Framed<C, blather::Codec>,
614  mp: Params,
615  handler: &mut Box<dyn Handler + Send + Sync>
616) -> Result<Msg, Error>
617where
618  C: AsyncRead + AsyncWrite + Unpin
619{
620  let (cmd, metalen, payloadlen) = parse_header(&mp)?;
621
622  // ToDo: Parse mp and check if metadata and/or payload is passed from the
623  //       server using a local file path.  If it is, then return it to the
624  //       application using Storage::LocalFile(PathBuf).
625
626  // If the Params contains either a Len or a MetaLen keyword, then
627  // call the application callback to determine how it wants the data
628  // stored.
629  // ToDo: - If metadata and payload are stored as "local files", then don't
630  //         call application; force to Storage::LocalFile
631  let (meta_store, payload_store) = if metalen != 0 || payloadlen != 0 {
632    // Call the application callback, passing a few Msg parameters, to ask it
633    // in what form it would like the metadata and payload.
634
635    let mi = MsgInfo {
636      cmd,
637      metalen,
638      payloadlen
639    };
640
641    let (ms, ps) = handler.on_header(&mi).await?;
642    let ms = if metalen != 0 { Some(ms) } else { None };
643    let ps = if payloadlen != 0 { Some(ps) } else { None };
644    (ms, ps)
645  } else {
646    (None, None)
647  };
648
649  //
650  // At this point, if meta_store is None, it means there was no message
651  // metadata, and we'll skip this altogether and return None to the
652  // application for the metadata.
653  //
654  // If meta_store is Some, then request the appropriate type from the
655  // blather's Codec.
656  //
657  let meta = get_content_to(conn, metalen, meta_store).await?;
658
659  let payload = get_content_to(conn, payloadlen, payload_store).await?;
660
661  Ok(Msg { cmd, meta, payload })
662}
663
664
665/// Enter a loop which will keep receiving messages to a [`Handler`] object
666/// until connection is closed or an optional killswitch is triggered.
667///
668/// Returns `Ok()` if the loop was terminated by the killswitch.
669///
670/// # Example
671/// Enter a loop which will receive messages:
672/// - Ignore metadata which is larder than 1MB
673/// - Ignore payloads larger than 16MB, larger than 256KB are stored in a file,
674///   and other payloads are stored in byte buffers.
675///
676/// Messages are only received, but not processed.
677///
678/// ```no_run
679/// use async_trait::async_trait;
680/// use std::path::PathBuf;
681/// use std::str::FromStr;
682/// use ddmw_client::{
683///   conn,
684///   msg::{self, recv::{StoreType, Handler, MsgInfo, Msg, SubInfo, SubCh}}
685/// };
686///
687/// struct RecvProc {
688///   idx: usize
689/// }
690///
691/// #[async_trait]
692/// impl Handler for RecvProc {
693///   async fn on_header(
694///     &mut self,
695///     mi: &MsgInfo
696///   ) -> Result<(StoreType, StoreType), ddmw_client::Error> {
697///     // A new message is arriving; give the application the opportunity
698///     // to choose how to store the message metadata and payload.
699///
700///     // Choose what to do with message metadata
701///     let meta_store = if mi.metalen > 1024*1024 {
702///       // Too big
703///       StoreType::None
704///     } else {
705///       // Store it in a memory buffer
706///       StoreType::Bytes
707///     };
708///
709///     // Choose what to do with message payload
710///     let payload_store = if mi.payloadlen > 16*1024*1024 {
711///       // Bigger than 16MB; too big, just ignore it
712///       StoreType::None
713///     } else if mi.payloadlen > 256*1024 {
714///       // It's bigger than 256K -- store it in a file
715///       let payloadfile = format!("{:x}.payload", self.idx);
716///       StoreType::File(PathBuf::from(payloadfile))
717///     } else {
718///       // It's small enough to store in a memory buffer
719///       StoreType::Bytes
720///     };
721///
722///     Ok((meta_store, payload_store))
723///   }
724///
725///   async fn on_data(&mut self, msg: Msg) -> Result<(), ddmw_client::Error> {
726///     println!("Process message");
727///
728///     // ToDo: Do things with message buffer here
729///
730///     Ok(())
731///   }
732/// }
733///
734/// async fn connect_and_receive_messages() {
735///   let pa = conn::ProtAddr::from_str("127.0.0.1:4100").unwrap();
736///   let mut conn = conn::connect(&pa, None).await.unwrap();
737///
738///   let subinfo = SubInfo { ch: SubCh::Num(11) };
739///   ddmw_client::msg::recv::subscribe(&mut conn, subinfo).await.unwrap();
740///
741///   let mut handler = Box::new(RecvProc { idx: 0 })
742///       as Box<dyn Handler + Sync + Send>;
743///
744///   msg::recvloop_h(&mut conn, None, &mut handler).await.unwrap();
745/// }
746/// ```
747// ToDo: yield, when it becomes available
748pub async fn recvloop_h<C>(
749  conn: &mut Framed<C, blather::Codec>,
750  kill: Option<killswitch::Shutdown>,
751  handler: &mut Box<dyn Handler + Send + Sync>
752) -> Result<(), Error>
753where
754  C: AsyncRead + AsyncWrite + Unpin
755{
756  if let Some(kill) = kill {
757    loop {
758      tokio::select! {
759        msg = recv_h(conn, handler) => {
760          let msg = msg?;
761          handler.on_data(msg).await?;
762        }
763        _ = kill.wait() => {
764          // An external termination request was received, so break out of loop
765          break;
766        }
767      }
768    }
769  } else {
770    // No killswitch supplied -- just keep running until disconnection
771    loop {
772      let msg = recv_h(conn, handler).await?;
773      handler.on_data(msg).await?;
774    }
775  }
776
777  Ok(())
778}
779
780// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :