ddmw_client/msg/
send.rs

1//! Functions for sending messages.
2
3use std::borrow::Borrow;
4use std::fs;
5use std::path::PathBuf;
6
7use tokio::io::{AsyncRead, AsyncWrite};
8
9use tokio_util::codec::Framed;
10
11use futures::sink::SinkExt;
12
13use bytes::Bytes;
14
15use blather::{Params, Telegram};
16
17use protwrap::ProtAddr;
18
19use crate::auth::Auth;
20use crate::conn;
21use crate::types::AppChannel;
22
23use crate::err::Error;
24
25
26pub enum InputType {
27  Params(Params),
28  File(PathBuf),
29  VecBuf(Vec<u8>),
30  Bytes(Bytes)
31}
32
33impl InputType {
34  fn get_size(&self) -> Result<usize, Error> {
35    match self {
36      InputType::Params(params) => Ok(params.calc_buf_size()),
37      InputType::File(f) => {
38        let metadata = fs::metadata(f)?;
39        Ok(metadata.len() as usize)
40      }
41      InputType::VecBuf(v) => Ok(v.len()),
42      InputType::Bytes(b) => Ok(b.len())
43    }
44  }
45}
46
47
48pub struct Transport {
49  pub ch: AppChannel
50}
51
52pub struct MsgInfo {
53  pub cmd: u32,
54  pub meta: Option<InputType>,
55  pub payload: Option<InputType>
56}
57
58
59impl MsgInfo {
60  fn get_meta_size(&self) -> Result<u32, Error> {
61    let sz = match &self.meta {
62      Some(meta) => meta.get_size()?,
63      None => 0
64    };
65
66    if sz > u32::MAX as usize {
67      // ToDo: Return out of bounds error
68    }
69
70    Ok(sz as u32)
71  }
72
73
74  fn get_payload_size(&self) -> Result<u64, Error> {
75    let sz = match &self.payload {
76      Some(payload) => payload.get_size()?,
77      None => 0
78    };
79
80    Ok(sz as u64)
81  }
82}
83
84
85/// Connect, optionally authenticate, send message and disconnect.
86///
87/// This is a convenience function for application that don't need to keep a
88/// connection open, and only needs to send a message occasionally.
89pub async fn connsend<P, X, M>(
90  pa: &ProtAddr,
91  auth: Option<&Auth>,
92  xfer: X,
93  mi: M
94) -> Result<String, Error>
95where
96  X: Borrow<Transport>,
97  M: Borrow<MsgInfo>
98{
99  let mut conn = conn::connect(pa, auth).await?;
100
101  send(&mut conn, xfer, mi).await
102}
103
104
105/// Send a message, including (if applicable) its metadata and payload.
106///
107/// On successful completion returns the transfer identifier.
108pub async fn send<T, X, M>(
109  conn: &mut Framed<T, blather::Codec>,
110  xfer: X,
111  mi: M
112) -> Result<String, Error>
113where
114  T: AsyncRead + AsyncWrite + Unpin,
115  X: Borrow<Transport>,
116  M: Borrow<MsgInfo>
117{
118  let xfer = xfer.borrow();
119  let mi = mi.borrow();
120
121  //
122  // Determine length of metadata and payload
123  //
124  let metalen = mi.get_meta_size()?;
125  let payloadlen = mi.get_payload_size()?;
126
127  //
128  // Prepare the Msg telegram
129  //
130  let mut tg = Telegram::new_topic("Msg")?;
131  tg.add_param("_Ch", xfer.ch.to_string())?;
132  if mi.cmd != 0 {
133    tg.add_param("Cmd", mi.cmd)?;
134  }
135  if metalen != 0 {
136    tg.add_param("MetaLen", metalen)?;
137  }
138  if payloadlen != 0 {
139    tg.add_param("Len", payloadlen)?;
140  }
141
142  //
143  // Request the message transfer
144  //
145  let params = crate::sendrecv(conn, &tg).await?;
146
147  //
148  // Extract the transfer identifier assigned to this message
149  //
150  let xferid = match params.get_str("XferId") {
151    Some(xferid) => xferid.to_string(),
152    None => {
153      let e = "Missing expected transfer identifier from server reply";
154      return Err(Error::MissingData(String::from(e)));
155    }
156  };
157
158  //
159  // Transmit metadata, if applicable, and wait for the server to ACK it
160  //
161  if let Some(meta) = &mi.meta {
162    send_content(conn, meta).await?;
163    crate::expect_okfail(conn).await?;
164  }
165
166  //
167  // Transmit payload, if applicable, and wait for the server to ACK it
168  //
169  if let Some(payload) = &mi.payload {
170    send_content(conn, payload).await?;
171    crate::expect_okfail(conn).await?;
172  }
173
174  Ok(xferid)
175}
176
177
178async fn send_content<T>(
179  conn: &mut Framed<T, blather::Codec>,
180  data: &InputType
181) -> Result<(), Error>
182where
183  T: AsyncRead + AsyncWrite + Unpin
184{
185  match data {
186    InputType::Params(params) => Ok(conn.send(params).await?),
187    InputType::File(fname) => {
188      let mut f = tokio::fs::File::open(fname).await?;
189      let _ = tokio::io::copy(&mut f, conn.get_mut()).await?;
190      Ok(())
191    }
192    InputType::VecBuf(v) => Ok(conn.send(v.as_slice()).await?),
193    InputType::Bytes(b) => Ok(conn.send(b.as_ref()).await?)
194  }
195}
196
197// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :