serf_rpc/
protocol.rs

1use std::{
2    collections::HashMap,
3    net::{IpAddr, Ipv4Addr, Ipv6Addr},
4};
5
6use serde::{Deserialize, Serialize};
7
8use crate::{RPCResponse, RPCResult};
9
10#[derive(Serialize)]
11#[serde(rename_all = "PascalCase")]
12pub(crate) struct RequestHeader {
13    pub seq: u64,
14    pub command: &'static str,
15}
16
17#[derive(Deserialize)]
18#[serde(rename_all = "PascalCase")]
19pub(crate) struct ResponseHeader {
20    pub seq: u64,
21    pub error: String,
22}
23
/// Expands to an integer expression equal to the number of comma-separated
/// token trees passed in (`count!()` is `0`). The items themselves are never
/// evaluated. A trailing comma is not accepted.
macro_rules! count {
    // Internal rule: every item contributes exactly one.
    (@unit $item:tt) => { 1 };
    // Sum one unit per item, starting from zero.
    ($($item:tt),*) => { 0 $(+ count!(@unit $item))* };
}
29
/// Appends a MessagePack map built from `"Key": value` pairs to a buffer.
///
/// * `$buf` — a mutable reference to the output buffer (the callers in this
///   file pass `&mut buf` for a `Vec`). It is evaluated exactly once.
/// * `$key: $val` — zero or more pairs; `$key` is a string literal written as
///   the map key, `$val` is a reference to a serializable value encoded with
///   `rmp_serde::encode::write_named`.
///
/// # Panics
///
/// Panics if any of the encoding calls returns an error.
macro_rules! cmd_arg {
    (
        $buf:expr,
        $($key:literal: $val:expr),*
    ) => {{
        // Bind the buffer once so the caller's expression is not re-evaluated
        // for every write below.
        let out = $buf;
        // The map header needs the number of entries up front.
        let len: u32 = count!( $($key),* );

        rmp::encode::write_map_len(&mut *out, len)
            .expect("failed to encode command argument map header");
        $(
            rmp::encode::write_str(&mut *out, $key)
                .expect("failed to encode command argument key");
            rmp_serde::encode::write_named(&mut *out, $val)
                .expect("failed to encode command argument value");
        )*
    }};
}
44
/// Generates a request method on `crate::Client`.
///
/// Input, in order:
/// 1. the command name as a string literal,
/// 2. any attributes for the generated method (typically `///` docs),
/// 3. `vis name(arg: Type, ...) -> ResponseType`,
/// 4. optionally `{ "Key": value, ... }`, which is encoded with `cmd_arg!`
///    and used as the request body. Without it the body is an empty buffer.
///
/// The generated method borrows `self` for `'a`, passes the command name and
/// the encoded body to `self.request`, and returns its
/// `crate::RPCRequest<'a, ResponseType>`.
macro_rules! req {
    (
        $name:literal
        $(#[$meta:meta])*
        $vis:vis $ident:ident( $($arg:ident: $arg_ty:ty),* ) -> $res:ty $({
            $($key:literal: $val:expr),*
        })?
    ) => {
        impl crate::Client {
            $(#[$meta])*
            $vis fn $ident<'a>(&'a self$(, $arg: $arg_ty)*) -> crate::RPCRequest<'a, $res> {
                // The buffer is only mutated when an argument block was given.
                #[allow(unused_mut)]
                let mut body = Vec::new();

                $(cmd_arg! { &mut body, $($key: $val),* };)?

                self.request($name, body)
            }
        }
    };
}
66
/// Generates a streaming method on `crate::Client`.
///
/// The grammar mirrors `req!`: the command name literal, optional attributes
/// for the generated method (typically `///` docs), a method signature
/// `vis name(arg: Type, ...) -> ItemType`, and an optional
/// `{ "Key": value, ... }` block that is encoded with `cmd_arg!` and used as
/// the command body.
///
/// The generated method takes `self: &Arc<Self>`, hands the command name and
/// the encoded body to `self.start_stream`, and returns its
/// `crate::RPCStream<ItemType>`.
macro_rules! stream {
    (
        $name:literal
        $(#[$meta:meta])*
        $vis:vis $ident:ident( $($arg:ident: $arg_ty:ty),* ) -> $res:ty $({
            $($key:literal: $val:expr),*
        })?
    ) => {
        impl crate::Client {
            $(#[$meta])*
            $vis fn $ident(self: &std::sync::Arc<Self>$(, $arg: $arg_ty)*) -> crate::RPCStream<$res> {
                // The buffer is only mutated when an argument block was given.
                #[allow(unused_mut)]
                let mut buf = Vec::new();

                $(cmd_arg! { &mut buf, $($key: $val),* };)?

                self.start_stream($name, buf)
            }
        }
    };
}
87
/// Implements `RPCResponse` for `$ty` by delegating to `SeqRead::read_msg`
/// and wrapping the decoded value in `Ok`.
macro_rules! res {
    ($ty:ty) => {
        impl RPCResponse for $ty {
            fn read_from(reader: crate::SeqRead<'_>) -> RPCResult<Self> {
                let msg: Self = reader.read_msg();
                Ok(msg)
            }
        }
    };
}
97
98req! {
99    "handshake"
100    /// Send a handshake
101    pub(crate) handshake(version: u32) -> () {
102        "Version": &version
103    }
104}
105
106req! {
107    "auth"
108    /// Send an auth key
109    pub(crate) auth(auth_key: &str) -> () {
110        "AuthKey": auth_key
111    }
112}
113
114req! {
115    "event"
116    /// Fire an event
117    pub fire_event(name: &str, payload: &[u8], coalesce: bool) -> () {
118        "Name": name,
119        "Payload": payload,
120        "Coalesce": &coalesce
121    }
122}
123
124req! {
125    "force-leave"
126    /// Force a node to leave
127    pub force_leave(node: &str) -> () {
128        "Node": node
129    }
130}
131
132#[derive(Deserialize, Debug)]
133pub struct JoinResponse {
134    #[serde(rename = "Num")]
135    pub nodes_joined: u64,
136}
137
138res!(JoinResponse);
139
140req! {
141    "join"
142    /// Join a serf cluster, given existing ip addresses. `replay` controls whether to replay old user events
143    pub join(existing: &[&str], replay: bool) -> JoinResponse {
144        "Existing": existing,
145        "Replay": &replay
146    }
147}
148
149#[derive(Deserialize, Debug)]
150#[serde(rename_all = "PascalCase")]
151pub struct Member {
152    pub name: String,
153    #[serde(deserialize_with = "deserialize_ip_addr")]
154    pub addr: IpAddr,
155    pub port: u32,
156    pub tags: HashMap<String, String>,
157    pub status: String,
158    pub protocol_min: u32,
159    pub protocol_max: u32,
160    pub protocol_cur: u32,
161    pub delegate_max: u32,
162    pub delegate_min: u32,
163    pub delegate_cur: u32,
164}
165
166fn deserialize_ip_addr<'de, D>(de: D) -> Result<IpAddr, D::Error>
167where
168    D: serde::Deserializer<'de>,
169{
170    let addr = Ipv6Addr::from(<u128 as serde::Deserialize>::deserialize(de)?);
171
172    // serf gives us ipv6 ips, with ipv4 addresses mapped to ipv6.
173    // https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses
174    //
175    // based on std's unstable to_ipv4_mapped()
176    let addr = match addr.octets() {
177        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
178            IpAddr::V4(Ipv4Addr::new(a, b, c, d))
179        }
180        _ => IpAddr::V6(addr),
181    };
182
183    Ok(addr)
184}
185
186#[derive(Deserialize, Debug)]
187#[serde(rename_all = "PascalCase")]
188pub struct MembersResponse {
189    pub members: Vec<Member>,
190}
191
192res!(MembersResponse);
193
194req! {
195    "members"
196    /// Returns a list of all known members
197    pub members() -> MembersResponse
198}
199
200req! {
201    "members-filtered"
202    /// Returns a filtered list of all known members
203    pub members_filtered(status: Option<&str>, name: Option<&str>, tags: Option<&HashMap<String, String>>) -> MembersResponse {
204        "Status": &status,
205        "Name": &name,
206        "Tags": &tags
207    }
208}
209
210req! {
211    "tags"
212    /// Modifies the tags of the current node
213    pub tags(add_tags: &[&str], delete_tags: &[&str]) -> MembersResponse {
214        "Tags": add_tags,
215        "DeleteTags": delete_tags
216    }
217}
218
219req! {
220    "stop"
221    /// Stops a stream by seq id (this is automatically called on Drop by the RPCStream struct)
222    pub(crate) stop_stream(seq: u64) -> () {
223        "Stop": &seq
224    }
225}
226
227req! {
228    "leave"
229    /// Gracefully leave
230    pub leave() -> ()
231}
232
233req! {
234    "respond"
235    /// Response to a query
236    pub query_respond(id: u64, payload: &[u8]) -> () {
237        "ID": &id,
238        "Payload": payload
239    }
240}
241
242#[derive(Deserialize, Debug)]
243#[serde(rename_all = "PascalCase")]
244pub struct Coordinate {
245    pub adjustment: f32,
246    pub error: f32,
247    pub height: f32,
248    pub vec: [f32; 8],
249}
250
251#[derive(Deserialize, Debug)]
252#[serde(rename_all = "PascalCase")]
253pub struct CoordinateResponse {
254    pub ok: bool,
255
256    #[serde(default)]
257    pub coord: Option<Coordinate>,
258}
259
260res!(CoordinateResponse);
261
262req! {
263    "get-coordinate"
264    /// Get a node's coordinate
265    pub get_coordinate(node: &str) -> CoordinateResponse {
266        "Node": node
267    }
268}
269
270#[derive(Deserialize, Debug)]
271pub struct Agent {
272    pub name: String,
273}
274
275#[derive(Deserialize, Debug)]
276pub struct RuntimeInfo {
277    pub os: String,
278    pub arch: String,
279    pub version: String,
280    pub max_procs: String,
281    pub goroutines: String,
282    pub cpu_count: String,
283}
284
285#[derive(Deserialize, Debug)]
286pub struct SerfInfo {
287    pub failed: String,
288    pub left: String,
289    pub event_time: String,
290    pub query_time: String,
291    pub event_queue: String,
292    pub members: String,
293    pub member_time: String,
294    pub intent_queue: String,
295    pub query_queue: String,
296}
297
298#[derive(Deserialize, Debug)]
299pub struct AgentStats {
300    pub agent: Agent,
301    pub runtime: RuntimeInfo,
302    pub serf: SerfInfo,
303    pub tags: HashMap<String, String>,
304}
305
306res!(AgentStats);
307
308req! {
309    "stats"
310    /// Get information about the Serf agent.
311    pub stats() -> AgentStats
312}
313
// TODO: MONITOR and QUERY commands (STREAM is implemented below).
315
316#[derive(Deserialize, Debug)]
317#[serde(tag = "Event")]
318pub enum StreamMessage {
319    #[serde(rename = "user")]
320    User {
321        #[serde(rename = "LTime")]
322        ltime: u64,
323        #[serde(rename = "Name")]
324        name: String,
325        #[serde(rename = "Payload")]
326        payload: Vec<u8>,
327        #[serde(rename = "Coalesce")]
328        coalesce: bool,
329    },
330    #[serde(rename = "member-join")]
331    MemberJoin {
332        #[serde(rename = "Members")]
333        members: Vec<Member>,
334    },
335    Query {
336        #[serde(rename = "ID")]
337        id: u64,
338        #[serde(rename = "LTime")]
339        ltime: u64,
340        #[serde(rename = "Name")]
341        name: String,
342        #[serde(rename = "Payload")]
343        payload: Vec<u8>,
344    },
345}
346res!(StreamMessage);
347
348stream! {
349    "stream"
350    pub stream(ty: &str) -> StreamMessage {
351        "Type": ty
352    }
353}
354
355// TODO: query