lpc/
server.rs

1use std::sync::Arc;
2
3use crossbeam::channel::Receiver;
4use fast_able::vec::SyncVec;
5use ipc_channel::platform::{OsIpcOneShotServer, OsIpcSender};
6
7use crate::{
8    types::{IpcExt, LpcExt},
9    utils::sleep_1kms,
10    R,
11};
12
/// Server side of the local IPC link.
///
/// Created by [`LpcServer::new`], which starts a background thread that accepts a peer
/// connection and forwards everything it receives into an in-process channel.
pub struct LpcServer {
    /// Handle of the background accept/receive thread started by `new`.
    /// It is stored but not read anywhere in this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    rt: std::thread::JoinHandle<()>,
    /// Senders handed over by the connected peer; maintained by the background thread and
    /// used by the `send*` methods (index 0 is the default target).
    senders: Arc<SyncVec<OsIpcSender>>,
    /// Receiving end of the in-process channel: each item is a message payload together with
    /// the senders that accompanied it.
    recver: Receiver<(Vec<u8>, Vec<OsIpcSender>)>,
}
19
20impl LpcServer {
21    /// "./Server_key"
22    pub fn new(key_file: impl Into<String>) -> LpcServer {
23        // println!("Server key: {name}");
24        // std::fs::write("./Server_key", name).unwrap();
25        let key_file = key_file.into();
26        let senders = Arc::new(SyncVec::new());
27
28        let senders_c = senders.clone();
29        let (sender, recver) = crossbeam::channel::unbounded();
30
31        // 连接中断后,会重新连接
32        let rt = std::thread::spawn(move || loop {
33            let key_file_c = key_file.clone();
34            let (server, key) = OsIpcOneShotServer::new().unwrap();
35            std::fs::write(key_file_c, key).unwrap();
36
37            // 等待连接
38            let (recv, res, channels, _) = match server.accept() {
39                Ok(s) => s,
40                Err(e) => {
41                    println!("Error: {:?}", e);
42                    sleep_1kms();
43                    continue;
44                }
45            };
46
47            senders_c.clear();
48
49            // 保存连接(发送者)
50            for mut ele in channels {
51                senders_c.push(ele.to_sender());
52            }
53
54            // 验证连接
55            if res != b"Ready" {
56                sleep_1kms();
57                continue;
58            }
59
60            info!("lpc server connect succeed: {key_file}");
61
62            // 循环接收数据
63            loop {
64                let d = recv.recv().map(|x| {
65                    (
66                        x.0,
67                        x.1.into_iter()
68                            .map(|mut x| x.to_sender())
69                            .collect::<Vec<_>>(),
70                    )
71                });
72                let d = match d {
73                    Ok(d) => d,
74                    Err(e) => {
75                        println!("Error: {:?}", e);
76                        sleep_1kms();
77                        break;
78                    }
79                };
80                if let Err(e) = sender.send(d) {
81                    error!("sender.send: {:?}", e);
82                }
83            }
84        });
85        let s = Self {
86            rt,
87            senders,
88            recver,
89        };
90        return s;
91        // println!("Server key: {name}");
92        // std::fs::write("./Server_key", name).unwrap();
93    }
94}
95
96impl LpcExt for LpcServer {
97    /// 接收数据
98    fn recver<F: Fn(Vec<u8>, Option<OsIpcSender>) + Send + Clone + 'static>(&self, call: F) {
99        let reves = self.recver.clone();
100        std::thread::spawn(move || loop {
101            let r = reves.recv().map(|x| {
102                let (data, mut senders) = x;
103                if b"PING" == &data[..] {
104                    for ele in senders {
105                        _ = ele.send(b"PONG", vec![], vec![]);
106                    }
107                    return;
108                }
109                let send = if senders.is_empty() {
110                    None
111                } else {
112                    Some(senders.remove(0))
113                };
114                call(data, send);
115            });
116            if let Err(e) = r {
117                error!("recver: {:?}", e);
118                sleep_1kms();
119            }
120        });
121    }
122
123    /// 发送数据
124    fn send<T: Into<Vec<u8>>>(&self, data: T) -> R {
125        let data = data.into();
126        match self.senders.get(0) {
127            None => Err(format!("index: 0 not found"))?,
128            Some(ele) => ele.send_data(data),
129        }
130    }
131
132    fn send_res<T: Into<Vec<u8>>>(&self, data: T) -> R<Vec<u8>> {
133        let data = data.into();
134        match self.senders.get(0) {
135            None => Err(format!("index: 0 not found"))?,
136            Some(ele) => ele.send_data_res(data),
137        }
138    }
139}
140
141impl LpcServer {
142    /// 发送数据, 指定发送者
143    pub fn send_index<T: Into<Vec<u8>>>(&self, data: T, index: usize, is_res: bool) -> R<Vec<u8>> {
144        let data = data.into();
145        match self.senders.get(index) {
146            None => Err(format!("index: {index} not found"))?,
147            Some(ele) => {
148                if is_res {
149                    return ele.send_data_res(data)
150                } else {
151                    ele.send_data(data)?;
152                    return Ok(vec![]);
153                }
154            }
155        }
156    }
157
158    /// 发送数据, 给所有发送者
159    pub fn send_all<T: Into<Vec<u8>> + Clone>(&self, data: T) -> R {
160        for ele in self.senders.iter() {
161            ele.send_data_res(data.clone())?;
162        }
163        return Ok(());
164    }
165}