sfo-cmd-server 0.3.1

command server implement
Documentation
mod client;

use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
use callback_result::CallbackWaiter;
pub use client::*;
use num::{FromPrimitive, ToPrimitive};
use sfo_pool::WorkerClassification;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

mod classified_client;
pub use classified_client::*;

use crate::errors::CmdResult;
use crate::{CmdBody, CmdHandler, CmdTunnelMeta, PeerId, TunnelId};

pub trait CmdSend<M: CmdTunnelMeta>: Send + 'static {
    fn get_tunnel_meta(&self) -> Option<Arc<M>>;
    fn get_remote_peer_id(&self) -> PeerId;
}

pub trait SendGuard<M: CmdTunnelMeta, S: CmdSend<M>>: Send + 'static + Deref<Target = S> {}

#[async_trait::async_trait]
pub trait CmdClient<
    LEN: RawEncode
        + for<'a> RawDecode<'a>
        + Copy
        + RawFixedBytes
        + Sync
        + Send
        + 'static
        + FromPrimitive
        + ToPrimitive,
    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
    M: CmdTunnelMeta,
    S: CmdSend<M>,
    G: SendGuard<M, S>,
>: Send + Sync + 'static
{
    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
    async fn send(&self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
    async fn send_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: &[u8],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    async fn send_parts(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
    async fn send_parts_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    #[deprecated(note = "use send_parts instead")]
    async fn send2(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
        self.send_parts(cmd, version, body).await
    }
    #[deprecated(note = "use send_parts_with_resp instead")]
    async fn send2_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody> {
        self.send_parts_with_resp(cmd, version, body, timeout).await
    }
    async fn send_cmd(&self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
    async fn send_cmd_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: CmdBody,
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    async fn send_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[u8],
    ) -> CmdResult<()>;
    async fn send_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[u8],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    async fn send_parts_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()>;
    async fn send_parts_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    #[deprecated(note = "use send_parts_by_specify_tunnel instead")]
    async fn send2_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()> {
        self.send_parts_by_specify_tunnel(tunnel_id, cmd, version, body)
            .await
    }
    #[deprecated(note = "use send_parts_by_specify_tunnel_with_resp instead")]
    async fn send2_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody> {
        self.send_parts_by_specify_tunnel_with_resp(tunnel_id, cmd, version, body, timeout)
            .await
    }
    async fn send_cmd_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: CmdBody,
    ) -> CmdResult<()>;
    async fn send_cmd_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: CmdBody,
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    async fn clear_all_tunnel(&self);
    async fn get_send(&self, tunnel_id: TunnelId) -> CmdResult<G>;
}

#[async_trait::async_trait]
pub trait ClassifiedCmdClient<
    LEN: RawEncode
        + for<'a> RawDecode<'a>
        + Copy
        + RawFixedBytes
        + Sync
        + Send
        + 'static
        + FromPrimitive
        + ToPrimitive,
    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
    C: WorkerClassification,
    M: CmdTunnelMeta,
    S: CmdSend<M>,
    G: SendGuard<M, S>,
>: CmdClient<LEN, CMD, M, S, G>
{
    async fn send_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[u8],
    ) -> CmdResult<()>;
    async fn send_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[u8],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    async fn send_parts_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()>;
    async fn send_parts_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    #[deprecated(note = "use send_parts_by_classified_tunnel instead")]
    async fn send2_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()> {
        self.send_parts_by_classified_tunnel(classification, cmd, version, body)
            .await
    }
    #[deprecated(note = "use send_parts_by_classified_tunnel_with_resp instead")]
    async fn send2_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody> {
        self.send_parts_by_classified_tunnel_with_resp(classification, cmd, version, body, timeout)
            .await
    }
    async fn send_cmd_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: CmdBody,
    ) -> CmdResult<()>;
    async fn send_cmd_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: CmdBody,
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
}

pub(crate) type RespWaiter = CallbackWaiter<u128, CmdBody>;
pub(crate) type RespWaiterRef = Arc<RespWaiter>;

pub(crate) fn gen_resp_id<
    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static,
>(
    tunnel_id: TunnelId,
    cmd: CMD,
    seq: u32,
) -> u128 {
    let cmd_buf = cmd.raw_encode_to_buffer().unwrap();
    let mut cmd = cmd_buf.len() as u64;
    for chunk in cmd_buf.chunks(8) {
        let mut buf = [0u8; 8];
        buf[..chunk.len()].copy_from_slice(chunk);
        cmd = cmd.rotate_left(13) ^ u64::from_be_bytes(buf);
    }
    ((tunnel_id.value() as u128) << 96) | ((seq as u128) << 64) | (cmd as u128)
}

pub(crate) fn gen_seq() -> u32 {
    rand::random::<u32>()
}

#[cfg(test)]
mod tests {
    use super::gen_resp_id;
    use crate::TunnelId;

    #[test]
    fn resp_id_changes_with_seq() {
        let id1 = gen_resp_id(TunnelId::from(7), 0x11u8, 1);
        let id2 = gen_resp_id(TunnelId::from(7), 0x11u8, 2);
        assert_ne!(id1, id2);
    }

    #[test]
    fn resp_id_changes_with_cmd() {
        let id1 = gen_resp_id(TunnelId::from(7), 0x11u8, 5);
        let id2 = gen_resp_id(TunnelId::from(7), 0x12u8, 5);
        assert_ne!(id1, id2);
    }

    #[test]
    fn resp_id_changes_with_tunnel() {
        let id1 = gen_resp_id(TunnelId::from(7), 0x11u8, 5);
        let id2 = gen_resp_id(TunnelId::from(8), 0x11u8, 5);
        assert_ne!(id1, id2);
    }

    #[test]
    fn resp_id_changes_with_long_cmd_suffix() {
        let id1 = gen_resp_id(
            TunnelId::from(7),
            0x1122_3344_5566_7788_0000_0000_0000_0001u128,
            5,
        );
        let id2 = gen_resp_id(
            TunnelId::from(7),
            0x1122_3344_5566_7788_0000_0000_0000_0002u128,
            5,
        );
        assert_ne!(id1, id2);
    }
}