lpc 1.0.2

Local Procedure Call (LPC) implementation built on ipc-channel
Documentation
use std::sync::Arc;

use crossbeam::channel::Receiver;
use fast_able::vec::SyncVec;
use ipc_channel::platform::{OsIpcOneShotServer, OsIpcSender};

use crate::{
    types::{IpcExt, LpcExt},
    utils::sleep_1kms,
    R,
};

/// Server end of the local procedure call channel.
///
/// Created with [`LpcServer::new`], which spawns a background thread that
/// accepts peers through an `OsIpcOneShotServer` and forwards everything it
/// receives into an in-process crossbeam queue.
pub struct LpcServer {
    /// Handle of the background accept/receive thread spawned by `new`.
    /// Nothing in this file joins or otherwise reads it, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    rt: std::thread::JoinHandle<()>,
    /// Senders obtained from the channels attached to the most recently
    /// accepted connection; shared with the background thread, which clears
    /// and refills the list when a peer connects.
    senders: Arc<SyncVec<OsIpcSender>>,
    /// Receiving end of the in-process queue. Each item is a message payload
    /// together with the senders that were attached to that message.
    recver: Receiver<(Vec<u8>, Vec<OsIpcSender>)>,
}

impl LpcServer {
    /// "./Server_key"
    pub fn new(key_file: impl Into<String>) -> LpcServer {
        // println!("Server key: {name}");
        // std::fs::write("./Server_key", name).unwrap();
        let key_file = key_file.into();
        let senders = Arc::new(SyncVec::new());

        let senders_c = senders.clone();
        let (sender, recver) = crossbeam::channel::unbounded();

        // 连接中断后,会重新连接
        let rt = std::thread::spawn(move || loop {
            let key_file_c = key_file.clone();
            let (server, key) = OsIpcOneShotServer::new().unwrap();
            std::fs::write(key_file_c, key).unwrap();

            // 等待连接
            let (recv, res, channels, _) = match server.accept() {
                Ok(s) => s,
                Err(e) => {
                    println!("Error: {:?}", e);
                    sleep_1kms();
                    continue;
                }
            };

            senders_c.clear();

            // 保存连接(发送者)
            for mut ele in channels {
                senders_c.push(ele.to_sender());
            }

            // 验证连接
            if res != b"Ready" {
                sleep_1kms();
                continue;
            }

            info!("lpc server connect succeed: {key_file}");

            // 循环接收数据
            loop {
                let d = recv.recv().map(|x| {
                    (
                        x.0,
                        x.1.into_iter()
                            .map(|mut x| x.to_sender())
                            .collect::<Vec<_>>(),
                    )
                });
                let d = match d {
                    Ok(d) => d,
                    Err(e) => {
                        println!("Error: {:?}", e);
                        sleep_1kms();
                        break;
                    }
                };
                if let Err(e) = sender.send(d) {
                    error!("sender.send: {:?}", e);
                }
            }
        });
        let s = Self {
            rt,
            senders,
            recver,
        };
        return s;
        // println!("Server key: {name}");
        // std::fs::write("./Server_key", name).unwrap();
    }
}

impl LpcExt for LpcServer {
    /// Spawns a worker thread that hands every queued message to `call`.
    ///
    /// For each message the callback receives the payload and, if any senders
    /// were attached to the message, the first of them (otherwise `None`).
    ///
    /// A payload equal to `b"PING"` is answered with `b"PONG"` on every
    /// attached sender and is not passed to `call`; failures while sending
    /// the reply are ignored.
    ///
    /// The worker stops once the internal queue is disconnected, since a
    /// disconnected queue can never deliver another message.
    fn recver<F: Fn(Vec<u8>, Option<OsIpcSender>) + Send + Clone + 'static>(&self, call: F) {
        let queue = self.recver.clone();
        std::thread::spawn(move || loop {
            let (data, senders) = match queue.recv() {
                Ok(msg) => msg,
                Err(e) => {
                    // `recv` fails only when the queue is empty and every
                    // sender is gone, which is permanent: stop instead of
                    // logging the same error forever.
                    error!("recver: {:?}", e);
                    break;
                }
            };

            if data == b"PING" {
                for ele in senders {
                    _ = ele.send(b"PONG", vec![], vec![]);
                }
                continue;
            }

            // Only the first attached sender is handed to the callback.
            call(data, senders.into_iter().next());
        });
    }

    /// Sends `data` through the first registered sender.
    ///
    /// Returns an error if no sender is registered, or whatever error
    /// `send_data` reports.
    fn send<T: Into<Vec<u8>>>(&self, data: T) -> R {
        let data = data.into();
        let ele = self
            .senders
            .get(0)
            .ok_or_else(|| String::from("index: 0 not found"))?;
        ele.send_data(data)
    }

    /// Sends `data` through the first registered sender and returns the bytes
    /// produced by `send_data_res`.
    ///
    /// Returns an error if no sender is registered, or whatever error
    /// `send_data_res` reports.
    fn send_res<T: Into<Vec<u8>>>(&self, data: T) -> R<Vec<u8>> {
        let data = data.into();
        let ele = self
            .senders
            .get(0)
            .ok_or_else(|| String::from("index: 0 not found"))?;
        ele.send_data_res(data)
    }
}

impl LpcServer {
    /// Sends `data` through the registered sender at position `index`.
    ///
    /// With `is_res == true` the result of `send_data_res` is returned.
    /// Otherwise the data is sent with `send_data` and an empty vector is
    /// returned on success.
    ///
    /// Fails with `"index: {index} not found"` when no sender is registered
    /// at that position.
    pub fn send_index<T: Into<Vec<u8>>>(&self, data: T, index: usize, is_res: bool) -> R<Vec<u8>> {
        let payload = data.into();
        let target = self
            .senders
            .get(index)
            .ok_or_else(|| format!("index: {index} not found"))?;

        if !is_res {
            target.send_data(payload)?;
            return Ok(Vec::new());
        }
        target.send_data_res(payload)
    }

    /// Sends a copy of `data` through every registered sender, in order.
    ///
    /// Stops at the first failure and returns that error; senders after the
    /// failing one are not contacted.
    ///
    /// NOTE(review): this goes through `send_data_res` and throws the returned
    /// bytes away. If `send_data_res` waits for a reply from the peer, a plain
    /// `send_data` may have been intended here — confirm against `IpcExt`.
    pub fn send_all<T: Into<Vec<u8>> + Clone>(&self, data: T) -> R {
        for target in self.senders.iter() {
            let _reply = target.send_data_res(data.clone())?;
        }
        Ok(())
    }
}