rigela_utils/
pipe.rs

1/*
2 * Copyright (c) 2024. The RigelA open source project team and
3 * its contributors reserve all rights.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software distributed under the
10 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and limitations under the License.
12 */
13
14use log::error;
15use serde::{Deserialize, Serialize};
16use serde_json_bytes::serde_json::{from_slice, to_vec};
17use std::fmt::{Display, Formatter};
18use tokio::{
19    io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
20    net::windows::named_pipe::{ClientOptions, NamedPipeClient, NamedPipeServer, ServerOptions},
21    time::{sleep, Duration},
22};
23
24/**
25 * 连接到一个管道。
26 * `pipe_name` 管道名称。
27 * */
28pub async fn client_connect<T>(pipe_name: &str) -> PipeStream<T, NamedPipeClient>
29where
30    T: for<'de> Deserialize<'de> + Serialize,
31{
32    // 使用循环方法连接管道,因为可能在连接的时候管道还没创建完毕
33    let client = loop {
34        // 推迟一秒连接,尽量确保管道创建完毕
35        sleep(Duration::from_millis(1000)).await;
36
37        match ClientOptions::new().open(pipe_name) {
38            Ok(x) => break x,
39            Err(e) => {
40                error!("Can't open the named pipe ({}). {}", pipe_name, e);
41                continue;
42            }
43        }
44    };
45    PipeStream::new(client)
46}
47
48/**
49 * 创建一个管道服务器,并等待一个客户端连接。
50 * `pipe_name` 管道名称。
51 * */
52pub async fn server_run<T>(pipe_name: &str) -> PipeStream<T, NamedPipeServer>
53where
54    T: for<'de> Deserialize<'de> + Serialize,
55{
56    let server = ServerOptions::new().create(pipe_name).unwrap();
57    server.connect().await.unwrap();
58    PipeStream::new(server)
59}
60
61#[derive(Debug)]
62pub struct PipeStream<R, T>
63where
64    R: for<'de> Deserialize<'de> + Serialize,
65    T: AsyncRead + AsyncWrite,
66{
67    _packet: Option<R>,
68    reader: BufReader<T>,
69}
70
71impl<R, T> PipeStream<R, T>
72where
73    R: for<'de> Deserialize<'de> + Serialize,
74    T: AsyncRead + AsyncWrite + Unpin,
75{
76    /**
77     * 创建一个管道的流,用于发送和接收数据。
78     * 其中传输的数据是实现了Deserialize 和 Serialize接口的struct。
79     * */
80    pub fn new(stream: T) -> Self {
81        let reader = BufReader::new(stream);
82        Self {
83            _packet: None,
84            reader,
85        }
86    }
87
88    /**
89     * 接收一个数据包。
90     * */
91    pub async fn recv(&mut self) -> Result<R, PipeStreamError> {
92        let mut buf = Vec::new();
93        if let Ok(x) = self.reader.read_until(b'\n', &mut buf).await {
94            if x < 1 {
95                return Err(PipeStreamError::ReadEof);
96            }
97        };
98        let r = from_slice(&buf);
99        if let Err(e) = r {
100            return Err(PipeStreamError::DecodeError(e.to_string()));
101        }
102        Ok(r.unwrap())
103    }
104
105    /**
106     * 发送一个数据包。
107     * `packet` 实现了序列化接口的数据。
108     * */
109    pub async fn send(&mut self, packet: &R) -> Result<(), PipeStreamError> {
110        let mut data = to_vec(&packet).unwrap();
111        data.push(b'\n');
112        if let Err(e) = self.reader.write_all(&data).await {
113            Err(PipeStreamError::WriteError(format!(
114                "Can't send the data. {}",
115                e
116            )))
117        } else {
118            Ok(())
119        }
120    }
121}
122
123#[derive(Debug)]
124pub enum PipeStreamError {
125    ReadEof,
126    WriteError(String),
127    DecodeError(String),
128}
129
130impl Display for PipeStreamError {
131    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132        let msg = match self {
133            PipeStreamError::ReadEof => "Read eof.".to_string(),
134            PipeStreamError::DecodeError(e) => e.to_string(),
135            PipeStreamError::WriteError(e) => e.to_string(),
136        };
137        write!(f, "{}", msg)
138    }
139}