// potatonet_common/bus_message.rs
use std::convert::TryFrom;
use std::io::Cursor;
use std::sync::Arc;

use anyhow::Result;
use async_std::net::TcpStream;
use bytes::Bytes;
use futures::channel::mpsc::{Receiver, Sender};
use futures::prelude::*;

use crate::{LocalServiceId, NodeId, ServiceId};
9
/// Upper bound, in bytes, on the payload length accepted by `read_message`
/// (1 MiB). Frames announcing a larger payload are rejected before any
/// buffer is grown.
const MAX_DATA_SIZE: usize = 1 << 20;
11
12#[derive(Serialize, Deserialize, Debug)]
13pub enum Message {
14 Ping,
16
17 Bye,
19
20 Hello(NodeId),
22
23 RegisterService { name: String, id: LocalServiceId },
25
26 UnregisterService { id: LocalServiceId },
28
29 Req {
31 seq: u32,
32 from: LocalServiceId,
33 to_service: String,
34 method: u32,
35 data: Bytes,
36 },
37
38 XReq {
40 from: ServiceId,
41 to: LocalServiceId,
42 seq: u32,
43 method: u32,
44 data: Bytes,
45 },
46
47 Rep {
49 seq: u32,
50 result: Result<Bytes, String>,
51 },
52
53 Notify {
55 from: LocalServiceId,
56 to_service: String,
57 method: u32,
58 data: Bytes,
59 },
60
61 NotifyTo {
63 from: LocalServiceId,
64 to: ServiceId,
65 method: u32,
66 data: Bytes,
67 },
68
69 XNotify {
71 from: ServiceId,
72 to_service: String,
73 method: u32,
74 data: Bytes,
75 },
76
77 XNotifyTo {
79 from: ServiceId,
80 to: LocalServiceId,
81 method: u32,
82 data: Bytes,
83 },
84
85 Subscribe { topic: String },
87
88 Unsubscribe { topic: String },
90
91 Publish { topic: String, data: Bytes },
93
94 XPublish { topic: String, data: Bytes },
96}
97
98async fn read_message<R: AsyncRead + Unpin>(mut r: R, buf: &mut Vec<u8>) -> Result<Message> {
99 let mut len = [0u8; 4];
100 r.read_exact(&mut len).await?;
101 let data_size = u32::from_le_bytes(len) as usize;
102 if data_size > MAX_DATA_SIZE {
103 bail!("data length exceeding the limit");
104 }
105 buf.resize(data_size, 0);
106 r.read_exact(buf).await?;
107 let msg: Message = rmp_serde::from_read(Cursor::new(&buf))?;
108 Ok(msg)
109}
110
111async fn write_message<W: AsyncWrite + Unpin>(
112 mut w: W,
113 msg: &Message,
114 buf: &mut Vec<u8>,
115) -> Result<()> {
116 buf.clear();
117 rmp_serde::encode::write(buf, &msg)?;
118 w.write(&(buf.len() as u32).to_le_bytes()).await?;
119 w.write(&buf).await?;
120 Ok(())
121}
122
123pub async fn read_one_message<R: AsyncRead + Unpin>(r: R) -> Result<Message> {
124 let mut buf = Vec::new();
125 read_message(r, &mut buf).await
126}
127
128pub async fn read_messages(stream: Arc<TcpStream>, mut tx: Sender<Message>) {
129 let mut buf = Vec::with_capacity(1024);
130 while let Ok(msg) = read_message(&*stream, &mut buf).await {
131 if let Err(_) = tx.send(msg).await {
132 break;
134 }
135 }
136}
137
138pub async fn write_messages(stream: Arc<TcpStream>, mut rx: Receiver<Message>) {
139 let mut buf = Vec::with_capacity(1024);
140
141 while let Some(msg) = rx.next().await {
142 if let Err(_) = write_message(&*stream, &msg, &mut buf).await {
143 break;
145 }
146 }
147}