1#[macro_use]
19extern crate failure;
20
21use std::io::{self, Write, Read};
22use std::net::TcpStream;
23use std::num;
24use std::str::Utf8Error;
25
/// Handle used to run "four letter word" commands against a single server.
///
/// Only the address string is stored; `exec` opens a fresh TCP connection
/// to that address every time it is called.
#[derive(Debug, Clone)]
pub struct Client {
    /// Address handed to `TcpStream::connect` for each command.
    addr: String,
}
29
30pub trait FourLetterWord {
31 type Response;
32 fn command() -> &'static str;
33 fn parse_response(_: &str) -> Result<Self::Response>;
34}
35
36#[derive(Debug, Fail)]
37pub enum Error {
38 #[fail(display = "Failed to parse integer: {}", _0)]
39 Parse(#[cause] num::ParseIntError),
40
41 #[fail(display = "Field missing from response: {}", _0)]
42 MissingField(&'static str),
43
44 #[fail(display = "Encountered I/O error: {}", _0)]
45 Io(#[cause] io::Error),
46
47 #[fail(display = "Response wasn't valid UTF-8: {}", _0)]
48 Utf8(#[cause] Utf8Error),
49}
50
51impl From<num::ParseIntError> for Error {
52 fn from(val: num::ParseIntError) -> Error {
53 Error::Parse(val)
54 }
55}
56
57impl From<io::Error> for Error {
58 fn from(val: io::Error) -> Error {
59 Error::Io(val)
60 }
61}
62
63impl From<Utf8Error> for Error {
64 fn from(val: Utf8Error) -> Error {
65 Error::Utf8(val)
66 }
67}
68
69pub type Result<T> = ::std::result::Result<T, Error>;
70
71impl Client {
72 pub fn new<S: Into<String>>(addr: S) -> Self {
73 Self { addr: addr.into() }
74 }
75
76 pub fn exec<F>(&self) -> Result<F::Response>
77 where F: FourLetterWord
78 {
79 let mut stream = TcpStream::connect(&self.addr)?;
80 stream.write_all(F::command().as_bytes())?;
81 let mut buf = Vec::new();
82 stream.read_to_end(&mut buf)?;
83 let s = ::std::str::from_utf8(&buf)?;
84
85 F::parse_response(s)
86 }
87}
88
89pub use mntr::Mntr;
91
92pub mod mntr {
93 use std::collections::HashMap;
94
95 use super::{Result, Error};
96
97 pub struct Mntr;
98
99 #[derive(Debug)]
104 pub struct Response {
105 pub zk_version: String,
106 pub zk_avg_latency: i64,
107 pub zk_max_latency: i64,
108 pub zk_min_latency: i64,
109 pub zk_packets_received: i64,
110 pub zk_packets_sent: i64,
111 pub zk_outstanding_requests: i64,
112 pub zk_server_state: String,
113 pub zk_znode_count: i64,
114 pub zk_watch_count: i64,
115 pub zk_ephemerals_count: i64,
116 pub zk_approximate_data_size: i64,
117 pub zk_followers: Option<i64>,
118 pub zk_synced_followers: Option<i64>,
119 pub zk_pending_syncs: Option<i64>,
120 pub zk_open_file_descriptor_count: Option<i64>,
121 pub zk_max_file_descriptor_count: Option<i64>,
122 pub zk_extras: HashMap<String, String>,
123 }
124
125 impl ::FourLetterWord for Mntr {
126 type Response = Response;
127
128 fn command() -> &'static str { "mntr" }
129
130 fn parse_response(response: &str) -> Result<Self::Response> {
134 let mut zk_version: Option<String> = None;
135 let mut zk_avg_latency: Option<i64> = None;
136 let mut zk_max_latency: Option<i64> = None;
137 let mut zk_min_latency: Option<i64> = None;
138 let mut zk_packets_received: Option<i64> = None;
139 let mut zk_packets_sent: Option<i64> = None;
140 let mut zk_outstanding_requests: Option<i64> = None;
141 let mut zk_server_state: Option<String> = None;
142 let mut zk_znode_count: Option<i64> = None;
143 let mut zk_watch_count: Option<i64> = None;
144 let mut zk_ephemerals_count: Option<i64> = None;
145 let mut zk_approximate_data_size: Option<i64> = None;
146 let mut zk_followers: Option<i64> = None;
147 let mut zk_synced_followers: Option<i64> = None;
148 let mut zk_pending_syncs: Option<i64> = None;
149 let mut zk_open_file_descriptor_count: Option<i64> = None;
150 let mut zk_max_file_descriptor_count: Option<i64> = None;
151 let mut zk_extras = HashMap::new();
152
153 let lines = response.lines();
154
155 for line in lines {
156 let mut iter = line.split('\t');
157 match (iter.next().map(|s| s.trim()), iter.next().map(|s| s.trim())) {
158 (Some(key), Some(value)) => {
159 match key {
160 "zk_version" => zk_version = Some(value.into()),
161 "zk_avg_latency" => zk_avg_latency = Some(value.parse()?),
162 "zk_max_latency" => zk_max_latency = Some(value.parse()?),
163 "zk_min_latency" => zk_min_latency = Some(value.parse()?),
164 "zk_packets_received" => zk_packets_received = Some(value.parse()?),
165 "zk_packets_sent" => zk_packets_sent = Some(value.parse()?),
166 "zk_outstanding_requests" => zk_outstanding_requests = Some(value.parse()?),
167 "zk_server_state" => zk_server_state = Some(value.into()),
168 "zk_znode_count" => zk_znode_count = Some(value.parse()?),
169 "zk_watch_count" => zk_watch_count = Some(value.parse()?),
170 "zk_ephemerals_count" => zk_ephemerals_count = Some(value.parse()?),
171 "zk_approximate_data_size" => zk_approximate_data_size = Some(value.parse()?),
172 "zk_followers" => zk_followers = Some(value.parse()?),
173 "zk_synced_followers" => zk_synced_followers = Some(value.parse()?),
174 "zk_pending_syncs" => zk_pending_syncs = Some(value.parse()?),
175 "zk_open_file_descriptor_count" => zk_open_file_descriptor_count = Some(value.parse()?),
176 "zk_max_file_descriptor_count" => zk_max_file_descriptor_count = Some(value.parse()?),
177 _ => { zk_extras.insert(key.into(), value.into()); },
178 }
179 },
180 _ => break,
181 }
182 }
183
184 macro_rules! error_if_none {
185 ($($name:ident)*) => {
186 $(
187 match $name {
188 Some(v) => v,
189 None => return Err(Error::MissingField(stringify!($name))),
190 }
191 )*
192 }
193 }
194
195 Ok(Response {
196 zk_version: error_if_none!(zk_version),
197 zk_avg_latency: error_if_none!(zk_avg_latency),
198 zk_max_latency: error_if_none!(zk_max_latency),
199 zk_min_latency: error_if_none!(zk_min_latency),
200 zk_packets_received: error_if_none!(zk_packets_received),
201 zk_packets_sent: error_if_none!(zk_packets_sent),
202 zk_outstanding_requests: error_if_none!(zk_outstanding_requests),
203 zk_server_state: error_if_none!(zk_server_state),
204 zk_znode_count: error_if_none!(zk_znode_count),
205 zk_watch_count: error_if_none!(zk_watch_count),
206 zk_ephemerals_count: error_if_none!(zk_ephemerals_count),
207 zk_approximate_data_size: error_if_none!(zk_approximate_data_size),
208 zk_followers,
209 zk_synced_followers,
210 zk_pending_syncs,
211 zk_open_file_descriptor_count,
212 zk_max_file_descriptor_count,
213 zk_extras
214 })
215 }
216 }
217}