1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//!
//! # Commiuncation With Client
//!

use crate::{def::*, POOL, SOCK};
use flate2::{write::ZlibEncoder, Compression};
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use futures_timer::Delay;
use myutil::{err::*, *};
use serde::Serialize;
use std::{io::Write, net::SocketAddr, time::Duration};

/// 生成异步框架底层的线程池
pub(crate) fn gen_thread_pool(n: Option<u8>) -> Result<ThreadPool> {
    ThreadPoolBuilder::new()
        .pool_size(n.map(|n| n as usize).unwrap_or_else(num_cpus::get))
        .create()
        .c(d!())
}

/// 回送成功信息
#[macro_export(crate)]
macro_rules! send_ok {
    ($uuid: expr, $msg: expr, $peeraddr: expr) => {
        $crate::util::send_back(
            $crate::util::gen_resp_ok($uuid, $msg),
            $peeraddr,
        )
    };
}

/// 生成标志'成功'的回复体
pub(crate) fn gen_resp_ok(uuid: u64, msg: impl Serialize) -> Resp {
    Resp {
        uuid,
        status: RetStatus::Success,
        msg: info!(serde_json::to_vec(&msg)).unwrap_or_default(),
    }
}

/// 回送失败信息
#[macro_export(crate)]
macro_rules! send_err {
    ($uuid: expr, $err: expr, $peeraddr: expr) => {{
        let log = genlog($err);
        $crate::util::send_back(
            $crate::util::gen_resp_err($uuid, &log),
            $peeraddr,
        )
        .c(d!(&log))
        .map(|_| p(eg!(log)))
    }};
}

/// 生成标志'出错'的回复体
pub(crate) fn gen_resp_err(uuid: u64, msg: &str) -> Resp {
    Resp {
        uuid,
        status: RetStatus::Fail,
        msg: msg.as_bytes().to_vec(),
    }
}

/// 回送信息
#[inline(always)]
pub(crate) fn send_back(resp: Resp, peeraddr: SocketAddr) -> Result<()> {
    serde_json::to_vec(&resp)
        .c(d!())
        .and_then(|resp| {
            let mut en = ZlibEncoder::new(Vec::new(), Compression::default());
            en.write_all(&resp[..]).c(d!())?;
            en.finish().c(d!())
        })
        .map(|resp_compressed| {
            POOL.spawn_ok(async move {
                info_omit!(SOCK.send_to(&resp_compressed, peeraddr));
            });
        })
}

/// 异步 sleep
pub(crate) async fn asleep(sec: u64) {
    Delay::new(Duration::from_secs(sec)).await;
}