ddmw_client/msg/recv.rs
1//! Functions for receiving messages.
2
3use std::future::Future;
4use std::path::PathBuf;
5
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use tokio_stream::StreamExt;
9
10use tokio_util::codec::Framed;
11
12use async_trait::async_trait;
13
14use num::NumCast;
15
16use bytes::{Bytes, BytesMut};
17
18use blather::{codec, KVLines, Params, Telegram};
19
20use crate::err::Error;
21
22
/// Application channel subscription identifier.
///
/// Whichever variant is used, [`subscribe()`] sends the contained value as
/// the `Ch` parameter of the subscription request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubCh {
  /// Identify the channel using a number.
  Num(u8),

  /// Identify the channel using a name.
  Name(String)
}

/// Channel subscription information context.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubInfo {
  /// The application message channel to subscribe to.
  pub ch: SubCh
}
33
34
35/// Subscribe to an application message channel on a connection to a
36/// subscription interface.
37pub async fn subscribe<C>(
38 conn: &mut Framed<C, blather::Codec>,
39 subinfo: SubInfo
40) -> Result<(), Error>
41where
42 C: AsyncRead + AsyncWrite + Unpin
43{
44 let mut tg = Telegram::new();
45 tg.set_topic("Sub")?;
46 match subinfo.ch {
47 SubCh::Num(ch) => {
48 tg.add_param("Ch", ch)?;
49 }
50 SubCh::Name(nm) => {
51 tg.add_param("Ch", nm)?;
52 }
53 }
54 crate::sendrecv(conn, &tg).await?;
55
56 Ok(())
57}
58
59
/// Storage type used to request how a message's metadata and/or payload
/// should be stored/parsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StoreType {
  /// Don't store
  None,

  /// Store it as a [`bytes::Bytes`]
  Bytes,

  /// Store it as a [`bytes::BytesMut`]
  BytesMut,

  /// Parse and store it in a [`blather::Params`] buffer
  Params,

  /// Parse and store it as a [`blather::KVLines`] buffer
  KVLines,

  /// Store it in a file at the supplied path
  File(PathBuf)
}
81
82
83/// Storage buffer for metadata and payload.
84pub enum Storage {
85 /// Return data as a [`bytes::Bytes`] buffer.
86 Bytes(Bytes),
87
88 /// Return data as a [`bytes::BytesMut`] buffer.
89 BytesMut(BytesMut),
90
91 /// Return data as a parsed [`Params`] buffer.
92 Params(Params),
93
94 /// Return data as a parsed [`KVLines`] buffer.
95 KVLines(KVLines),
96
97 /// A file whose location was requested by the application.
98 File(PathBuf),
99
100 /// A file whose location was specified by DDMW. It is the responsibility
101 /// of the application to move the file from its current location to an
102 /// application specific storage, or delete the file if it is not relevant.
103 LocalFile(PathBuf)
104}
105
106
107/// Representation of a received message with its optional metadata and
108/// payload.
109pub struct Msg {
110 pub cmd: u32,
111 pub meta: Option<Storage>,
112 pub payload: Option<Storage>
113}
114
115
/// Message information buffer passed to storage request callback closures
/// and to [`Handler::on_header()`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MsgInfo {
  /// Message command number.
  pub cmd: u32,

  /// Length of message metadata.
  pub metalen: u32,

  /// Length of message payload.
  pub payloadlen: u64
}
127
128
129/// Receive a single message, allowing a closure to select how to store
130/// metadata and payload based on the message header.
131///
132/// The `storeq` is a closure that, if there's metadata and/or payload, can be
133/// called to allow the application to request how the metadata and payload
134/// will be stored. It's only argument is a reference to a [`Params`] buffer
135/// which was extracted from the incoming `Msg` `Telegram`.
136///
137/// It is up to the closure to return a tuple of two `StorageType` enum values,
138/// where the first one denotes the requested storage type for the metadata,
139/// and the second one denotes the requested storage type for the message
140/// payload.
141///
142/// # Notes
143/// - The `storeq` closure is not called if there's neither metadata nor
144/// payload associated with the incoming message.
145/// - `storeq` is referred to as a _request_ because the function does not
146/// necessarily need to respect the exact choice. Specifically, there are
147/// two special cases:
148/// - If the size of metadata or payload (but not both) is zero, then its
149/// respective member in [`Msg`] will be None. (For instance, specifying a
150/// file will not yield an empty file).
151/// - If the received content was stored as a local file that was stored by
152/// the DDMW server, then the library will always return a
153/// [`Storage::LocalFile`] for this content.
154///
155/// # Example
156///
157/// ```no_run
158/// use std::path::PathBuf;
159/// use tokio::net::TcpStream;
160/// use tokio_util::codec::Framed;
161/// use ddmw_client::{
162/// conn,
163/// msg::{
164/// self,
165/// recv::{StoreType, Msg}
166/// }
167/// };
168///
169/// // Enter an loop which keeps receiving messages until the connection is
170/// // dropped.
171/// async fn get_message(conn: &mut Framed<TcpStream, blather::Codec>) -> Msg {
172/// msg::recv_c(
173/// conn,
174/// |_mi| {
175/// // A new message is arriving; give the application the opportunity to
176/// // choose how to store the message metadata and payload.
177/// // Request file storage.
178/// // Note: This request may be overridden.
179/// let metafile = PathBuf::from("msg.meta");
180/// let payloadfile = PathBuf::from("msg.payload");
181/// Ok((StoreType::File(metafile), StoreType::File(payloadfile)))
182/// },
183/// ).await.unwrap()
184/// }
185/// ```
186pub async fn recv_c<C, S>(
187 conn: &mut Framed<C, blather::Codec>,
188 storeq: S
189) -> Result<Msg, Error>
190where
191 C: AsyncRead + AsyncWrite + Unpin,
192 S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>
193{
194 // Wait for the next frame, and exiect it to be a Telegram.
195 if let Some(o) = conn.next().await {
196 let o = o?;
197 match o {
198 codec::Input::Telegram(tg) => {
199 // Got the expetected Telegram -- make sure that it's has a "Msg"
200 // topic.
201 if let Some(topic) = tg.get_topic() {
202 if topic == "Msg" {
203 // Convert to a Params buffer, since we no longer need the topic
204 let mp = tg.into_params();
205
206 return proc_inbound_msg(conn, mp, storeq).await;
207 } else if topic == "Fail" {
208 return Err(Error::ServerError(tg.into_params()));
209 }
210 }
211 }
212 _ => {
213 return Err(Error::BadState(
214 "Unexpected codec input type.".to_string()
215 ));
216 }
217 }
218 return Err(Error::bad_state("Unexpected reply from server."));
219 }
220
221 Err(Error::Disconnected)
222}
223
224
225/// Enter a loop which will keep receiving messages until connection is closed
226/// or a killswitch is triggered, giving the application the choice of
227/// metadata/payload storage though a closure.
228///
229/// Returns `Ok()` if the loop was terminated by the killswitch.
230///
231/// # Example
232/// The following example illustrates how to write a function that will keep
233/// receiving messages.
234///
235/// ```no_run
236/// use std::path::PathBuf;
237/// use tokio::net::TcpStream;
238/// use tokio_util::codec::Framed;
239/// use ddmw_client::{
240/// conn,
241/// msg::{self, recv::StoreType}
242/// };
243///
244/// // Enter an loop which keeps receiving messages until the connection is
245/// // dropped.
246/// async fn get_messages(conn: &mut Framed<TcpStream, blather::Codec>) {
247/// let mut idx = 0;
248///
249/// msg::recvloop_c(
250/// conn,
251/// None,
252/// |mi| {
253/// // A new message is arriving; give the application the opportunity to
254/// // choose how to store the message metadata and payload.
255///
256/// // Choose what to do with message metadata
257/// let meta_store = if mi.metalen > 1024*1024 {
258/// // Too big
259/// StoreType::None
260/// } else {
261/// // Store it in a memory buffer
262/// StoreType::Bytes
263/// };
264///
265/// // Choose what to do with message payload
266/// let payload_store = if mi.payloadlen > 16*1024*1024 {
267/// // Bigger than 16MB; too big, just ignore it
268/// StoreType::None
269/// } else if mi.payloadlen > 256*1024 {
270/// // It's bigger than 256K -- store it in a file
271/// let payloadfile = format!("{:x}.payload", idx);
272/// idx += 1;
273/// StoreType::File(PathBuf::from(payloadfile))
274/// } else {
275/// // It's small enough to store in a memory buffer
276/// StoreType::Bytes
277/// };
278///
279/// Ok((meta_store, payload_store))
280/// },
281/// |msg| {
282/// // Process message
283/// Ok(())
284/// }
285/// ).await.unwrap();
286/// }
287/// ```
288// ToDo: yield, when it becomes available
289pub async fn recvloop_c<C, S, P>(
290 conn: &mut Framed<C, blather::Codec>,
291 kill: Option<killswitch::Shutdown>,
292 mut storeq: S,
293 procmsg: P
294) -> Result<(), Error>
295where
296 C: AsyncRead + AsyncWrite + Unpin,
297 S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>,
298 P: Fn(Msg) -> Result<(), Error>
299{
300 if let Some(kill) = kill {
301 loop {
302 tokio::select! {
303 msg = recv_c(conn, &mut storeq) => {
304 let msg = msg?;
305 procmsg(msg)?;
306 }
307 _ = kill.wait() => {
308 // An external termination request was received, so break out of loop
309 break;
310 }
311 }
312 }
313 } else {
314 // No killswitch supplied -- just keep running until disconnection
315 loop {
316 let msg = recv_c(conn, &mut storeq).await?;
317 procmsg(msg)?;
318 }
319 }
320
321 Ok(())
322}
323
324
325/// This is the same as [`recvloop_c()`], but it assumes the message
326/// processing closure returns a [`Future`].
327///
328/// # Example
329///
330/// ```no_run
331/// use std::path::PathBuf;
332/// use tokio::net::TcpStream;
333/// use tokio_util::codec::Framed;
334/// use ddmw_client::{
335/// conn,
336/// msg::{self, recv::StoreType}
337/// };
338///
339/// // Enter an loop which keeps receiving messages until the connection is
340/// // dropped.
341/// async fn get_messages(conn: &mut Framed<TcpStream, blather::Codec>) {
342/// let mut idx = 0;
343///
344/// msg::recvloop_ca(
345/// conn,
346/// None,
347/// |mi| {
348/// Ok((StoreType::Bytes, StoreType::Bytes))
349/// },
350/// |msg| {
351/// async {
352/// // Process message
353/// Ok(())
354/// }
355/// }
356/// ).await.unwrap();
357/// }
358/// ```
359pub async fn recvloop_ca<C, S, F, P>(
360 conn: &mut Framed<C, blather::Codec>,
361 kill: Option<killswitch::Shutdown>,
362 mut storeq: S,
363 procmsg: P
364) -> Result<(), Error>
365where
366 C: AsyncRead + AsyncWrite + Unpin,
367 S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>,
368 F: Future<Output = Result<(), Error>>,
369 P: Fn(Msg) -> F
370{
371 if let Some(kill) = kill {
372 loop {
373 tokio::select! {
374 msg = recv_c(conn, &mut storeq) => {
375 let msg = msg?;
376 procmsg(msg).await?;
377 }
378 _ = kill.wait() => {
379 // An external termination request was received, so break out of loop
380 break;
381 }
382 }
383 }
384 } else {
385 // No killswitch supplied -- just keep running until disconnection
386 loop {
387 let msg = recv_c(conn, &mut storeq).await?;
388 procmsg(msg).await?;
389 }
390 }
391
392 Ok(())
393}
394
395
396async fn proc_inbound_msg<C, S>(
397 conn: &mut Framed<C, blather::Codec>,
398 mp: Params,
399 mut storeq: S
400) -> Result<Msg, Error>
401where
402 C: AsyncRead + AsyncWrite + Unpin,
403 S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>
404{
405 let (cmd, metalen, payloadlen) = parse_header(&mp)?;
406
407 // ToDo: Parse mp and check if metadata and/or payload is passed from the
408 // server using a local file path. If it is, then return it to the
409 // application using Storage::LocalFile(PathBuf).
410
411 // If the Params contains either a Len or a MetaLen keyword, then
412 // call the application callback to determine how it wants the data
413 // stored.
414 // ToDo: - If metadata and payload are stored as "local files", then don't
415 // call application; force to Storage::LocalFile
416 let (meta_store, payload_store) = if metalen != 0 || payloadlen != 0 {
417 // Call the application callback, passing a few Msg parameters, to ask it
418 // in what form it would like the metadata and payload.
419
420 let mi = MsgInfo {
421 cmd,
422 metalen,
423 payloadlen
424 };
425
426 let (ms, ps) = storeq(&mi)?;
427 let ms = if metalen != 0 { Some(ms) } else { None };
428 let ps = if payloadlen != 0 { Some(ps) } else { None };
429 (ms, ps)
430 } else {
431 (None, None)
432 };
433
434 //
435 // At this point, if meta_store is None, it means there was no message
436 // metadata, and we'll skip this altogether and return None to the
437 // application for the metadata.
438 //
439 // If meta_store is Some, then request the appropriate type from the
440 // blather's Codec.
441 //
442 let meta = get_content_to(conn, metalen, meta_store).await?;
443
444 let payload = get_content_to(conn, payloadlen, payload_store).await?;
445
446
447 Ok(Msg { cmd, meta, payload })
448}
449
450
451/// Get content of a given size to a chosen storage type.
452///
453/// Returns `Ok(None)` if the size is zero or if the storage type is
454/// `StorageType::None`.
455async fn get_content_to<C, S>(
456 conn: &mut Framed<C, blather::Codec>,
457 size: S,
458 store_type: Option<StoreType>
459) -> Result<Option<Storage>, Error>
460where
461 C: AsyncRead + AsyncWrite + Unpin,
462 S: NumCast
463{
464 if let Some(store_type) = store_type {
465 match store_type {
466 StoreType::None => {
467 // This happens if the Len is non-zero, but the callback says it
468 // doesn't want the data.
469 conn.codec_mut().skip(num::cast(size).unwrap())?;
470 }
471 StoreType::Bytes => {
472 conn.codec_mut().expect_bytes(num::cast(size).unwrap())?;
473 }
474 StoreType::BytesMut => {
475 conn.codec_mut().expect_bytesmut(num::cast(size).unwrap())?;
476 }
477 StoreType::Params => {
478 conn.codec_mut().expect_params();
479 }
480 StoreType::KVLines => {
481 conn.codec_mut().expect_kvlines();
482 }
483 StoreType::File(ref fname) => {
484 conn
485 .codec_mut()
486 .expect_file(fname, num::cast(size).unwrap())?;
487 }
488 }
489
490 get_content(conn).await
491 } else {
492 Ok(None)
493 }
494}
495
496
497/// Translate an incoming frame from the [`blather::Codec`] into a [`Storage`]
498/// type.
499async fn get_content<C>(
500 conn: &mut Framed<C, blather::Codec>
501) -> Result<Option<Storage>, Error>
502where
503 C: AsyncRead + AsyncWrite + Unpin
504{
505 if let Some(o) = conn.next().await {
506 let o = o?;
507 match o {
508 codec::Input::SkipDone => Ok(None),
509 codec::Input::Bytes(bytes) => Ok(Some(Storage::Bytes(bytes))),
510 codec::Input::BytesMut(bytes) => Ok(Some(Storage::BytesMut(bytes))),
511 codec::Input::Params(params) => Ok(Some(Storage::Params(params))),
512 codec::Input::KVLines(kvlines) => Ok(Some(Storage::KVLines(kvlines))),
513 codec::Input::File(fname) => Ok(Some(Storage::File(fname))),
514 _ => Err(Error::bad_state("Unexpected codec input type."))
515 }
516 } else {
517 Err(Error::Disconnected)
518 }
519}
520
521
522/// Implements callback methods for incoming message header and payloads.
523///
524/// The [`recv_h`] function and the [`recvloop_h`] require an object which
525/// implements this trait.
526#[async_trait]
527pub trait Handler {
528 /// Called once a message header has been received to allow the application
529 /// to choose how to store the meta and payload content.
530 async fn on_header(
531 &mut self,
532 mi: &MsgInfo
533 ) -> Result<(StoreType, StoreType), Error>;
534
535 /// Called once the metadata and payload have been received.
536 async fn on_data(&mut self, msg: Msg) -> Result<(), Error>;
537}
538
539
540/// Receive a single message, allowing a [`Handler`] method to select how to
541/// store metadata and payload based on the message header.
542///
543/// Once the message header has been received, parse it and call the supplied
544/// [`Handler`]'s [`on_header()`](Handler::on_header) method to determine how
545/// to store the message's metadata and payload content.
546pub async fn recv_h<C>(
547 conn: &mut Framed<C, blather::Codec>,
548 handler: &mut Box<dyn Handler + Send + Sync>
549) -> Result<Msg, Error>
550where
551 C: AsyncRead + AsyncWrite + Unpin
552{
553 // Wait for the next frame, and exiect it to be a Telegram.
554 if let Some(o) = conn.next().await {
555 let o = o?;
556 match o {
557 codec::Input::Telegram(tg) => {
558 // Got the expetected Telegram -- make sure that it's has a "Msg"
559 // topic.
560 if let Some(topic) = tg.get_topic() {
561 if topic == "Msg" {
562 // Convert to a Params buffer, since we no longer need the topic
563 let mp = tg.into_params();
564
565 return proc_inbound_msg_h(conn, mp, handler).await;
566 } else if topic == "Fail" {
567 return Err(Error::ServerError(tg.into_params()));
568 }
569 }
570 }
571 _ => {
572 return Err(Error::BadState(
573 "Unexpected codec input type.".to_string()
574 ));
575 }
576 }
577 return Err(Error::bad_state("Unexpected reply from server."));
578 }
579
580 Err(Error::Disconnected)
581}
582
583
584/// Given a `Params` buffer of a received `Msg` telegram, return a tuple
585/// containing `(command id, metadata length, payload length)`.
586fn parse_header(mp: &Params) -> Result<(u32, u32, u64), Error> {
587 let cmd = if mp.have("Cmd") {
588 mp.get_param::<u32>("Cmd")?
589 } else {
590 0
591 };
592
593 let metalen = if mp.have("MetaLen") {
594 mp.get_param::<u32>("MetaLen")?
595 } else {
596 0u32
597 };
598
599 let payloadlen = if mp.have("Len") {
600 mp.get_param::<u64>("Len")?
601 } else {
602 0u64
603 };
604
605 Ok((cmd, metalen, payloadlen))
606}
607
608
609/// Get a single message, given a parsed Msg header.
610///
611/// `mp` is the message parameters extracted from a Msg telegram.
612async fn proc_inbound_msg_h<C>(
613 conn: &mut Framed<C, blather::Codec>,
614 mp: Params,
615 handler: &mut Box<dyn Handler + Send + Sync>
616) -> Result<Msg, Error>
617where
618 C: AsyncRead + AsyncWrite + Unpin
619{
620 let (cmd, metalen, payloadlen) = parse_header(&mp)?;
621
622 // ToDo: Parse mp and check if metadata and/or payload is passed from the
623 // server using a local file path. If it is, then return it to the
624 // application using Storage::LocalFile(PathBuf).
625
626 // If the Params contains either a Len or a MetaLen keyword, then
627 // call the application callback to determine how it wants the data
628 // stored.
629 // ToDo: - If metadata and payload are stored as "local files", then don't
630 // call application; force to Storage::LocalFile
631 let (meta_store, payload_store) = if metalen != 0 || payloadlen != 0 {
632 // Call the application callback, passing a few Msg parameters, to ask it
633 // in what form it would like the metadata and payload.
634
635 let mi = MsgInfo {
636 cmd,
637 metalen,
638 payloadlen
639 };
640
641 let (ms, ps) = handler.on_header(&mi).await?;
642 let ms = if metalen != 0 { Some(ms) } else { None };
643 let ps = if payloadlen != 0 { Some(ps) } else { None };
644 (ms, ps)
645 } else {
646 (None, None)
647 };
648
649 //
650 // At this point, if meta_store is None, it means there was no message
651 // metadata, and we'll skip this altogether and return None to the
652 // application for the metadata.
653 //
654 // If meta_store is Some, then request the appropriate type from the
655 // blather's Codec.
656 //
657 let meta = get_content_to(conn, metalen, meta_store).await?;
658
659 let payload = get_content_to(conn, payloadlen, payload_store).await?;
660
661 Ok(Msg { cmd, meta, payload })
662}
663
664
665/// Enter a loop which will keep receiving messages to a [`Handler`] object
666/// until connection is closed or an optional killswitch is triggered.
667///
668/// Returns `Ok()` if the loop was terminated by the killswitch.
669///
670/// # Example
671/// Enter a loop which will receive messages:
672/// - Ignore metadata which is larder than 1MB
673/// - Ignore payloads larger than 16MB, larger than 256KB are stored in a file,
674/// and other payloads are stored in byte buffers.
675///
676/// Messages are only received, but not processed.
677///
678/// ```no_run
679/// use async_trait::async_trait;
680/// use std::path::PathBuf;
681/// use std::str::FromStr;
682/// use ddmw_client::{
683/// conn,
684/// msg::{self, recv::{StoreType, Handler, MsgInfo, Msg, SubInfo, SubCh}}
685/// };
686///
687/// struct RecvProc {
688/// idx: usize
689/// }
690///
691/// #[async_trait]
692/// impl Handler for RecvProc {
693/// async fn on_header(
694/// &mut self,
695/// mi: &MsgInfo
696/// ) -> Result<(StoreType, StoreType), ddmw_client::Error> {
697/// // A new message is arriving; give the application the opportunity
698/// // to choose how to store the message metadata and payload.
699///
700/// // Choose what to do with message metadata
701/// let meta_store = if mi.metalen > 1024*1024 {
702/// // Too big
703/// StoreType::None
704/// } else {
705/// // Store it in a memory buffer
706/// StoreType::Bytes
707/// };
708///
709/// // Choose what to do with message payload
710/// let payload_store = if mi.payloadlen > 16*1024*1024 {
711/// // Bigger than 16MB; too big, just ignore it
712/// StoreType::None
713/// } else if mi.payloadlen > 256*1024 {
714/// // It's bigger than 256K -- store it in a file
715/// let payloadfile = format!("{:x}.payload", self.idx);
716/// StoreType::File(PathBuf::from(payloadfile))
717/// } else {
718/// // It's small enough to store in a memory buffer
719/// StoreType::Bytes
720/// };
721///
722/// Ok((meta_store, payload_store))
723/// }
724///
725/// async fn on_data(&mut self, msg: Msg) -> Result<(), ddmw_client::Error> {
726/// println!("Process message");
727///
728/// // ToDo: Do things with message buffer here
729///
730/// Ok(())
731/// }
732/// }
733///
734/// async fn connect_and_receive_messages() {
735/// let pa = conn::ProtAddr::from_str("127.0.0.1:4100").unwrap();
736/// let mut conn = conn::connect(&pa, None).await.unwrap();
737///
738/// let subinfo = SubInfo { ch: SubCh::Num(11) };
739/// ddmw_client::msg::recv::subscribe(&mut conn, subinfo).await.unwrap();
740///
741/// let mut handler = Box::new(RecvProc { idx: 0 })
742/// as Box<dyn Handler + Sync + Send>;
743///
744/// msg::recvloop_h(&mut conn, None, &mut handler).await.unwrap();
745/// }
746/// ```
747// ToDo: yield, when it becomes available
748pub async fn recvloop_h<C>(
749 conn: &mut Framed<C, blather::Codec>,
750 kill: Option<killswitch::Shutdown>,
751 handler: &mut Box<dyn Handler + Send + Sync>
752) -> Result<(), Error>
753where
754 C: AsyncRead + AsyncWrite + Unpin
755{
756 if let Some(kill) = kill {
757 loop {
758 tokio::select! {
759 msg = recv_h(conn, handler) => {
760 let msg = msg?;
761 handler.on_data(msg).await?;
762 }
763 _ = kill.wait() => {
764 // An external termination request was received, so break out of loop
765 break;
766 }
767 }
768 }
769 } else {
770 // No killswitch supplied -- just keep running until disconnection
771 loop {
772 let msg = recv_h(conn, handler).await?;
773 handler.on_data(msg).await?;
774 }
775 }
776
777 Ok(())
778}
779
780// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :