1use std::{
2 collections::HashMap,
3 net::{IpAddr, Ipv4Addr, Ipv6Addr},
4};
5
6use serde::{Deserialize, Serialize};
7
8use crate::{RPCResponse, RPCResult};
9
10#[derive(Serialize)]
11#[serde(rename_all = "PascalCase")]
12pub(crate) struct RequestHeader {
13 pub seq: u64,
14 pub command: &'static str,
15}
16
17#[derive(Deserialize)]
18#[serde(rename_all = "PascalCase")]
19pub(crate) struct ResponseHeader {
20 pub seq: u64,
21 pub error: String,
22}
23
24macro_rules! count {
25 () => { 0 };
26 ($item:tt) => {1};
27 ($item:tt$(, $rest:tt)+) => { count!( $($rest),+ ) + 1 }
28}
29
30macro_rules! cmd_arg {
31 (
32 $buf:expr,
33 $($key:literal: $val:expr),*
34 ) => {{
35 let len: u32 = count!( $($key),* );
36
37 rmp::encode::write_map_len($buf, len).unwrap();
38 $(
39 rmp::encode::write_str($buf, $key).unwrap();
40 rmp_serde::encode::write_named($buf, $val).unwrap();
41 )*
42 }};
43}
44
45macro_rules! req {
46 (
47 $name:literal
48 $(#[$meta:meta])*
49 $vis:vis $ident:ident( $($arg:ident: $arg_ty:ty),* ) -> $res:ty $({
50 $($key:literal: $val:expr),*
51 })?
52 ) => {
53 impl crate::Client {
54 $(#[$meta])*
55 $vis fn $ident<'a>(&'a self$(, $arg: $arg_ty)*) -> crate::RPCRequest<'a, $res> {
56 #[allow(unused_mut)]
57 let mut buf = Vec::new();
58
59 $(cmd_arg! { &mut buf, $($key: $val),* };)?
60
61 self.request($name, buf)
62 }
63 }
64 };
65}
66
67macro_rules! stream {
68 (
69 $name:literal
70
71 $vis:vis $ident:ident( $($arg:ident: $arg_ty:ty),* ) -> $res:ty $({
72 $($key:literal: $val:expr),*
73 })?
74 ) => {
75 impl crate::Client {
76 $vis fn $ident(self: &std::sync::Arc<Self>$(, $arg: $arg_ty)*) -> crate::RPCStream<$res> {
77 #[allow(unused_mut)]
78 let mut buf = Vec::new();
79
80 $(cmd_arg! { &mut buf, $($key: $val),* };)?
81
82 self.start_stream($name, buf)
83 }
84 }
85 };
86}
87
88macro_rules! res {
89 ($ty:ty) => {
90 impl RPCResponse for $ty {
91 fn read_from(read: crate::SeqRead<'_>) -> RPCResult<Self> {
92 Ok(read.read_msg())
93 }
94 }
95 };
96}
97
98req! {
99 "handshake"
100 pub(crate) handshake(version: u32) -> () {
102 "Version": &version
103 }
104}
105
106req! {
107 "auth"
108 pub(crate) auth(auth_key: &str) -> () {
110 "AuthKey": auth_key
111 }
112}
113
114req! {
115 "event"
116 pub fire_event(name: &str, payload: &[u8], coalesce: bool) -> () {
118 "Name": name,
119 "Payload": payload,
120 "Coalesce": &coalesce
121 }
122}
123
124req! {
125 "force-leave"
126 pub force_leave(node: &str) -> () {
128 "Node": node
129 }
130}
131
132#[derive(Deserialize, Debug)]
133pub struct JoinResponse {
134 #[serde(rename = "Num")]
135 pub nodes_joined: u64,
136}
137
138res!(JoinResponse);
139
140req! {
141 "join"
142 pub join(existing: &[&str], replay: bool) -> JoinResponse {
144 "Existing": existing,
145 "Replay": &replay
146 }
147}
148
149#[derive(Deserialize, Debug)]
150#[serde(rename_all = "PascalCase")]
151pub struct Member {
152 pub name: String,
153 #[serde(deserialize_with = "deserialize_ip_addr")]
154 pub addr: IpAddr,
155 pub port: u32,
156 pub tags: HashMap<String, String>,
157 pub status: String,
158 pub protocol_min: u32,
159 pub protocol_max: u32,
160 pub protocol_cur: u32,
161 pub delegate_max: u32,
162 pub delegate_min: u32,
163 pub delegate_cur: u32,
164}
165
166fn deserialize_ip_addr<'de, D>(de: D) -> Result<IpAddr, D::Error>
167where
168 D: serde::Deserializer<'de>,
169{
170 let addr = Ipv6Addr::from(<u128 as serde::Deserialize>::deserialize(de)?);
171
172 let addr = match addr.octets() {
177 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
178 IpAddr::V4(Ipv4Addr::new(a, b, c, d))
179 }
180 _ => IpAddr::V6(addr),
181 };
182
183 Ok(addr)
184}
185
186#[derive(Deserialize, Debug)]
187#[serde(rename_all = "PascalCase")]
188pub struct MembersResponse {
189 pub members: Vec<Member>,
190}
191
192res!(MembersResponse);
193
194req! {
195 "members"
196 pub members() -> MembersResponse
198}
199
200req! {
201 "members-filtered"
202 pub members_filtered(status: Option<&str>, name: Option<&str>, tags: Option<&HashMap<String, String>>) -> MembersResponse {
204 "Status": &status,
205 "Name": &name,
206 "Tags": &tags
207 }
208}
209
210req! {
211 "tags"
212 pub tags(add_tags: &[&str], delete_tags: &[&str]) -> MembersResponse {
214 "Tags": add_tags,
215 "DeleteTags": delete_tags
216 }
217}
218
219req! {
220 "stop"
221 pub(crate) stop_stream(seq: u64) -> () {
223 "Stop": &seq
224 }
225}
226
227req! {
228 "leave"
229 pub leave() -> ()
231}
232
233req! {
234 "respond"
235 pub query_respond(id: u64, payload: &[u8]) -> () {
237 "ID": &id,
238 "Payload": payload
239 }
240}
241
242#[derive(Deserialize, Debug)]
243#[serde(rename_all = "PascalCase")]
244pub struct Coordinate {
245 pub adjustment: f32,
246 pub error: f32,
247 pub height: f32,
248 pub vec: [f32; 8],
249}
250
251#[derive(Deserialize, Debug)]
252#[serde(rename_all = "PascalCase")]
253pub struct CoordinateResponse {
254 pub ok: bool,
255
256 #[serde(default)]
257 pub coord: Option<Coordinate>,
258}
259
260res!(CoordinateResponse);
261
262req! {
263 "get-coordinate"
264 pub get_coordinate(node: &str) -> CoordinateResponse {
266 "Node": node
267 }
268}
269
270#[derive(Deserialize, Debug)]
271pub struct Agent {
272 pub name: String,
273}
274
275#[derive(Deserialize, Debug)]
276pub struct RuntimeInfo {
277 pub os: String,
278 pub arch: String,
279 pub version: String,
280 pub max_procs: String,
281 pub goroutines: String,
282 pub cpu_count: String,
283}
284
285#[derive(Deserialize, Debug)]
286pub struct SerfInfo {
287 pub failed: String,
288 pub left: String,
289 pub event_time: String,
290 pub query_time: String,
291 pub event_queue: String,
292 pub members: String,
293 pub member_time: String,
294 pub intent_queue: String,
295 pub query_queue: String,
296}
297
298#[derive(Deserialize, Debug)]
299pub struct AgentStats {
300 pub agent: Agent,
301 pub runtime: RuntimeInfo,
302 pub serf: SerfInfo,
303 pub tags: HashMap<String, String>,
304}
305
306res!(AgentStats);
307
308req! {
309 "stats"
310 pub stats() -> AgentStats
312}
313
314#[derive(Deserialize, Debug)]
317#[serde(tag = "Event")]
318pub enum StreamMessage {
319 #[serde(rename = "user")]
320 User {
321 #[serde(rename = "LTime")]
322 ltime: u64,
323 #[serde(rename = "Name")]
324 name: String,
325 #[serde(rename = "Payload")]
326 payload: Vec<u8>,
327 #[serde(rename = "Coalesce")]
328 coalesce: bool,
329 },
330 #[serde(rename = "member-join")]
331 MemberJoin {
332 #[serde(rename = "Members")]
333 members: Vec<Member>,
334 },
335 Query {
336 #[serde(rename = "ID")]
337 id: u64,
338 #[serde(rename = "LTime")]
339 ltime: u64,
340 #[serde(rename = "Name")]
341 name: String,
342 #[serde(rename = "Payload")]
343 payload: Vec<u8>,
344 },
345}
346res!(StreamMessage);
347
348stream! {
349 "stream"
350 pub stream(ty: &str) -> StreamMessage {
351 "Type": ty
352 }
353}
354
355