zk_4lw/
lib.rs

1// Copyright 2018 OneSignal, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! ZooKeeper "Four Letter Word" commands
//!
//! Provides a high-level TCP client for monitoring ZooKeeper.
18#[macro_use]
19extern crate failure;
20
21use std::io::{self, Write, Read};
22use std::net::TcpStream;
23use std::num;
24use std::str::Utf8Error;
25
/// Handle for sending four-letter-word commands to one ZooKeeper server.
///
/// Only the server address is stored; the value can be cloned and
/// debug-printed freely.
#[derive(Debug, Clone)]
pub struct Client {
    /// Address of the server, in any form `TcpStream::connect` accepts
    /// from a string (for example `"localhost:2181"`).
    addr: String,
}
29
30pub trait FourLetterWord {
31    type Response;
32    fn command() -> &'static str;
33    fn parse_response(_: &str) -> Result<Self::Response>;
34}
35
36#[derive(Debug, Fail)]
37pub enum Error {
38    #[fail(display = "Failed to parse integer: {}", _0)]
39    Parse(#[cause] num::ParseIntError),
40
41    #[fail(display = "Field missing from response: {}", _0)]
42    MissingField(&'static str),
43
44    #[fail(display = "Encountered I/O error: {}", _0)]
45    Io(#[cause] io::Error),
46
47    #[fail(display = "Response wasn't valid UTF-8: {}", _0)]
48    Utf8(#[cause] Utf8Error),
49}
50
51impl From<num::ParseIntError> for Error {
52    fn from(val: num::ParseIntError) -> Error {
53        Error::Parse(val)
54    }
55}
56
57impl From<io::Error> for Error {
58    fn from(val: io::Error) -> Error {
59        Error::Io(val)
60    }
61}
62
63impl From<Utf8Error> for Error {
64    fn from(val: Utf8Error) -> Error {
65        Error::Utf8(val)
66    }
67}
68
69pub type Result<T> = ::std::result::Result<T, Error>;
70
71impl Client {
72    pub fn new<S: Into<String>>(addr: S) -> Self {
73        Self { addr: addr.into() }
74    }
75
76    pub fn exec<F>(&self) -> Result<F::Response>
77        where F: FourLetterWord
78    {
79        let mut stream = TcpStream::connect(&self.addr)?;
80        stream.write_all(F::command().as_bytes())?;
81        let mut buf = Vec::new();
82        stream.read_to_end(&mut buf)?;
83        let s = ::std::str::from_utf8(&buf)?;
84
85        F::parse_response(s)
86    }
87}
88
89/// The "mntr" command
90pub use mntr::Mntr;
91
92pub mod mntr {
93    use std::collections::HashMap;
94
95    use super::{Result, Error};
96
97    pub struct Mntr;
98
99    /// Response to the `mntr` command
100    ///
101    /// The fields here are what's defined in the docs as of 2018/02/23.
102    /// Additional fields are stored as strings within zk_extras.
103    #[derive(Debug)]
104    pub struct Response {
105        pub zk_version: String,
106        pub zk_avg_latency: i64,
107        pub zk_max_latency: i64,
108        pub zk_min_latency: i64,
109        pub zk_packets_received: i64,
110        pub zk_packets_sent: i64,
111        pub zk_outstanding_requests: i64,
112        pub zk_server_state: String,
113        pub zk_znode_count: i64,
114        pub zk_watch_count: i64,
115        pub zk_ephemerals_count: i64,
116        pub zk_approximate_data_size: i64,
117        pub zk_followers: Option<i64>,
118        pub zk_synced_followers: Option<i64>,
119        pub zk_pending_syncs: Option<i64>,
120        pub zk_open_file_descriptor_count: Option<i64>,
121        pub zk_max_file_descriptor_count: Option<i64>,
122        pub zk_extras: HashMap<String, String>,
123    }
124
125    impl ::FourLetterWord for Mntr {
126        type Response = Response;
127
128        fn command() -> &'static str { "mntr" }
129
130        // XXX (jwilm) This method could be dramatically simplified if there
131        //             were a serde deserializer for
132        //             "value\t *key\nvalue\t *key\n..."
133        fn parse_response(response: &str) -> Result<Self::Response> {
134            let mut zk_version: Option<String> = None;
135            let mut zk_avg_latency: Option<i64> = None;
136            let mut zk_max_latency: Option<i64> = None;
137            let mut zk_min_latency: Option<i64> = None;
138            let mut zk_packets_received: Option<i64> = None;
139            let mut zk_packets_sent: Option<i64> = None;
140            let mut zk_outstanding_requests: Option<i64> = None;
141            let mut zk_server_state: Option<String> = None;
142            let mut zk_znode_count: Option<i64> = None;
143            let mut zk_watch_count: Option<i64> = None;
144            let mut zk_ephemerals_count: Option<i64> = None;
145            let mut zk_approximate_data_size: Option<i64> = None;
146            let mut zk_followers: Option<i64> = None;
147            let mut zk_synced_followers: Option<i64> = None;
148            let mut zk_pending_syncs: Option<i64> = None;
149            let mut zk_open_file_descriptor_count: Option<i64> = None;
150            let mut zk_max_file_descriptor_count: Option<i64> = None;
151            let mut zk_extras = HashMap::new();
152
153            let lines = response.lines();
154
155            for line in lines {
156                let mut iter = line.split('\t');
157                match (iter.next().map(|s| s.trim()), iter.next().map(|s| s.trim())) {
158                    (Some(key), Some(value)) => {
159                        match key {
160                            "zk_version" => zk_version = Some(value.into()),
161                            "zk_avg_latency" => zk_avg_latency = Some(value.parse()?),
162                            "zk_max_latency" => zk_max_latency = Some(value.parse()?),
163                            "zk_min_latency" => zk_min_latency = Some(value.parse()?),
164                            "zk_packets_received" => zk_packets_received = Some(value.parse()?),
165                            "zk_packets_sent" => zk_packets_sent = Some(value.parse()?),
166                            "zk_outstanding_requests" => zk_outstanding_requests = Some(value.parse()?),
167                            "zk_server_state" => zk_server_state = Some(value.into()),
168                            "zk_znode_count" => zk_znode_count = Some(value.parse()?),
169                            "zk_watch_count" => zk_watch_count = Some(value.parse()?),
170                            "zk_ephemerals_count" => zk_ephemerals_count = Some(value.parse()?),
171                            "zk_approximate_data_size" => zk_approximate_data_size = Some(value.parse()?),
172                            "zk_followers" => zk_followers = Some(value.parse()?),
173                            "zk_synced_followers" => zk_synced_followers = Some(value.parse()?),
174                            "zk_pending_syncs" => zk_pending_syncs = Some(value.parse()?),
175                            "zk_open_file_descriptor_count" => zk_open_file_descriptor_count = Some(value.parse()?),
176                            "zk_max_file_descriptor_count" => zk_max_file_descriptor_count = Some(value.parse()?),
177                            _ => { zk_extras.insert(key.into(), value.into()); },
178                        }
179                    },
180                    _ => break,
181                }
182            }
183
184            macro_rules! error_if_none {
185                ($($name:ident)*) => {
186                    $(
187                        match $name {
188                            Some(v) => v,
189                            None => return Err(Error::MissingField(stringify!($name))),
190                        }
191                    )*
192                }
193            }
194
195            Ok(Response {
196                zk_version: error_if_none!(zk_version),
197                zk_avg_latency: error_if_none!(zk_avg_latency),
198                zk_max_latency: error_if_none!(zk_max_latency),
199                zk_min_latency: error_if_none!(zk_min_latency),
200                zk_packets_received: error_if_none!(zk_packets_received),
201                zk_packets_sent: error_if_none!(zk_packets_sent),
202                zk_outstanding_requests: error_if_none!(zk_outstanding_requests),
203                zk_server_state: error_if_none!(zk_server_state),
204                zk_znode_count: error_if_none!(zk_znode_count),
205                zk_watch_count: error_if_none!(zk_watch_count),
206                zk_ephemerals_count: error_if_none!(zk_ephemerals_count),
207                zk_approximate_data_size: error_if_none!(zk_approximate_data_size),
208                zk_followers,
209                zk_synced_followers,
210                zk_pending_syncs,
211                zk_open_file_descriptor_count,
212                zk_max_file_descriptor_count,
213                zk_extras
214            })
215        }
216    }
217}