use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
};
use serde::{Deserialize, Serialize};
use crate::{RPCResponse, RPCResult};
#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct RequestHeader {
pub seq: u64,
pub command: &'static str,
}
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct ResponseHeader {
pub seq: u64,
pub error: String,
}
macro_rules! count {
() => { 0 };
($item:tt) => {1};
($item:tt$(, $rest:tt)+) => { count!( $($rest),+ ) + 1 }
}
macro_rules! cmd_arg {
(
$buf:expr,
$($key:literal: $val:expr),*
) => {{
let len: u32 = count!( $($key),* );
rmp::encode::write_map_len($buf, len).unwrap();
$(
rmp::encode::write_str($buf, $key).unwrap();
rmp_serde::encode::write_named($buf, $val).unwrap();
)*
}};
}
macro_rules! req {
(
$name:literal
$(#[$meta:meta])*
$vis:vis $ident:ident( $($arg:ident: $arg_ty:ty),* ) -> $res:ty $({
$($key:literal: $val:expr),*
})?
) => {
impl crate::Client {
$(#[$meta])*
$vis fn $ident<'a>(&'a self$(, $arg: $arg_ty)*) -> crate::RPCRequest<'a, $res> {
#[allow(unused_mut)]
let mut buf = Vec::new();
$(cmd_arg! { &mut buf, $($key: $val),* };)?
self.request($name, buf)
}
}
};
}
macro_rules! stream {
(
$name:literal
$vis:vis $ident:ident( $($arg:ident: $arg_ty:ty),* ) -> $res:ty $({
$($key:literal: $val:expr),*
})?
) => {
impl crate::Client {
$vis fn $ident(self: &std::sync::Arc<Self>$(, $arg: $arg_ty)*) -> crate::RPCStream<$res> {
#[allow(unused_mut)]
let mut buf = Vec::new();
$(cmd_arg! { &mut buf, $($key: $val),* };)?
self.start_stream($name, buf)
}
}
};
}
macro_rules! res {
($ty:ty) => {
impl RPCResponse for $ty {
fn read_from(read: crate::SeqRead<'_>) -> RPCResult<Self> {
Ok(read.read_msg())
}
}
};
}
req! {
"handshake"
pub(crate) handshake(version: u32) -> () {
"Version": &version
}
}
req! {
"auth"
pub(crate) auth(auth_key: &str) -> () {
"AuthKey": auth_key
}
}
req! {
"event"
pub fire_event(name: &str, payload: &[u8], coalesce: bool) -> () {
"Name": name,
"Payload": payload,
"Coalesce": &coalesce
}
}
req! {
"force-leave"
pub force_leave(node: &str) -> () {
"Node": node
}
}
#[derive(Deserialize, Debug)]
pub struct JoinResponse {
#[serde(rename = "Num")]
pub nodes_joined: u64,
}
res!(JoinResponse);
req! {
"join"
pub join(existing: &[&str], replay: bool) -> JoinResponse {
"Existing": existing,
"Replay": &replay
}
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct Member {
pub name: String,
#[serde(deserialize_with = "deserialize_ip_addr")]
pub addr: IpAddr,
pub port: u32,
pub tags: HashMap<String, String>,
pub status: String,
pub protocol_min: u32,
pub protocol_max: u32,
pub protocol_cur: u32,
pub delegate_max: u32,
pub delegate_min: u32,
pub delegate_cur: u32,
}
fn deserialize_ip_addr<'de, D>(de: D) -> Result<IpAddr, D::Error>
where
D: serde::Deserializer<'de>,
{
let addr = Ipv6Addr::from(<u128 as serde::Deserialize>::deserialize(de)?);
let addr = match addr.octets() {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
IpAddr::V4(Ipv4Addr::new(a, b, c, d))
}
_ => IpAddr::V6(addr),
};
Ok(addr)
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct MembersResponse {
pub members: Vec<Member>,
}
res!(MembersResponse);
req! {
"members"
pub members() -> MembersResponse
}
req! {
"members-filtered"
pub members_filtered(status: Option<&str>, name: Option<&str>, tags: Option<&HashMap<String, String>>) -> MembersResponse {
"Status": &status,
"Name": &name,
"Tags": &tags
}
}
req! {
"tags"
pub tags(add_tags: &[&str], delete_tags: &[&str]) -> MembersResponse {
"Tags": add_tags,
"DeleteTags": delete_tags
}
}
req! {
"stop"
pub(crate) stop_stream(seq: u64) -> () {
"Stop": &seq
}
}
req! {
"leave"
pub leave() -> ()
}
req! {
"respond"
pub query_respond(id: u64, payload: &[u8]) -> () {
"ID": &id,
"Payload": payload
}
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct Coordinate {
pub adjustment: f32,
pub error: f32,
pub height: f32,
pub vec: [f32; 8],
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct CoordinateResponse {
pub ok: bool,
#[serde(default)]
pub coord: Option<Coordinate>,
}
res!(CoordinateResponse);
req! {
"get-coordinate"
pub get_coordinate(node: &str) -> CoordinateResponse {
"Node": node
}
}
#[derive(Deserialize, Debug)]
pub struct Agent {
pub name: String,
}
#[derive(Deserialize, Debug)]
pub struct RuntimeInfo {
pub os: String,
pub arch: String,
pub version: String,
pub max_procs: String,
pub goroutines: String,
pub cpu_count: String,
}
#[derive(Deserialize, Debug)]
pub struct SerfInfo {
pub failed: String,
pub left: String,
pub event_time: String,
pub query_time: String,
pub event_queue: String,
pub members: String,
pub member_time: String,
pub intent_queue: String,
pub query_queue: String,
}
#[derive(Deserialize, Debug)]
pub struct AgentStats {
pub agent: Agent,
pub runtime: RuntimeInfo,
pub serf: SerfInfo,
pub tags: HashMap<String, String>,
}
res!(AgentStats);
req! {
"stats"
pub stats() -> AgentStats
}
#[derive(Deserialize, Debug)]
#[serde(tag = "Event")]
pub enum StreamMessage {
#[serde(rename = "user")]
User {
#[serde(rename = "LTime")]
ltime: u64,
#[serde(rename = "Name")]
name: String,
#[serde(rename = "Payload")]
payload: Vec<u8>,
#[serde(rename = "Coalesce")]
coalesce: bool,
},
#[serde(rename = "member-join")]
MemberJoin {
#[serde(rename = "Members")]
members: Vec<Member>,
},
Query {
#[serde(rename = "ID")]
id: u64,
#[serde(rename = "LTime")]
ltime: u64,
#[serde(rename = "Name")]
name: String,
#[serde(rename = "Payload")]
payload: Vec<u8>,
},
}
res!(StreamMessage);
stream! {
"stream"
pub stream(ty: &str) -> StreamMessage {
"Type": ty
}
}