1use std::sync::Arc;
2
3use crossbeam::channel::Receiver;
4use fast_able::vec::SyncVec;
5use ipc_channel::platform::{OsIpcOneShotServer, OsIpcSender};
6
7use crate::{
8 types::{IpcExt, LpcExt},
9 utils::sleep_1kms,
10 R,
11};
12
13pub struct LpcServer {
14 #[allow(dead_code)]
15 rt: std::thread::JoinHandle<()>,
16 senders: Arc<SyncVec<OsIpcSender>>,
17 recver: Receiver<(Vec<u8>, Vec<OsIpcSender>)>,
18}
19
20impl LpcServer {
21 pub fn new(key_file: impl Into<String>) -> LpcServer {
23 let key_file = key_file.into();
26 let senders = Arc::new(SyncVec::new());
27
28 let senders_c = senders.clone();
29 let (sender, recver) = crossbeam::channel::unbounded();
30
31 let rt = std::thread::spawn(move || loop {
33 let key_file_c = key_file.clone();
34 let (server, key) = OsIpcOneShotServer::new().unwrap();
35 std::fs::write(key_file_c, key).unwrap();
36
37 let (recv, res, channels, _) = match server.accept() {
39 Ok(s) => s,
40 Err(e) => {
41 println!("Error: {:?}", e);
42 sleep_1kms();
43 continue;
44 }
45 };
46
47 senders_c.clear();
48
49 for mut ele in channels {
51 senders_c.push(ele.to_sender());
52 }
53
54 if res != b"Ready" {
56 sleep_1kms();
57 continue;
58 }
59
60 info!("lpc server connect succeed: {key_file}");
61
62 loop {
64 let d = recv.recv().map(|x| {
65 (
66 x.0,
67 x.1.into_iter()
68 .map(|mut x| x.to_sender())
69 .collect::<Vec<_>>(),
70 )
71 });
72 let d = match d {
73 Ok(d) => d,
74 Err(e) => {
75 println!("Error: {:?}", e);
76 sleep_1kms();
77 break;
78 }
79 };
80 if let Err(e) = sender.send(d) {
81 error!("sender.send: {:?}", e);
82 }
83 }
84 });
85 let s = Self {
86 rt,
87 senders,
88 recver,
89 };
90 return s;
91 }
94}
95
96impl LpcExt for LpcServer {
97 fn recver<F: Fn(Vec<u8>, Option<OsIpcSender>) + Send + Clone + 'static>(&self, call: F) {
99 let reves = self.recver.clone();
100 std::thread::spawn(move || loop {
101 let r = reves.recv().map(|x| {
102 let (data, mut senders) = x;
103 if b"PING" == &data[..] {
104 for ele in senders {
105 _ = ele.send(b"PONG", vec![], vec![]);
106 }
107 return;
108 }
109 let send = if senders.is_empty() {
110 None
111 } else {
112 Some(senders.remove(0))
113 };
114 call(data, send);
115 });
116 if let Err(e) = r {
117 error!("recver: {:?}", e);
118 sleep_1kms();
119 }
120 });
121 }
122
123 fn send<T: Into<Vec<u8>>>(&self, data: T) -> R {
125 let data = data.into();
126 match self.senders.get(0) {
127 None => Err(format!("index: 0 not found"))?,
128 Some(ele) => ele.send_data(data),
129 }
130 }
131
132 fn send_res<T: Into<Vec<u8>>>(&self, data: T) -> R<Vec<u8>> {
133 let data = data.into();
134 match self.senders.get(0) {
135 None => Err(format!("index: 0 not found"))?,
136 Some(ele) => ele.send_data_res(data),
137 }
138 }
139}
140
141impl LpcServer {
142 pub fn send_index<T: Into<Vec<u8>>>(&self, data: T, index: usize, is_res: bool) -> R<Vec<u8>> {
144 let data = data.into();
145 match self.senders.get(index) {
146 None => Err(format!("index: {index} not found"))?,
147 Some(ele) => {
148 if is_res {
149 return ele.send_data_res(data)
150 } else {
151 ele.send_data(data)?;
152 return Ok(vec![]);
153 }
154 }
155 }
156 }
157
158 pub fn send_all<T: Into<Vec<u8>> + Clone>(&self, data: T) -> R {
160 for ele in self.senders.iter() {
161 ele.send_data_res(data.clone())?;
162 }
163 return Ok(());
164 }
165}