1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
use std::fmt;
use std::io::{Write, BufWriter};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use std::thread;

use futures::stream::Stream;
use self_meter;
use serde::{Serializer, Serialize};
use tokio_core::reactor::{Handle, Interval};
use tk_http::{Status};
use tk_http::server::{Encoder, EncoderDone};

use json::{serialize, ReportWrapper, ThreadIter};
use self_meter::Pid;

/// A wrapper around the original ``self_meter::Meter`` that locks an internal
/// mutex on most operations and may be used from multiple threads safely.
///
/// The inner meter is stored as `Arc<Mutex<..>>`, so cloning this wrapper
/// only clones the `Arc`: every clone refers to the same underlying meter.
#[derive(Clone)]
pub struct Meter(Arc<Mutex<self_meter::Meter>>);

/// A serializable report structure
///
/// Use `Meter::report()` to get an instance.
///
/// The value only borrows the `Meter` it was created from; the meter's
/// mutex is locked when the report is serialized, not when it is created.
///
/// Currently the structure is fully opaque except for serialization
#[derive(Debug)]
pub struct Report<'a>(&'a Meter);

/// A serializable thread report structure
///
/// The structure serializes to a map of thread reports.
/// Use `Meter::thread_report()` to get an instance.
///
/// The value only borrows the `Meter` it was created from; the meter's
/// mutex is locked when the report is serialized, not when it is created.
///
/// Currently the structure is fully opaque except for serialization
#[derive(Debug)]
pub struct ThreadReport<'a>(&'a Meter);

/// A serializable process report structure
///
/// The structure serializes to a structure of whole-process metrics.
/// Use `Meter::process_report()` to get an instance.
///
/// The value only borrows the `Meter` it was created from; the meter's
/// mutex is locked when the report is serialized, not when it is created.
///
/// Currently the structure is fully opaque except for serialization
#[derive(Debug)]
pub struct ProcessReport<'a>(&'a Meter);

impl Meter {

    /// Create a new meter with specified scan iterval of one second
    pub fn new() -> Meter {
        let inner = self_meter::Meter::new(Duration::new(1, 0))
            .expect("self-meter should be created successfully");
        Meter(Arc::new(Mutex::new(inner)))
    }

    /// Adds a scanner coroutine to tokio main loop
    ///
    /// This must be called once per process (not per thread or tokio loop)
    pub fn spawn_scanner(&self, handle: &Handle) {
        let meter = self.clone();
        handle.spawn(
            Interval::new(Duration::new(1, 0), handle)
            .expect("interval should work")
            .map_err(|e| panic!("interval error: {}", e))
            .map(move |()| {
                meter.lock().scan()
                .map_err(|e| error!("Self-meter scan error: {}", e)).ok();
            })
            .for_each(|()| Ok(())
        ));
    }

    fn lock(&self) -> MutexGuard<self_meter::Meter> {
        self.0.lock().expect("meter not poisoned")
    }


    /// Serialize response into JSON
    pub fn serialize<W: Write>(&self, buf: W) {
        serialize(&*self.lock(), buf)
    }

    /// Get serializable report
    pub fn report(&self) -> Report {
        Report(self)
    }

    /// Get serializable report for process data
    ///
    /// This is a part of `report()` / `Report`, and is needed for fine-grained
    /// serialization control.
    pub fn process_report(&self) -> ProcessReport {
        ProcessReport(self)
    }

    /// Get serializable report for thread data
    ///
    /// This is a part of `report()` / `Report`, and is needed for fine-grained
    /// serialization control.
    pub fn thread_report(&self) -> ThreadReport {
        ThreadReport(self)
    }

    /// Same as `serialize` but also adds required HTTP headers
    pub fn respond<S>(&self, mut e: Encoder<S>) -> EncoderDone<S> {
        e.status(Status::Ok);
        // TODO(tailhook) add date
        e.add_header("Server",
            concat!("self-meter-http/", env!("CARGO_PKG_VERSION"))
        ).unwrap();
        e.add_header("Content-Type", "application/json").unwrap();
        e.add_chunked().unwrap();
        if e.done_headers().unwrap() {
            self.serialize(BufWriter::new(&mut e))
        }
        e.done()
    }
    /// Start tracking specified thread
    ///
    /// Note: you must add main thread here manually. Usually you
    /// should use `track_current_thread()` instead.
    pub fn track_thread(&self, tid: Pid, name: &str) {
        self.lock().track_thread(tid, name)
    }
    /// Stop tracking specified thread (for example if it's dead)
    pub fn untrack_thread(&self, tid: Pid) {
        self.lock().untrack_thread(tid)
    }
    /// Add current thread using `track_thread`, returns thread id
    pub fn track_current_thread(&self, name: &str) -> Pid {
        self.lock().track_current_thread(name)
    }
    /// Track current thread by using name from `std::thread::current().name()`
    ///
    /// This may not work if thread has no name, use `track_current_thread`
    /// if thread has no own name or if you're unsure.
    ///
    /// # Panics
    ///
    /// If no thread is set.
    pub fn track_current_thread_by_name(&self) {
        let thread = thread::current();
        let name = thread.name().expect("thread name must be set");
        self.lock().track_current_thread(name);
    }
    /// Remove current thread using `untrack_thread`
    pub fn untrack_current_thread(&self) {
        self.lock().untrack_current_thread();
    }
}

/// Opaque debug representation: prints only the type name and never locks
/// the meter or exposes its internal state.
impl fmt::Debug for Meter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("Meter");
        repr.finish()
    }
}

/// Serializes the full report by delegating to `ReportWrapper`.
impl<'a> Serialize for Report<'a> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: Serializer
    {
        // The guard keeps the meter locked until serialization is finished
        let guard = self.0.lock();
        let wrapper = ReportWrapper { meter: &*guard };
        wrapper.serialize(serializer)
    }
}

/// Serializes whatever the locked meter's `report()` returns.
impl<'a> Serialize for ProcessReport<'a> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: Serializer
    {
        // The guard keeps the meter locked until serialization is finished
        let guard = self.0.lock();
        let process = guard.report();
        process.serialize(serializer)
    }
}

/// Serializes the per-thread data by delegating to `ThreadIter`.
impl<'a> Serialize for ThreadReport<'a> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: Serializer
    {
        // The guard keeps the meter locked until serialization is finished
        let guard = self.0.lock();
        let threads = ThreadIter(&*guard);
        threads.serialize(serializer)
    }
}