1use std::fs;
2use std::path::PathBuf;
3
4use tokio::io::{AsyncRead, AsyncWrite};
5use tokio::net::TcpStream;
6
7#[cfg(unix)]
8use tokio::net::UnixStream;
9
10use tokio_util::codec::Framed;
11
12use futures::sink::SinkExt;
13
14use bytes::Bytes;
15
16use blather::{Params, Telegram};
17
18use crate::err::Error;
19
20
21pub enum InputType {
22 Params(Params),
23 File(PathBuf),
24 VecBuf(Vec<u8>),
25 Bytes(Bytes)
26}
27
/// Address of the peer a message is sent to.
pub enum Endpoint {
    /// Address string handed to `TcpStream::connect`.
    TcpSockAddr(String),

    /// Filesystem path handed to `UnixStream::connect` (unix targets only).
    #[cfg(unix)]
    UdsPath(PathBuf),
}
34
35pub struct ConnTransport {
36 pub msgif: Endpoint,
37 pub authinfo: Option<crate::auth::AuthInfo>,
38 pub ch: u8
39}
40
/// Per-message transport settings for an already established connection.
pub struct Transport {
    /// Channel value transmitted as the `_Ch` parameter of the request.
    pub ch: u8,
}
44
45pub struct MsgInfo {
46 pub cmd: u32,
47 pub meta: Option<InputType>,
48 pub payload: Option<InputType>
49}
50
51
52pub async fn connsend(
54 xfer: ConnTransport,
55 mi: &MsgInfo
56) -> Result<String, Error> {
57 match xfer.msgif {
58 Endpoint::TcpSockAddr(sa) => {
59 let stream = TcpStream::connect(sa).await?;
60 let mut framed = Framed::new(stream, blather::Codec::new());
61 if let Some(ref authinfo) = xfer.authinfo {
62 let _ = crate::auth::authenticate(&mut framed, authinfo).await?;
63 }
64 send(&mut framed, &Transport { ch: xfer.ch }, mi).await
65 }
66 #[cfg(unix)]
67 Endpoint::UdsPath(sa) => {
68 let stream = UnixStream::connect(sa).await?;
69 let mut framed = Framed::new(stream, blather::Codec::new());
70 if let Some(ref authinfo) = xfer.authinfo {
71 let _ = crate::auth::authenticate(&mut framed, authinfo).await?;
72 }
73 send(&mut framed, &Transport { ch: xfer.ch }, mi).await
74 }
75 }
76}
77
78
79pub async fn send<T: AsyncRead + AsyncWrite + Unpin>(
83 conn: &mut Framed<T, blather::Codec>,
84 xfer: &Transport,
85 mi: &MsgInfo
86) -> Result<String, Error> {
87 let metalen = get_meta_size(&mi)?;
88 let payloadlen = get_payload_size(&mi)?;
89
90 let mut tg = Telegram::new_topic("Msg")?;
91 tg.add_param("_Ch", xfer.ch)?;
92 if mi.cmd != 0 {
93 tg.add_param("Cmd", mi.cmd)?;
94 }
95 if metalen != 0 {
96 tg.add_param("MetaLen", metalen)?;
97 }
98 if payloadlen != 0 {
99 tg.add_param("Len", payloadlen)?;
100 }
101 let params = crate::sendrecv(conn, &tg).await?;
102
103 let xferid = match params.get_str("XferId") {
105 Some(xferid) => xferid.to_string(),
106 None => {
107 let e = "Missing expected transfer identifier";
108 return Err(Error::MissingData(String::from(e)));
109 }
110 };
111
112 if let Some(meta) = &mi.meta {
113 send_content(conn, meta).await?;
114 crate::expect_okfail(conn).await?;
115 }
116
117 if let Some(payload) = &mi.payload {
118 send_content(conn, payload).await?;
119 crate::expect_okfail(conn).await?;
120 }
121
122 Ok(xferid)
123}
124
125
126fn get_meta_size(mi: &MsgInfo) -> Result<u32, Error> {
127 let sz = match &mi.meta {
128 Some(meta) => match meta {
129 InputType::Params(params) => params.calc_buf_size(),
130 InputType::File(f) => {
131 let metadata = fs::metadata(&f)?;
132 metadata.len() as usize
133 }
134 InputType::VecBuf(v) => v.len(),
135 InputType::Bytes(b) => b.len()
136 },
137 None => 0
138 };
139
140 if sz > u32::MAX as usize {
141 }
143
144 Ok(sz as u32)
145}
146
147
148fn get_payload_size(mi: &MsgInfo) -> Result<u64, Error> {
149 let sz = match &mi.payload {
150 Some(payload) => match payload {
151 InputType::Params(params) => params.calc_buf_size(),
152 InputType::File(f) => {
153 let metadata = fs::metadata(&f)?;
154 metadata.len() as usize
155 }
156 InputType::VecBuf(v) => v.len(),
157 InputType::Bytes(b) => b.len()
158 },
159 None => 0
160 };
161
162 Ok(sz as u64)
163}
164
165
166async fn send_content<T>(
167 conn: &mut Framed<T, blather::Codec>,
168 data: &InputType
169) -> Result<(), Error>
170where
171 T: AsyncRead + AsyncWrite + Unpin
172{
173 match data {
174 InputType::Params(params) => Ok(conn.send(params).await?),
175 InputType::File(fname) => {
176 let mut f = tokio::fs::File::open(fname).await?;
177 let _ = tokio::io::copy(&mut f, conn.get_mut()).await?;
178 Ok(())
179 }
180 InputType::VecBuf(v) => Ok(conn.send(v.as_slice()).await?),
181 InputType::Bytes(b) => Ok(conn.send(b.as_ref()).await?)
182 }
183}
184
185