use std::fmt;
use std::io::{Write, BufWriter};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use std::thread;
use futures::stream::Stream;
use self_meter;
use serde::{Serializer, Serialize};
use tokio_core::reactor::{Handle, Interval};
use tk_http::{Status};
use tk_http::server::{Encoder, EncoderDone};
use json::{serialize, ReportWrapper, ThreadIter};
use self_meter::Pid;
/// Shared handle to a `self_meter::Meter`.
///
/// The inner meter is stored behind `Arc<Mutex<..>>`, so cloning this handle
/// yields another reference to the same underlying meter rather than an
/// independent copy.
#[derive(Clone)]
pub struct Meter(Arc<Mutex<self_meter::Meter>>);
/// Borrowed, serializable view of a `Meter`.
///
/// Holds only a reference; the `Serialize` impl for this type locks the meter
/// and delegates to `json::ReportWrapper` at serialization time.
#[derive(Debug)]
pub struct Report<'a>(&'a Meter);
/// Borrowed, serializable view of a `Meter`.
///
/// Holds only a reference; the `Serialize` impl for this type locks the meter
/// and delegates to `json::ThreadIter` at serialization time.
#[derive(Debug)]
pub struct ThreadReport<'a>(&'a Meter);
/// Borrowed, serializable view of a `Meter`.
///
/// Holds only a reference; the `Serialize` impl for this type locks the meter
/// and serializes whatever the inner meter's `report()` returns.
#[derive(Debug)]
pub struct ProcessReport<'a>(&'a Meter);
impl Meter {
pub fn new() -> Meter {
let inner = self_meter::Meter::new(Duration::new(1, 0))
.expect("self-meter should be created successfully");
Meter(Arc::new(Mutex::new(inner)))
}
pub fn spawn_scanner(&self, handle: &Handle) {
let meter = self.clone();
handle.spawn(
Interval::new(Duration::new(1, 0), handle)
.expect("interval should work")
.map_err(|e| panic!("interval error: {}", e))
.map(move |()| {
meter.lock().scan()
.map_err(|e| error!("Self-meter scan error: {}", e)).ok();
})
.for_each(|()| Ok(())
));
}
fn lock(&self) -> MutexGuard<self_meter::Meter> {
self.0.lock().expect("meter not poisoned")
}
pub fn serialize<W: Write>(&self, buf: W) {
serialize(&*self.lock(), buf)
}
pub fn report(&self) -> Report {
Report(self)
}
pub fn process_report(&self) -> ProcessReport {
ProcessReport(self)
}
pub fn thread_report(&self) -> ThreadReport {
ThreadReport(self)
}
pub fn respond<S>(&self, mut e: Encoder<S>) -> EncoderDone<S> {
e.status(Status::Ok);
e.add_header("Server",
concat!("self-meter-http/", env!("CARGO_PKG_VERSION"))
).unwrap();
e.add_header("Content-Type", "application/json").unwrap();
e.add_chunked().unwrap();
if e.done_headers().unwrap() {
self.serialize(BufWriter::new(&mut e))
}
e.done()
}
pub fn track_thread(&self, tid: Pid, name: &str) {
self.lock().track_thread(tid, name)
}
pub fn untrack_thread(&self, tid: Pid) {
self.lock().untrack_thread(tid)
}
pub fn track_current_thread(&self, name: &str) -> Pid {
self.lock().track_current_thread(name)
}
pub fn track_current_thread_by_name(&self) {
let thread = thread::current();
let name = thread.name().expect("thread name must be set");
self.lock().track_current_thread(name);
}
pub fn untrack_current_thread(&self) {
self.lock().untrack_current_thread();
}
}
/// Opaque `Debug` output for `Meter`.
impl fmt::Debug for Meter {
    /// Emits only the type name `Meter`; no fields are printed, so the
    /// mutex guarding the inner meter is never locked for formatting.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Meter");
        out.finish()
    }
}
/// Serializes the borrowed meter through `ReportWrapper`.
impl<'a> Serialize for Report<'a> {
    /// Locks the meter, wraps the inner `self_meter::Meter` in a
    /// `ReportWrapper`, and serializes that wrapper. The lock is held until
    /// serialization has finished.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: Serializer
    {
        let guard = self.0.lock();
        let wrapper = ReportWrapper { meter: &*guard };
        wrapper.serialize(serializer)
    }
}
/// Serializes the value returned by the inner meter's `report()`.
impl<'a> Serialize for ProcessReport<'a> {
    /// Locks the meter, asks the inner `self_meter::Meter` for its
    /// `report()`, and serializes the result. The lock is held until
    /// serialization has finished.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: Serializer
    {
        let guard = self.0.lock();
        let process = guard.report();
        process.serialize(serializer)
    }
}
/// Serializes the borrowed meter through `ThreadIter`.
impl<'a> Serialize for ThreadReport<'a> {
    /// Locks the meter, wraps the inner `self_meter::Meter` in a
    /// `ThreadIter`, and serializes that wrapper. The lock is held until
    /// serialization has finished.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: Serializer
    {
        let guard = self.0.lock();
        let threads = ThreadIter(&*guard);
        threads.serialize(serializer)
    }
}