umbral_socket/stream/
client.rs1use std::io::{self, Result};
2use std::sync::Arc;
3
4use bytes::Bytes;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::net::UnixStream;
7use tokio::sync::Mutex;
8
9struct ClientState {
10 stream: Mutex<Option<UnixStream>>,
11 socket: String,
12}
13
14#[derive(Clone)]
15pub struct UmbralClient {
16 state: Arc<ClientState>,
17}
18
19impl UmbralClient {
20 pub fn new(socket: &str) -> UmbralClient {
21 let state = Arc::new(ClientState {
22 stream: Mutex::new(None),
23 socket: String::from(socket),
24 });
25 return UmbralClient { state };
26 }
27
28 pub async fn send(&self, method: &str, payload: &str) -> Result<Bytes> {
29 let message = format!("{}[%]{}", method, payload);
30 let mut stream_guard = self.state.stream.lock().await;
31 if stream_guard.is_none() {
32 match UnixStream::connect(&self.state.socket).await {
33 Ok(stream) => *stream_guard = Some(stream),
34 Err(e) => return Err(e),
35 }
36 }
37
38 if let Some(stream) = stream_guard.as_mut() {
39 if let Err(e) = stream.write_all(message.as_bytes()).await {
40 *stream_guard = None;
41 return Err(e);
42 }
43
44 let mut len_bytes = [0u8; 4];
45 if let Err(e) = stream.read_exact(&mut len_bytes).await {
46 *stream_guard = None;
47 return Err(e);
48 }
49 let len = u32::from_be_bytes(len_bytes);
50
51 let mut response_buffer = vec![0u8; len as usize];
52 if let Err(e) = stream.read_exact(&mut response_buffer).await {
53 *stream_guard = None;
54 return Err(e);
55 }
56
57 return Ok(Bytes::from(response_buffer));
58 }
59
60 Err(io::Error::new(io::ErrorKind::Other, "Failed to get stream"))
61 }
62}