self_meter_http/
locked.rs

1use std::fmt;
2use std::io::{Write, BufWriter};
3use std::sync::{Arc, Mutex, MutexGuard};
4use std::time::Duration;
5use std::thread;
6
7use futures::stream::Stream;
8use self_meter;
9use serde::{Serializer, Serialize};
10use tokio_core::reactor::{Handle, Interval};
11use tk_http::{Status};
12use tk_http::server::{Encoder, EncoderDone};
13
14use json::{serialize, ReportWrapper, ThreadIter};
15use self_meter::Pid;
16
17/// A wrapper around original ``self_meter::Meter`` that locks internal
18/// mutex on most operations and maybe used in multiple threads safely.
19#[derive(Clone)]
20pub struct Meter(Arc<Mutex<self_meter::Meter>>);
21
22/// A serializable report structure
23///
24/// Use `Meter::report()` to get instance.
25///
26/// Currently the structure is fully opaque except serialization
27#[derive(Debug)]
28pub struct Report<'a>(&'a Meter);
29
30/// A serializable thread report structure
31///
32/// The structure serializes to a map of thread reports.
33/// Use `Meter::thread_report()` to get instance.
34///
35/// Currently the structure is fully opaque except serialization
36#[derive(Debug)]
37pub struct ThreadReport<'a>(&'a Meter);
38
39/// A serializable process report structure
40///
41/// The structure serializes to a structure of whole-process metrics.
42/// Use `Meter::process_report()` to get instance.
43///
44/// Currently the structure is fully opaque except serialization
45#[derive(Debug)]
46pub struct ProcessReport<'a>(&'a Meter);
47
48impl Meter {
49
50    /// Create a new meter with specified scan iterval of one second
51    pub fn new() -> Meter {
52        let inner = self_meter::Meter::new(Duration::new(1, 0))
53            .expect("self-meter should be created successfully");
54        Meter(Arc::new(Mutex::new(inner)))
55    }
56
57    /// Adds a scanner coroutine to tokio main loop
58    ///
59    /// This must be called once per process (not per thread or tokio loop)
60    pub fn spawn_scanner(&self, handle: &Handle) {
61        let meter = self.clone();
62        handle.spawn(
63            Interval::new(Duration::new(1, 0), handle)
64            .expect("interval should work")
65            .map_err(|e| panic!("interval error: {}", e))
66            .map(move |()| {
67                meter.lock().scan()
68                .map_err(|e| error!("Self-meter scan error: {}", e)).ok();
69            })
70            .for_each(|()| Ok(())
71        ));
72    }
73
74    fn lock(&self) -> MutexGuard<self_meter::Meter> {
75        self.0.lock().expect("meter not poisoned")
76    }
77
78
79    /// Serialize response into JSON
80    pub fn serialize<W: Write>(&self, buf: W) {
81        serialize(&*self.lock(), buf)
82    }
83
84    /// Get serializable report
85    pub fn report(&self) -> Report {
86        Report(self)
87    }
88
89    /// Get serializable report for process data
90    ///
91    /// This is a part of `report()` / `Report`, and is needed for fine-grained
92    /// serialization control.
93    pub fn process_report(&self) -> ProcessReport {
94        ProcessReport(self)
95    }
96
97    /// Get serializable report for thread data
98    ///
99    /// This is a part of `report()` / `Report`, and is needed for fine-grained
100    /// serialization control.
101    pub fn thread_report(&self) -> ThreadReport {
102        ThreadReport(self)
103    }
104
105    /// Same as `serialize` but also adds required HTTP headers
106    pub fn respond<S>(&self, mut e: Encoder<S>) -> EncoderDone<S> {
107        e.status(Status::Ok);
108        // TODO(tailhook) add date
109        e.add_header("Server",
110            concat!("self-meter-http/", env!("CARGO_PKG_VERSION"))
111        ).unwrap();
112        e.add_header("Content-Type", "application/json").unwrap();
113        e.add_chunked().unwrap();
114        if e.done_headers().unwrap() {
115            self.serialize(BufWriter::new(&mut e))
116        }
117        e.done()
118    }
119    /// Start tracking specified thread
120    ///
121    /// Note: you must add main thread here manually. Usually you
122    /// should use `track_current_thread()` instead.
123    pub fn track_thread(&self, tid: Pid, name: &str) {
124        self.lock().track_thread(tid, name)
125    }
126    /// Stop tracking specified thread (for example if it's dead)
127    pub fn untrack_thread(&self, tid: Pid) {
128        self.lock().untrack_thread(tid)
129    }
130    /// Add current thread using `track_thread`, returns thread id
131    pub fn track_current_thread(&self, name: &str) -> Pid {
132        self.lock().track_current_thread(name)
133    }
134    /// Track current thread by using name from `std::thread::current().name()`
135    ///
136    /// This may not work if thread has no name, use `track_current_thread`
137    /// if thread has no own name or if you're unsure.
138    ///
139    /// # Panics
140    ///
141    /// If no thread is set.
142    pub fn track_current_thread_by_name(&self) {
143        let thread = thread::current();
144        let name = thread.name().expect("thread name must be set");
145        self.lock().track_current_thread(name);
146    }
147    /// Remove current thread using `untrack_thread`
148    pub fn untrack_current_thread(&self) {
149        self.lock().untrack_current_thread();
150    }
151}
152
153impl fmt::Debug for Meter {
154    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
155        f.debug_struct("Meter")
156        .finish()
157    }
158}
159
160impl<'a> Serialize for Report<'a> {
161    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
162        where S: Serializer
163    {
164        ReportWrapper { meter: &*self.0.lock() }.serialize(serializer)
165    }
166}
167
168impl<'a> Serialize for ProcessReport<'a> {
169    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
170        where S: Serializer
171    {
172        self.0.lock().report().serialize(serializer)
173    }
174}
175
176impl<'a> Serialize for ThreadReport<'a> {
177    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
178        where S: Serializer
179    {
180        ThreadIter(&*self.0.lock()).serialize(serializer)
181    }
182}