use futures::future::ok;
use futures::{Async};
use prometheus::core::{AtomicU64, Atomic};
use std::cell::RefCell;
use std::net::{SocketAddr, TcpListener};
use std::rc::Rc;
use std::time::Duration;
use crate::ws_server_peer::http_serve::get_static_file_reply;
use super::{BoxedNewPeerFuture, Peer};
use super::{ConstructParams, PeerConstructor, Specifier};
use std::io::{Read, Write, ErrorKind};
use tokio_io::{AsyncRead, AsyncWrite};
use std::io::Error as IoError;
use prometheus::{Encoder, IntCounter, Histogram};
// Process-wide set of Prometheus metrics updated by the wrapped peers in this file.
// Constructed against a registry via the derived `GlobalStats::new` (see `new_global_stats`).
//
// NOTE(review): plain `//` comments are used here on purpose. The `MetricStorage` derive
// may pick up `///` doc comments on fields (e.g. as metric help text), which would change
// the exported output; confirm against the prometheus_metric_storage docs before
// converting these to doc comments.
#[derive(prometheus_metric_storage::MetricStorage)]
#[metric(subsystem = "websocat1")]
pub struct GlobalStats {
// Incremented once for every successful `write` call on a wrapped peer.
w_msgs: IntCounter,
// Incremented once for every successful `read` call on a wrapped peer.
r_msgs: IntCounter,
// Total number of bytes reported as written by successful `write` calls.
w_bytes: IntCounter,
// Total number of bytes returned by successful `read` calls.
r_bytes: IntCounter,
// Incremented when per-session state is created (one per wrapped peer).
connects: IntCounter,
// Incremented when per-session state is dropped.
disconnects: IntCounter,
// Session lifetime: timer started when the session state is created, stopped when it is dropped.
#[metric(buckets(0.1, 1, 10, 60, 300, 3600))]
session_durations: Histogram,
// Timer (re)started each time a peer is wrapped; presumably records the gap between
// consecutive connections when the previous timer is replaced and dropped.
#[metric(buckets(0.1, 1, 10, 60, 300, 3600))]
between_connects: Histogram,
// Bytes written during one session, observed when the session ends.
#[metric(buckets(0, 32, 1024, 65536, 1048576, 33554432, 1073741824))]
session_w_bytes: Histogram,
// Bytes read during one session, observed when the session ends.
#[metric(buckets(0, 32, 1024, 65536, 1048576, 33554432, 1073741824))]
session_r_bytes: Histogram,
// Successful `write` calls during one session, observed when the session ends.
#[metric(buckets(0, 1, 2, 8, 64, 2048, 65536, 2097152))]
session_w_msgs: Histogram,
// Successful `read` calls during one session, observed when the session ends.
#[metric(buckets(0, 1, 2, 8, 64, 2048, 65536, 2097152))]
session_r_msgs: Histogram,
// Session read bytes divided by the recorded session duration (bytes, not bits).
#[metric(buckets(0.5, 10, 100, 1000, 1000_0, 1000_00, 1000_000, 10_000_000, 100_000_000))]
session_avg_r_bps: Histogram,
// Session written bytes divided by the recorded session duration (bytes, not bits).
#[metric(buckets(0.5, 10, 100, 1000, 1000_0, 1000_00, 1000_000, 10_000_000, 100_000_000))]
session_avg_w_bps: Histogram,
// Size returned by each successful `read` call (zero-length results included).
#[metric(buckets(0, 1, 10, 50, 300, 1024, 8192, 65536, 4194304))]
read_lengths: Histogram,
// `read` calls that failed with an error other than `WouldBlock`.
read_errors: IntCounter,
// `read` calls that failed with `ErrorKind::WouldBlock`.
read_wouldblocks: IntCounter,
// `write` calls that failed with an error other than `WouldBlock`.
write_errors: IntCounter,
// `write` calls that failed with `ErrorKind::WouldBlock`.
write_wouldblocks: IntCounter,
// Size reported by each successful `write` call.
#[metric(buckets(0, 1, 10, 50, 300, 1024, 8192, 65536, 4194304))]
write_lengths: Histogram,
// Time spent inside the inner peer's `read`, recorded for every call regardless of outcome.
#[metric(buckets(0.1e-3,1e-3,0.01,0.1,1,10))]
read_timings: Histogram,
// Time spent inside the inner peer's `write`, recorded for every call regardless of outcome.
#[metric(buckets(0.1e-3,1e-3,0.01,0.1,1,10))]
write_timings: Histogram,
}
/// Shared, single-threaded (`Rc`) handle to the metric set.
pub type HGlobalStats = Rc<GlobalStats>;
/// State kept once per program run: the metrics handle plus a slot holding the
/// currently running `between_connects` timer (`None` until the first peer is wrapped).
pub type GlobalState = (HGlobalStats, Rc<RefCell<Option<prometheus::HistogramTimer>>>);
/// Per-session accounting state shared by the read and write halves of one wrapped peer.
///
/// The counters accumulate totals for this session only; its `Drop` impl reports them
/// to the global histograms when the last reference goes away.
struct Droppie {
/// Successful `write` calls in this session.
w_msgs: AtomicU64,
/// Successful `read` calls in this session.
r_msgs: AtomicU64,
/// Bytes written in this session.
w_bytes: AtomicU64,
/// Bytes read in this session.
r_bytes: AtomicU64,
/// Running session-duration timer; wrapped in `Option` so `drop` can take it out by value.
session_timing: Option<prometheus::HistogramTimer>,
/// Handle to the global metric set that this session reports into.
handle: HGlobalStats,
}
impl Droppie {
    /// Opens a new accounting session against `handle`.
    ///
    /// Bumps the global `connects` counter, starts the session-duration timer and
    /// returns the state with all per-session counters at zero.
    fn new(handle: HGlobalStats) -> Droppie {
        handle.connects.inc();
        // Start the timer before `handle` is moved into the struct.
        let session_timing = Some(handle.session_durations.start_timer());
        Droppie {
            w_msgs: AtomicU64::new(0),
            r_msgs: AtomicU64::new(0),
            w_bytes: AtomicU64::new(0),
            r_bytes: AtomicU64::new(0),
            session_timing,
            handle,
        }
    }
}
impl Drop for Droppie {
    /// Closes the session and reports its totals to the global metrics.
    ///
    /// Stops the session-duration timer (recording it in `session_durations`),
    /// observes the per-session byte and message totals, observes the average
    /// read/write rates, and bumps `disconnects`.
    ///
    /// The average rates are only observed when the measured duration is strictly
    /// positive. Dividing by a zero duration would produce `inf` or `NaN`, and a
    /// single such observation would permanently corrupt the histogram's sum.
    fn drop(&mut self) {
        // Stop the timer first so the duration covers the session itself and not
        // the bookkeeping below. `take()` is needed because stopping consumes the timer.
        let elapsed = self
            .session_timing
            .take()
            .map(|timer| timer.stop_and_record());

        let stats = &self.handle;
        let r_bytes = self.r_bytes.get() as f64;
        let w_bytes = self.w_bytes.get() as f64;

        stats.session_r_bytes.observe(r_bytes);
        stats.session_w_bytes.observe(w_bytes);
        stats.session_r_msgs.observe(self.r_msgs.get() as f64);
        stats.session_w_msgs.observe(self.w_msgs.get() as f64);

        if let Some(secs) = elapsed {
            // `secs > 0.0` is also false for NaN, so only well-defined rates get through.
            if secs > 0.0 {
                stats.session_avg_r_bps.observe(r_bytes / secs);
                stats.session_avg_w_bps.observe(w_bytes / secs);
            }
        }

        stats.disconnects.inc();
    }
}
pub fn new_global_stats() -> GlobalState {
(Rc::new(GlobalStats::new(prometheus::default_registry()).unwrap()), Rc::new(RefCell::new(None)))
}
/// Starts a background thread that serves the default registry's metrics over TCP.
///
/// Binds a blocking listener on `psa` and returns as soon as the thread is spawned;
/// the thread's join handle is discarded. For every accepted connection the thread
/// writes a static reply header followed by the text-encoded metrics and then drops
/// the connection. The incoming request is never read, and write/encode failures
/// are deliberately ignored (best effort).
///
/// # Errors
///
/// Returns an error if binding the listener fails.
pub fn serve(psa: SocketAddr) -> crate::Result<()> {
    let listener = TcpListener::bind(&psa)?;
    debug!("Listening TCP socket for Prometheus metrics");
    std::thread::spawn(move || {
        // Short pause after every accept attempt, successful or not.
        let pause = Duration::from_millis(5);
        for conn in listener.incoming() {
            match conn {
                Ok(stream) => {
                    let mut out = std::io::BufWriter::new(stream);
                    let families = prometheus::default_registry().gather();
                    let header = get_static_file_reply(None, "text/plain; version=0.0.4");
                    let _ = out.write_all(&header[..]);
                    let _ = prometheus::TextEncoder::default().encode(&families[..], &mut out);
                    // `out` goes out of scope here, which flushes the buffer and closes the stream.
                }
                Err(_) => {}
            }
            std::thread::sleep(pause);
        }
    });
    Ok(())
}
/// Overlay specifier: wraps the peers produced by the inner specifier `T` so that
/// their reads and writes are accounted in the Prometheus metrics.
#[derive(Debug)]
pub struct Prometheus<T: Specifier>(pub T);
impl<T: Specifier> Specifier for Prometheus<T> {
    /// Constructs the inner specifier's peers and wraps each one with accounting.
    ///
    /// The metrics state is obtained from the construct parameters' global storage
    /// (initialized with `new_global_stats` when needed), so every peer produced
    /// here reports into the same `GlobalState`.
    fn construct(&self, cp: ConstructParams) -> PeerConstructor {
        let shared: GlobalState = cp.global(new_global_stats).clone();
        let wrapped = self.0.construct(cp.clone());
        // Each produced peer gets its own clone of the shared handles.
        wrapped.map(move |peer, _| prometheus_peer(peer, shared.clone()))
    }
    specifier_boilerplate!(globalstate has_subspec);
    self_0_is_subspecifier!(proxy_is_multiconnect);
}
// Declares the specifier class `PrometheusClass` for the `Prometheus` overlay.
// It is selected by the `prometheus:` or `metrics:` prefix and takes a sub-specifier
// as its argument. The `help` string below is the user-facing description.
specifier_class!(
name = PrometheusClass,
target = Prometheus,
prefixes = ["prometheus:", "metrics:"],
arg_handling = subspec,
overlay = true,
MessageOriented,
MulticonnectnessDependsOnInnerType,
help = r#"
[A] Account connections, messages, bytes and other data and expose Prometheus metrics on a separate port.
Not included by default, build a crate with `--features=prometheus_peer` to have it.
You can also use `--features=prometheus_peer,prometheus/process` to have additional metrics.
"#
);
/// Wraps `inner_peer` so that every read and write is accounted in `stats`.
///
/// Opens a new accounting session, restarts the shared `between_connects` timer,
/// and returns an already-resolved future yielding a peer whose two halves share
/// the session state. The session is closed when both halves have been dropped.
pub fn prometheus_peer(inner_peer: Peer, stats: GlobalState) -> BoxedNewPeerFuture {
    let (global, gap_slot) = stats;

    // Opening the session bumps `connects` and starts the session-duration timer.
    let session = Droppie::new(global);

    // Start timing the gap until the next connection. Storing the new timer
    // replaces, and thereby drops, whichever timer was in the slot before.
    let gap_timer = session.handle.between_connects.start_timer();
    *gap_slot.borrow_mut() = Some(gap_timer);

    let session = Rc::new(session);
    let reader = StatsWrapperR(inner_peer.0, Rc::clone(&session));
    let writer = StatsWrapperW(inner_peer.1, session);

    Box::new(ok(Peer::new(reader, writer, inner_peer.2))) as BoxedNewPeerFuture
}
/// Read half of an accounted peer: field 0 is the inner reader, field 1 the
/// per-session state shared with the write half.
struct StatsWrapperR(Box<dyn AsyncRead>, Rc<Droppie>);
impl Read for StatsWrapperR {
fn read(&mut self, b: &mut [u8]) -> Result<usize, IoError> {
let timer = self.1.handle.read_timings.start_timer();
let ret = self.0.read(b);
timer.stop_and_record();
match &ret {
Ok(x) => {
self.1.handle.read_lengths.observe(*x as f64);
self.1.handle.r_msgs.inc();
self.1.handle.r_bytes.inc_by(*x as u64);
self.1.r_msgs.inc_by(1);
self.1.r_bytes.inc_by(*x as u64);
},
Err(e) if e.kind() == ErrorKind::WouldBlock => {
self.1.handle.read_wouldblocks.inc();
},
Err(_) => {
self.1.handle.read_errors.inc();
},
};
ret
}
}
// Empty impl: only the trait's default methods are used, so all reading goes
// through the accounting `Read::read` of this wrapper.
impl AsyncRead for StatsWrapperR {}
/// Write half of an accounted peer: field 0 is the inner writer, field 1 the
/// per-session state shared with the read half.
struct StatsWrapperW(Box<dyn AsyncWrite>, Rc<Droppie>);
impl Write for StatsWrapperW {
fn write(&mut self, b: &[u8]) -> Result<usize, IoError> {
let timer = self.1.handle.write_timings.start_timer();
let ret = self.0.write(b);
timer.stop_and_record();
match &ret {
Ok(x) => {
self.1.handle.write_lengths.observe(*x as f64);
self.1.handle.w_msgs.inc();
self.1.handle.w_bytes.inc_by(*x as u64);
self.1.w_msgs.inc_by(1);
self.1.w_bytes.inc_by(*x as u64);
},
Err(e) if e.kind() == ErrorKind::WouldBlock => {
self.1.handle.write_wouldblocks.inc();
},
Err(_) => {
self.1.handle.write_errors.inc();
},
};
ret
}
fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}
impl AsyncWrite for StatsWrapperW {
fn shutdown(&mut self) -> std::result::Result<Async<()>, std::io::Error> {
self.0.shutdown()
}
}