lpc 1.0.2

Local Procedure Call for ipc channel impl
Documentation
use std::{
    cell::UnsafeCell,
    sync::{
        atomic::{AtomicPtr, Ordering},
        Arc, Mutex,
    },
};

use fast_able::vec::SyncVec;
use ipc_channel::platform::{OsIpcChannel, OsIpcReceiver, OsIpcSender};

use crate::{
    types::{IpcExt, LpcExt},
    utils::sleep_1kms,
    R,
};

pub struct LpcClient {
    receivers: SyncVec<OsIpcReceiver>,
    sender: Arc<Mutex<Option<OsIpcSender>>>,
    #[allow(dead_code)]
    rt: std::thread::JoinHandle<()>,
}

impl LpcClient {
    pub fn new(key_file: impl Into<String>, rece_count: usize) -> R<Self> {
        let mut receivers = SyncVec::new();
        let mut txs = vec![];
        for _ in 0..rece_count {
            let (tx, rx) = ipc_channel::platform::channel()?;
            receivers.push(rx);
            txs.push(tx);
        }
        // let receivers = UnsafeCell::new(receivers);
        let sender = Arc::new(Mutex::new(None));

        let key_file = key_file.into();
        let sender_c = sender.clone();
        let sender_c2 = sender.clone();

        let loop_f = move || -> R {
            let key_file_c = key_file.clone();
            let key = std::fs::read(key_file_c)?;
            let key = String::from_utf8(key)?;
            let txs = txs
                .clone()
                .into_iter()
                .map(|x| OsIpcChannel::Sender(x))
                .collect::<Vec<_>>();
            let tx = ipc_channel::platform::OsIpcSender::connect(key)?;
            tx.send(b"Ready", txs, vec![])
                .map_err(|e| format!("{e:?}"))?;
            sender_c.lock().unwrap().replace(tx);
            info!("lpc client connect succeed: {key_file}");

            Ok(())
        };

        if let Err(e) = loop_f() {
            warn!("Failed to check [lpc] service, will reconnect: {e:?}");
        }

        let rt = std::thread::spawn(move || loop {
            let click = if let Ok(lock) = sender_c2.try_lock() {
                match lock.as_ref() {
                    Some(v) => {
                        let r = v.send_data_res("PING");
                        match r {
                            Ok(v) => {
                                if v != b"PONG" {
                                    error!("lpc PING is return: {:?}", v);
                                    false
                                } else {
                                    true
                                }
                            }
                            Err(e) => {
                                error!("lpc PING send_data: {:?}", e);
                                false
                            }
                        }
                    }
                    None => {
                        debug!("lpc PING for sender is none");
                        false
                    }
                }
            } else {
                debug!("Failed to check [lpc] service, will reconnect: try_lock error");
                true
            };
            if !click {
                warn!("Failed to check [lpc] service, will reconnect");
                std::thread::sleep(std::time::Duration::from_millis(3000));
                if let Err(e) = loop_f() {
                    error!("ipc connect Error: {:?}", e);
                }
            } else {
                std::thread::sleep(std::time::Duration::from_millis(30 * 1000));
            }
        });
        let r = Self {
            receivers,
            rt,
            sender,
        };
        Ok(r)
    }
}

impl LpcExt for LpcClient {
    /// 接收数据
    fn recver<F: Fn(Vec<u8>, Option<OsIpcSender>) + Send + Clone + 'static>(&self, call: F) {
        for rev in &self.receivers {
            let call_c = call.clone();
            let rev = rev.consume();
            std::thread::spawn(move || loop {
                let r = rev.recv().map(|(data, senders, _)| {
                    let mut senders = senders
                        .into_iter()
                        .map(|mut x| x.to_sender())
                        .collect::<Vec<_>>();
                    let send = if senders.is_empty() {
                        None
                    } else {
                        Some(senders.remove(0))
                    };
                    call_c(data, send);
                });
                if let Err(e) = r {
                    error!("recver: {:?}", e);
                    sleep_1kms();
                }
            });
        }
    }

    /// 发送数据
    fn send<T: Into<Vec<u8>>>(&self, data: T) -> R {
        let lock = self
            .sender
            .lock()
            .map_err(|e| format!("lock error: {e:?}"))?;
        match lock.as_ref() {
            Some(v) => v.send_data(data),
            None => {
                return Err(format!("sender is none"))?;
            }
        }
    }

    fn send_res<T: Into<Vec<u8>>>(&self, data: T) -> R<Vec<u8>> {
        let lock = self
            .sender
            .lock()
            .map_err(|e| format!("lock error: {e:?}"))?;
        match lock.as_ref() {
            Some(v) => v.send_data_res(data),
            None => {
                return Err(format!("sender is none"))?;
            }
        }
    }
}