1use std::borrow::Borrow;
4use std::fs;
5use std::path::PathBuf;
6
7use tokio::io::{AsyncRead, AsyncWrite};
8
9use tokio_util::codec::Framed;
10
11use futures::sink::SinkExt;
12
13use bytes::Bytes;
14
15use blather::{Params, Telegram};
16
17use protwrap::ProtAddr;
18
19use crate::auth::Auth;
20use crate::conn;
21use crate::types::AppChannel;
22
23use crate::err::Error;
24
25
/// Source of the bytes transmitted as one section (metadata or payload) of a
/// message.
pub enum InputType {
  /// A `blather::Params` buffer; its size is taken from `calc_buf_size()`.
  Params(Params),
  /// Path of a file whose contents are transmitted.
  File(PathBuf),
  /// An owned byte vector.
  VecBuf(Vec<u8>),
  /// A `bytes::Bytes` buffer.
  Bytes(Bytes)
}
32
33impl InputType {
34 fn get_size(&self) -> Result<usize, Error> {
35 match self {
36 InputType::Params(params) => Ok(params.calc_buf_size()),
37 InputType::File(f) => {
38 let metadata = fs::metadata(f)?;
39 Ok(metadata.len() as usize)
40 }
41 InputType::VecBuf(v) => Ok(v.len()),
42 InputType::Bytes(b) => Ok(b.len())
43 }
44 }
45}
46
47
/// Transport-level settings for a message transfer.
pub struct Transport {
  /// Application channel; its string form is sent to the server as the `_Ch`
  /// parameter of the `Msg` request.
  pub ch: AppChannel
}
51
/// Description of a single message to send.
pub struct MsgInfo {
  /// Command number; added to the request as `Cmd` only when it is non-zero.
  pub cmd: u32,
  /// Optional metadata section.  When present it is transmitted before the
  /// payload.
  pub meta: Option<InputType>,
  /// Optional payload section.
  pub payload: Option<InputType>
}
57
58
59impl MsgInfo {
60 fn get_meta_size(&self) -> Result<u32, Error> {
61 let sz = match &self.meta {
62 Some(meta) => meta.get_size()?,
63 None => 0
64 };
65
66 if sz > u32::MAX as usize {
67 }
69
70 Ok(sz as u32)
71 }
72
73
74 fn get_payload_size(&self) -> Result<u64, Error> {
75 let sz = match &self.payload {
76 Some(payload) => payload.get_size()?,
77 None => 0
78 };
79
80 Ok(sz as u64)
81 }
82}
83
84
85pub async fn connsend<P, X, M>(
90 pa: &ProtAddr,
91 auth: Option<&Auth>,
92 xfer: X,
93 mi: M
94) -> Result<String, Error>
95where
96 X: Borrow<Transport>,
97 M: Borrow<MsgInfo>
98{
99 let mut conn = conn::connect(pa, auth).await?;
100
101 send(&mut conn, xfer, mi).await
102}
103
104
105pub async fn send<T, X, M>(
109 conn: &mut Framed<T, blather::Codec>,
110 xfer: X,
111 mi: M
112) -> Result<String, Error>
113where
114 T: AsyncRead + AsyncWrite + Unpin,
115 X: Borrow<Transport>,
116 M: Borrow<MsgInfo>
117{
118 let xfer = xfer.borrow();
119 let mi = mi.borrow();
120
121 let metalen = mi.get_meta_size()?;
125 let payloadlen = mi.get_payload_size()?;
126
127 let mut tg = Telegram::new_topic("Msg")?;
131 tg.add_param("_Ch", xfer.ch.to_string())?;
132 if mi.cmd != 0 {
133 tg.add_param("Cmd", mi.cmd)?;
134 }
135 if metalen != 0 {
136 tg.add_param("MetaLen", metalen)?;
137 }
138 if payloadlen != 0 {
139 tg.add_param("Len", payloadlen)?;
140 }
141
142 let params = crate::sendrecv(conn, &tg).await?;
146
147 let xferid = match params.get_str("XferId") {
151 Some(xferid) => xferid.to_string(),
152 None => {
153 let e = "Missing expected transfer identifier from server reply";
154 return Err(Error::MissingData(String::from(e)));
155 }
156 };
157
158 if let Some(meta) = &mi.meta {
162 send_content(conn, meta).await?;
163 crate::expect_okfail(conn).await?;
164 }
165
166 if let Some(payload) = &mi.payload {
170 send_content(conn, payload).await?;
171 crate::expect_okfail(conn).await?;
172 }
173
174 Ok(xferid)
175}
176
177
178async fn send_content<T>(
179 conn: &mut Framed<T, blather::Codec>,
180 data: &InputType
181) -> Result<(), Error>
182where
183 T: AsyncRead + AsyncWrite + Unpin
184{
185 match data {
186 InputType::Params(params) => Ok(conn.send(params).await?),
187 InputType::File(fname) => {
188 let mut f = tokio::fs::File::open(fname).await?;
189 let _ = tokio::io::copy(&mut f, conn.get_mut()).await?;
190 Ok(())
191 }
192 InputType::VecBuf(v) => Ok(conn.send(v.as_slice()).await?),
193 InputType::Bytes(b) => Ok(conn.send(b.as_ref()).await?)
194 }
195}
196
197