1use log::error;
15use serde::{Deserialize, Serialize};
16use serde_json_bytes::serde_json::{from_slice, to_vec};
17use std::fmt::{Display, Formatter};
18use tokio::{
19 io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
20 net::windows::named_pipe::{ClientOptions, NamedPipeClient, NamedPipeServer, ServerOptions},
21 time::{sleep, Duration},
22};
23
24pub async fn client_connect<T>(pipe_name: &str) -> PipeStream<T, NamedPipeClient>
29where
30 T: for<'de> Deserialize<'de> + Serialize,
31{
32 let client = loop {
34 sleep(Duration::from_millis(1000)).await;
36
37 match ClientOptions::new().open(pipe_name) {
38 Ok(x) => break x,
39 Err(e) => {
40 error!("Can't open the named pipe ({}). {}", pipe_name, e);
41 continue;
42 }
43 }
44 };
45 PipeStream::new(client)
46}
47
48pub async fn server_run<T>(pipe_name: &str) -> PipeStream<T, NamedPipeServer>
53where
54 T: for<'de> Deserialize<'de> + Serialize,
55{
56 let server = ServerOptions::new().create(pipe_name).unwrap();
57 server.connect().await.unwrap();
58 PipeStream::new(server)
59}
60
61#[derive(Debug)]
62pub struct PipeStream<R, T>
63where
64 R: for<'de> Deserialize<'de> + Serialize,
65 T: AsyncRead + AsyncWrite,
66{
67 _packet: Option<R>,
68 reader: BufReader<T>,
69}
70
71impl<R, T> PipeStream<R, T>
72where
73 R: for<'de> Deserialize<'de> + Serialize,
74 T: AsyncRead + AsyncWrite + Unpin,
75{
76 pub fn new(stream: T) -> Self {
81 let reader = BufReader::new(stream);
82 Self {
83 _packet: None,
84 reader,
85 }
86 }
87
88 pub async fn recv(&mut self) -> Result<R, PipeStreamError> {
92 let mut buf = Vec::new();
93 if let Ok(x) = self.reader.read_until(b'\n', &mut buf).await {
94 if x < 1 {
95 return Err(PipeStreamError::ReadEof);
96 }
97 };
98 let r = from_slice(&buf);
99 if let Err(e) = r {
100 return Err(PipeStreamError::DecodeError(e.to_string()));
101 }
102 Ok(r.unwrap())
103 }
104
105 pub async fn send(&mut self, packet: &R) -> Result<(), PipeStreamError> {
110 let mut data = to_vec(&packet).unwrap();
111 data.push(b'\n');
112 if let Err(e) = self.reader.write_all(&data).await {
113 Err(PipeStreamError::WriteError(format!(
114 "Can't send the data. {}",
115 e
116 )))
117 } else {
118 Ok(())
119 }
120 }
121}
122
/// Errors reported by `PipeStream` operations.
#[derive(Debug)]
pub enum PipeStreamError {
    /// The stream ended: a read returned no data.
    ReadEof,
    /// A packet could not be sent; the string describes the failure.
    WriteError(String),
    /// Received data could not be turned into a packet; the string describes
    /// the failure.
    DecodeError(String),
}

impl Display for PipeStreamError {
    /// Writes `"Read eof."` for [`PipeStreamError::ReadEof`] and the carried
    /// message verbatim for the other variants.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            PipeStreamError::ReadEof => f.write_str("Read eof."),
            // Borrow the stored message instead of cloning it into a new String.
            PipeStreamError::DecodeError(msg) | PipeStreamError::WriteError(msg) => {
                f.write_str(msg)
            }
        }
    }
}

// Allows the error to be boxed as `dyn Error` and used with `?` in functions
// returning generic error types.
impl std::error::Error for PipeStreamError {}