use std::sync::Arc;
use crossbeam::channel::Receiver;
use fast_able::vec::SyncVec;
use ipc_channel::platform::{OsIpcOneShotServer, OsIpcSender};
use crate::{
types::{IpcExt, LpcExt},
utils::sleep_1kms,
R,
};
/// Server side of the local IPC link.
///
/// Built by `LpcServer::new`, which spawns a background thread that accepts
/// connections and forwards every received message into `recver`.
pub struct LpcServer {
    /// Join handle of the background accept/receive thread spawned in `new`.
    // The handle is stored but never read in the code shown here, hence the allow.
    #[allow(dead_code)]
    rt: std::thread::JoinHandle<()>,
    /// Senders taken from the channels of the most recently accepted
    /// connection; shared with the background thread, which clears and
    /// refills the list on every accept.
    senders: Arc<SyncVec<OsIpcSender>>,
    /// Receiving end of the internal queue: each item is a message payload
    /// together with the senders that were attached to that message.
    recver: Receiver<(Vec<u8>, Vec<OsIpcSender>)>,
}
impl LpcServer {
pub fn new(key_file: impl Into<String>) -> LpcServer {
let key_file = key_file.into();
let senders = Arc::new(SyncVec::new());
let senders_c = senders.clone();
let (sender, recver) = crossbeam::channel::unbounded();
let rt = std::thread::spawn(move || loop {
let key_file_c = key_file.clone();
let (server, key) = OsIpcOneShotServer::new().unwrap();
std::fs::write(key_file_c, key).unwrap();
let (recv, res, channels, _) = match server.accept() {
Ok(s) => s,
Err(e) => {
println!("Error: {:?}", e);
sleep_1kms();
continue;
}
};
senders_c.clear();
for mut ele in channels {
senders_c.push(ele.to_sender());
}
if res != b"Ready" {
sleep_1kms();
continue;
}
info!("lpc server connect succeed: {key_file}");
loop {
let d = recv.recv().map(|x| {
(
x.0,
x.1.into_iter()
.map(|mut x| x.to_sender())
.collect::<Vec<_>>(),
)
});
let d = match d {
Ok(d) => d,
Err(e) => {
println!("Error: {:?}", e);
sleep_1kms();
break;
}
};
if let Err(e) = sender.send(d) {
error!("sender.send: {:?}", e);
}
}
});
let s = Self {
rt,
senders,
recver,
};
return s;
}
}
impl LpcExt for LpcServer {
    /// Spawns a detached thread that drains the internal queue and hands each
    /// message to `call`.
    ///
    /// * A payload equal to `b"PING"` is answered with `b"PONG"` on every
    ///   sender attached to the message (send errors are ignored) and is not
    ///   passed to `call`.
    /// * Any other payload is passed to `call` together with the first
    ///   attached sender, or `None` when the message carried none; the
    ///   remaining senders are dropped.
    ///
    /// The thread exits once the queue reports an error: `recv` only fails
    /// when the channel is empty and all senders are gone, which can never
    /// recover, so looping would just log the same error forever.
    fn recver<F: Fn(Vec<u8>, Option<OsIpcSender>) + Send + Clone + 'static>(&self, call: F) {
        let inbox = self.recver.clone();
        std::thread::spawn(move || loop {
            let (data, senders) = match inbox.recv() {
                Ok(msg) => msg,
                Err(e) => {
                    error!("recver: {:?}", e);
                    break;
                }
            };
            if data == b"PING" {
                for reply_to in senders {
                    _ = reply_to.send(b"PONG", vec![], vec![]);
                }
                continue;
            }
            // Only the first attached sender is forwarded to the callback.
            call(data, senders.into_iter().next());
        });
    }

    /// Sends `data` through the first registered sender.
    ///
    /// Returns an error when no sender is registered (no connection has been
    /// accepted yet) or when `send_data` fails.
    fn send<T: Into<Vec<u8>>>(&self, data: T) -> R {
        let payload = data.into();
        let first = self
            .senders
            .get(0)
            .ok_or_else(|| String::from("index: 0 not found"))?;
        first.send_data(payload)
    }

    /// Sends `data` through the first registered sender with `send_data_res`
    /// and returns the bytes that call yields.
    ///
    /// Returns an error when no sender is registered or when `send_data_res`
    /// fails.
    fn send_res<T: Into<Vec<u8>>>(&self, data: T) -> R<Vec<u8>> {
        let payload = data.into();
        let first = self
            .senders
            .get(0)
            .ok_or_else(|| String::from("index: 0 not found"))?;
        first.send_data_res(payload)
    }
}
impl LpcServer {
    /// Sends `data` through the sender registered at position `index`.
    ///
    /// With `is_res == true` the payload goes out via `send_data_res` and that
    /// call's result is returned as-is. Otherwise `send_data` is used and an
    /// empty vector is returned on success.
    ///
    /// Fails with `"index: {index} not found"` when no sender exists at
    /// `index`.
    pub fn send_index<T: Into<Vec<u8>>>(&self, data: T, index: usize, is_res: bool) -> R<Vec<u8>> {
        let payload = data.into();
        let target = self
            .senders
            .get(index)
            .ok_or_else(|| format!("index: {index} not found"))?;
        if is_res {
            target.send_data_res(payload)
        } else {
            target.send_data(payload)?;
            Ok(vec![])
        }
    }

    /// Sends a clone of `data` to every registered sender, in list order.
    ///
    /// Stops at, and returns, the first error; later senders are then not
    /// contacted. Succeeds trivially when no sender is registered.
    ///
    /// NOTE(review): this goes through `send_data_res` and throws the returned
    /// bytes away — presumably to wait for each peer's reply; confirm that
    /// `send_data` was not intended.
    pub fn send_all<T: Into<Vec<u8>> + Clone>(&self, data: T) -> R {
        for peer in self.senders.iter() {
            let _reply = peer.send_data_res(data.clone())?;
        }
        Ok(())
    }
}