1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use crate::{Event, LocalServiceId, NodeId, ServiceId};
use anyhow::Result;
use bytes::Bytes;
use futures::prelude::*;
use std::io::Cursor;

/// Protocol messages exchanged between a client node and the server.
///
/// Values of this type are what `read_message` decodes and `write_message`
/// encodes (via `rmp_serde`).
///
/// NOTE(review): depending on how `rmp_serde` is configured, variant order
/// and/or names may be part of the encoded format — confirm before
/// reordering or renaming variants.
#[derive(Serialize, Deserialize, Debug)]
pub enum Message {
    /// Ping sent by the client.
    Ping,

    /// Sent by the client to register a node.
    RegisterNode(String),

    /// Sent by the server when node registration succeeded; carries the
    /// assigned node id.
    NodeRegistered(NodeId),

    /// Sent by the client when it exits.
    UnregisterNode,

    /// Register a service.
    RegisterService { name: String, id: LocalServiceId },

    /// Unregister a service.
    UnregisterService { id: LocalServiceId },

    /// Request sent by the client.
    Req {
        seq: u32,
        from: Option<LocalServiceId>,
        to_service: String,
        method: String,
        data: Bytes,
    },

    /// Request sent by the server.
    XReq {
        from: Option<ServiceId>,
        to: LocalServiceId,
        seq: u32,
        method: String,
        data: Bytes,
    },

    /// Response sent by the server.
    Rep {
        seq: u32,
        result: Result<Bytes, String>,
    },

    /// Notification sent by the client.
    SendNotify {
        from: Option<LocalServiceId>,
        to_service: String,
        method: String,
        data: Bytes,
    },

    /// Notification sent by the server.
    Notify {
        from: Option<ServiceId>,
        to_service: String,
        method: String,
        data: Bytes,
    },

    /// System event.
    Event { event: Event },
}

/// Reads one length-prefixed [`Message`] from `r`.
///
/// The frame layout is a 4-byte little-endian `u32` payload length followed by
/// exactly that many bytes of `rmp_serde`-encoded payload.
///
/// # Errors
///
/// Returns an error if the reader fails or reaches EOF before a full frame has
/// been read, or if the payload cannot be decoded into a [`Message`].
pub async fn read_message<R: AsyncRead + Unpin>(mut r: R) -> Result<Message> {
    let mut prefix = [0u8; 4];
    r.read_exact(&mut prefix).await?;

    // `u32 -> usize` is a lossless widening on 32- and 64-bit targets.
    let payload_len = u32::from_le_bytes(prefix) as usize;

    // Allocate exactly `payload_len` bytes. Sizing the buffer from
    // `Vec::capacity()` is unsafe for framing: the allocator may hand back
    // more capacity than requested, which would make `read_exact` consume
    // bytes belonging to the next frame.
    //
    // NOTE(review): the length comes straight off the wire and is not bounded
    // here; consider enforcing a maximum frame size if peers are untrusted.
    let mut buf = vec![0u8; payload_len];
    r.read_exact(&mut buf).await?;

    let msg: Message = rmp_serde::from_read(Cursor::new(buf))?;
    Ok(msg)
}

/// Writes `msg` to `w` as one length-prefixed frame.
///
/// The frame layout is a 4-byte little-endian `u32` payload length followed by
/// the `rmp_serde`-encoded payload. The writer is not flushed here; callers
/// using a buffered writer are responsible for flushing.
///
/// # Errors
///
/// Returns an error if `msg` cannot be encoded, if the encoded payload is too
/// large for the `u32` length prefix, or if the writer fails before the whole
/// frame has been written.
pub async fn write_message<W: AsyncWrite + Unpin>(mut w: W, msg: &Message) -> Result<()> {
    let data = rmp_serde::to_vec(msg)?;

    // Refuse payloads whose length does not fit the 4-byte prefix instead of
    // silently truncating the length and corrupting the stream.
    if data.len() > u32::MAX as usize {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!(
                "message of {} bytes exceeds the u32 length prefix",
                data.len()
            ),
        )
        .into());
    }
    let prefix = (data.len() as u32).to_le_bytes();

    // `write` may accept only part of the buffer; `write_all` loops until
    // every byte has been written or an error occurs.
    w.write_all(&prefix).await?;
    w.write_all(&data).await?;
    Ok(())
}