self_meter_http/
locked.rs1use std::fmt;
2use std::io::{Write, BufWriter};
3use std::sync::{Arc, Mutex, MutexGuard};
4use std::time::Duration;
5use std::thread;
6
7use futures::stream::Stream;
8use self_meter;
9use serde::{Serializer, Serialize};
10use tokio_core::reactor::{Handle, Interval};
11use tk_http::{Status};
12use tk_http::server::{Encoder, EncoderDone};
13
14use json::{serialize, ReportWrapper, ThreadIter};
15use self_meter::Pid;
16
17#[derive(Clone)]
20pub struct Meter(Arc<Mutex<self_meter::Meter>>);
21
22#[derive(Debug)]
28pub struct Report<'a>(&'a Meter);
29
30#[derive(Debug)]
37pub struct ThreadReport<'a>(&'a Meter);
38
39#[derive(Debug)]
46pub struct ProcessReport<'a>(&'a Meter);
47
48impl Meter {
49
50 pub fn new() -> Meter {
52 let inner = self_meter::Meter::new(Duration::new(1, 0))
53 .expect("self-meter should be created successfully");
54 Meter(Arc::new(Mutex::new(inner)))
55 }
56
57 pub fn spawn_scanner(&self, handle: &Handle) {
61 let meter = self.clone();
62 handle.spawn(
63 Interval::new(Duration::new(1, 0), handle)
64 .expect("interval should work")
65 .map_err(|e| panic!("interval error: {}", e))
66 .map(move |()| {
67 meter.lock().scan()
68 .map_err(|e| error!("Self-meter scan error: {}", e)).ok();
69 })
70 .for_each(|()| Ok(())
71 ));
72 }
73
74 fn lock(&self) -> MutexGuard<self_meter::Meter> {
75 self.0.lock().expect("meter not poisoned")
76 }
77
78
79 pub fn serialize<W: Write>(&self, buf: W) {
81 serialize(&*self.lock(), buf)
82 }
83
84 pub fn report(&self) -> Report {
86 Report(self)
87 }
88
89 pub fn process_report(&self) -> ProcessReport {
94 ProcessReport(self)
95 }
96
97 pub fn thread_report(&self) -> ThreadReport {
102 ThreadReport(self)
103 }
104
105 pub fn respond<S>(&self, mut e: Encoder<S>) -> EncoderDone<S> {
107 e.status(Status::Ok);
108 e.add_header("Server",
110 concat!("self-meter-http/", env!("CARGO_PKG_VERSION"))
111 ).unwrap();
112 e.add_header("Content-Type", "application/json").unwrap();
113 e.add_chunked().unwrap();
114 if e.done_headers().unwrap() {
115 self.serialize(BufWriter::new(&mut e))
116 }
117 e.done()
118 }
119 pub fn track_thread(&self, tid: Pid, name: &str) {
124 self.lock().track_thread(tid, name)
125 }
126 pub fn untrack_thread(&self, tid: Pid) {
128 self.lock().untrack_thread(tid)
129 }
130 pub fn track_current_thread(&self, name: &str) -> Pid {
132 self.lock().track_current_thread(name)
133 }
134 pub fn track_current_thread_by_name(&self) {
143 let thread = thread::current();
144 let name = thread.name().expect("thread name must be set");
145 self.lock().track_current_thread(name);
146 }
147 pub fn untrack_current_thread(&self) {
149 self.lock().untrack_current_thread();
150 }
151}
152
153impl fmt::Debug for Meter {
154 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
155 f.debug_struct("Meter")
156 .finish()
157 }
158}
159
160impl<'a> Serialize for Report<'a> {
161 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
162 where S: Serializer
163 {
164 ReportWrapper { meter: &*self.0.lock() }.serialize(serializer)
165 }
166}
167
168impl<'a> Serialize for ProcessReport<'a> {
169 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
170 where S: Serializer
171 {
172 self.0.lock().report().serialize(serializer)
173 }
174}
175
176impl<'a> Serialize for ThreadReport<'a> {
177 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
178 where S: Serializer
179 {
180 ThreadIter(&*self.0.lock()).serialize(serializer)
181 }
182}