use std::{
cell::UnsafeCell,
sync::{
atomic::{AtomicPtr, Ordering},
Arc, Mutex,
},
};
use fast_able::vec::SyncVec;
use ipc_channel::platform::{OsIpcChannel, OsIpcReceiver, OsIpcSender};
use crate::{
types::{IpcExt, LpcExt},
utils::sleep_1kms,
R,
};
/// IPC client that connects to a server whose name is read from a key file
/// and keeps that connection checked from a background thread (see `new`).
pub struct LpcClient {
/// Receiving halves of the channels created in `new`. The matching sending
/// halves are handed to the server in the `Ready` message; `recver` consumes
/// these receivers to listen for incoming data.
receivers: SyncVec<OsIpcReceiver>,
/// Sender connected to the server. Holds `None` until a connect attempt
/// succeeds; the background thread stores a new sender here on reconnect.
sender: Arc<Mutex<Option<OsIpcSender>>>,
// The handle is stored but not read in the visible code, hence the
// `dead_code` allowance.
#[allow(dead_code)]
/// Join handle of the background PING / reconnect thread spawned by `new`.
rt: std::thread::JoinHandle<()>,
}
impl LpcClient {
pub fn new(key_file: impl Into<String>, rece_count: usize) -> R<Self> {
let mut receivers = SyncVec::new();
let mut txs = vec![];
for _ in 0..rece_count {
let (tx, rx) = ipc_channel::platform::channel()?;
receivers.push(rx);
txs.push(tx);
}
let sender = Arc::new(Mutex::new(None));
let key_file = key_file.into();
let sender_c = sender.clone();
let sender_c2 = sender.clone();
let loop_f = move || -> R {
let key_file_c = key_file.clone();
let key = std::fs::read(key_file_c)?;
let key = String::from_utf8(key)?;
let txs = txs
.clone()
.into_iter()
.map(|x| OsIpcChannel::Sender(x))
.collect::<Vec<_>>();
let tx = ipc_channel::platform::OsIpcSender::connect(key)?;
tx.send(b"Ready", txs, vec![])
.map_err(|e| format!("{e:?}"))?;
sender_c.lock().unwrap().replace(tx);
info!("lpc client connect succeed: {key_file}");
Ok(())
};
if let Err(e) = loop_f() {
warn!("Failed to check [lpc] service, will reconnect: {e:?}");
}
let rt = std::thread::spawn(move || loop {
let click = if let Ok(lock) = sender_c2.try_lock() {
match lock.as_ref() {
Some(v) => {
let r = v.send_data_res("PING");
match r {
Ok(v) => {
if v != b"PONG" {
error!("lpc PING is return: {:?}", v);
false
} else {
true
}
}
Err(e) => {
error!("lpc PING send_data: {:?}", e);
false
}
}
}
None => {
debug!("lpc PING for sender is none");
false
}
}
} else {
debug!("Failed to check [lpc] service, will reconnect: try_lock error");
true
};
if !click {
warn!("Failed to check [lpc] service, will reconnect");
std::thread::sleep(std::time::Duration::from_millis(3000));
if let Err(e) = loop_f() {
error!("ipc connect Error: {:?}", e);
}
} else {
std::thread::sleep(std::time::Duration::from_millis(30 * 1000));
}
});
let r = Self {
receivers,
rt,
sender,
};
Ok(r)
}
}
impl LpcExt for LpcClient {
    /// Spawns one listener thread per stored receiver.
    ///
    /// Each thread loops forever on `recv()`. For every message it converts all
    /// attached channels into senders and invokes `call` with the payload and
    /// the first of those senders (`None` when the message carried no channel).
    /// A receive error is logged, followed by a `sleep_1kms()` pause before the
    /// next attempt.
    fn recver<F: Fn(Vec<u8>, Option<OsIpcSender>) + Send + Clone + 'static>(&self, call: F) {
        for slot in &self.receivers {
            let handler = call.clone();
            let receiver = slot.consume();
            std::thread::spawn(move || loop {
                match receiver.recv() {
                    Ok((data, channels, _)) => {
                        // Convert every attached channel, not just the first:
                        // the original does the same, and the rest are simply
                        // dropped as senders once the handler returns.
                        let mut reply_senders = channels
                            .into_iter()
                            .map(|mut channel| channel.to_sender())
                            .collect::<Vec<_>>();
                        let reply = match reply_senders.is_empty() {
                            true => None,
                            false => Some(reply_senders.remove(0)),
                        };
                        handler(data, reply);
                    }
                    Err(e) => {
                        error!("recver: {:?}", e);
                        sleep_1kms();
                    }
                }
            });
        }
    }

    /// Sends `data` through the current server connection.
    ///
    /// # Errors
    /// Fails when the sender lock cannot be taken, when no connection has been
    /// established yet, or when `send_data` itself fails.
    fn send<T: Into<Vec<u8>>>(&self, data: T) -> R {
        let guard = self
            .sender
            .lock()
            .map_err(|e| format!("lock error: {e:?}"))?;
        // The guard stays alive until the call below returns.
        let tx = guard
            .as_ref()
            .ok_or_else(|| String::from("sender is none"))?;
        tx.send_data(data)
    }

    /// Sends `data` through the current server connection and returns the reply.
    ///
    /// # Errors
    /// Fails when the sender lock cannot be taken, when no connection has been
    /// established yet, or when `send_data_res` itself fails.
    fn send_res<T: Into<Vec<u8>>>(&self, data: T) -> R<Vec<u8>> {
        let guard = self
            .sender
            .lock()
            .map_err(|e| format!("lock error: {e:?}"))?;
        // The guard stays alive until the call below returns.
        let tx = guard
            .as_ref()
            .ok_or_else(|| String::from("sender is none"))?;
        tx.send_data_res(data)
    }
}