Skip to main content

tokio_ddmw/
msg.rs

1use std::fs;
2use std::path::PathBuf;
3
4use tokio::io::{AsyncRead, AsyncWrite};
5use tokio::net::TcpStream;
6
7#[cfg(unix)]
8use tokio::net::UnixStream;
9
10use tokio_util::codec::Framed;
11
12use futures::sink::SinkExt;
13
14use bytes::Bytes;
15
16use blather::{Params, Telegram};
17
18use crate::err::Error;
19
20
21pub enum InputType {
22  Params(Params),
23  File(PathBuf),
24  VecBuf(Vec<u8>),
25  Bytes(Bytes)
26}
27
28pub enum Endpoint {
29  TcpSockAddr(String),
30
31  #[cfg(unix)]
32  UdsPath(PathBuf)
33}
34
35pub struct ConnTransport {
36  pub msgif: Endpoint,
37  pub authinfo: Option<crate::auth::AuthInfo>,
38  pub ch: u8
39}
40
41pub struct Transport {
42  pub ch: u8
43}
44
45pub struct MsgInfo {
46  pub cmd: u32,
47  pub meta: Option<InputType>,
48  pub payload: Option<InputType>
49}
50
51
52/// Connect, optionally authenticate, send message and disconnect
53pub async fn connsend(
54  xfer: ConnTransport,
55  mi: &MsgInfo
56) -> Result<String, Error> {
57  match xfer.msgif {
58    Endpoint::TcpSockAddr(sa) => {
59      let stream = TcpStream::connect(sa).await?;
60      let mut framed = Framed::new(stream, blather::Codec::new());
61      if let Some(ref authinfo) = xfer.authinfo {
62        let _ = crate::auth::authenticate(&mut framed, authinfo).await?;
63      }
64      send(&mut framed, &Transport { ch: xfer.ch }, mi).await
65    }
66    #[cfg(unix)]
67    Endpoint::UdsPath(sa) => {
68      let stream = UnixStream::connect(sa).await?;
69      let mut framed = Framed::new(stream, blather::Codec::new());
70      if let Some(ref authinfo) = xfer.authinfo {
71        let _ = crate::auth::authenticate(&mut framed, authinfo).await?;
72      }
73      send(&mut framed, &Transport { ch: xfer.ch }, mi).await
74    }
75  }
76}
77
78
79/// Send a message, including (if applicable) its metadata and payload.
80///
81/// On successful completion returns the transfer identifier.
82pub async fn send<T: AsyncRead + AsyncWrite + Unpin>(
83  conn: &mut Framed<T, blather::Codec>,
84  xfer: &Transport,
85  mi: &MsgInfo
86) -> Result<String, Error> {
87  let metalen = get_meta_size(&mi)?;
88  let payloadlen = get_payload_size(&mi)?;
89
90  let mut tg = Telegram::new_topic("Msg")?;
91  tg.add_param("_Ch", xfer.ch)?;
92  if mi.cmd != 0 {
93    tg.add_param("Cmd", mi.cmd)?;
94  }
95  if metalen != 0 {
96    tg.add_param("MetaLen", metalen)?;
97  }
98  if payloadlen != 0 {
99    tg.add_param("Len", payloadlen)?;
100  }
101  let params = crate::sendrecv(conn, &tg).await?;
102
103  // Extract the transfer identifier assigned to this message
104  let xferid = match params.get_str("XferId") {
105    Some(xferid) => xferid.to_string(),
106    None => {
107      let e = "Missing expected transfer identifier";
108      return Err(Error::MissingData(String::from(e)));
109    }
110  };
111
112  if let Some(meta) = &mi.meta {
113    send_content(conn, meta).await?;
114    crate::expect_okfail(conn).await?;
115  }
116
117  if let Some(payload) = &mi.payload {
118    send_content(conn, payload).await?;
119    crate::expect_okfail(conn).await?;
120  }
121
122  Ok(xferid)
123}
124
125
126fn get_meta_size(mi: &MsgInfo) -> Result<u32, Error> {
127  let sz = match &mi.meta {
128    Some(meta) => match meta {
129      InputType::Params(params) => params.calc_buf_size(),
130      InputType::File(f) => {
131        let metadata = fs::metadata(&f)?;
132        metadata.len() as usize
133      }
134      InputType::VecBuf(v) => v.len(),
135      InputType::Bytes(b) => b.len()
136    },
137    None => 0
138  };
139
140  if sz > u32::MAX as usize {
141    // ToDo: Return out of bounds error
142  }
143
144  Ok(sz as u32)
145}
146
147
148fn get_payload_size(mi: &MsgInfo) -> Result<u64, Error> {
149  let sz = match &mi.payload {
150    Some(payload) => match payload {
151      InputType::Params(params) => params.calc_buf_size(),
152      InputType::File(f) => {
153        let metadata = fs::metadata(&f)?;
154        metadata.len() as usize
155      }
156      InputType::VecBuf(v) => v.len(),
157      InputType::Bytes(b) => b.len()
158    },
159    None => 0
160  };
161
162  Ok(sz as u64)
163}
164
165
166async fn send_content<T>(
167  conn: &mut Framed<T, blather::Codec>,
168  data: &InputType
169) -> Result<(), Error>
170where
171  T: AsyncRead + AsyncWrite + Unpin
172{
173  match data {
174    InputType::Params(params) => Ok(conn.send(params).await?),
175    InputType::File(fname) => {
176      let mut f = tokio::fs::File::open(fname).await?;
177      let _ = tokio::io::copy(&mut f, conn.get_mut()).await?;
178      Ok(())
179    }
180    InputType::VecBuf(v) => Ok(conn.send(v.as_slice()).await?),
181    InputType::Bytes(b) => Ok(conn.send(b.as_ref()).await?)
182  }
183}
184
185// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :