1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//! Queue metrics for write on a separate thread,
//! RawMetrics definitions are still synchronous.
//! If queue size is exceeded, calling code reverts to blocking.

use core::attributes::{Attributes, WithAttributes, Prefixed};
use core::name::MetricName;
use core::input::{InputKind, Input, InputScope, InputMetric};
use core::output::{OutputDyn, OutputScope, OutputMetric, Output};
use core::{MetricValue, Flush};
use core::metrics;
use cache::cache_in;
use core::error;
use core::label::Labels;

use std::rc::Rc;
use std::ops;
use std::fmt;

use std::sync::Arc;
use std::sync::mpsc;
use std::thread;

/// Wrap this raw output behind an asynchronous metrics dispatch queue.
pub trait QueuedOutput: Output + Sized {
    /// Wrap this output with an asynchronous dispatch queue.
    fn queued(self, max_size: usize) -> OutputQueue {
        OutputQueue::new(self, max_size)
    }
}

/// # Panics
///
/// Panics if the OS fails to create a thread.
fn new_async_channel(length: usize) -> Arc<mpsc::SyncSender<OutputQueueCmd>> {
    let (sender, receiver) = mpsc::sync_channel::<OutputQueueCmd>(length);

    thread::Builder::new()
        .name("dipstick-queue-out".to_string())
        .spawn(move || {
            let mut done = false;
            while !done {
                match receiver.recv() {
                    Ok(OutputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
                    Ok(OutputQueueCmd::Flush(scope)) => if let Err(e) = scope.flush() {
                        debug!("Could not asynchronously flush metrics: {}", e);
                    },
                    Err(e) => {
                        debug!("Async metrics receive loop terminated: {}", e);
                        // cannot break from within match, use safety pin instead
                        done = true
                    }
                }
            }
        })
        .unwrap(); // TODO: Panic, change API to return Result?
    Arc::new(sender)
}


/// Wrap scope with an asynchronous metric write & flush dispatcher.
#[derive(Clone)]
pub struct OutputQueue {
    attributes: Attributes,
    target: Arc<OutputDyn + Send + Sync + 'static>,
    q_sender: Arc<mpsc::SyncSender<OutputQueueCmd>>,
}

impl OutputQueue {
    /// Wrap new scopes with an asynchronous metric write & flush dispatcher.
    pub fn new<OUT: Output + Send + Sync + 'static>(target: OUT, queue_length: usize) -> Self {
        OutputQueue {
            attributes: Attributes::default(),
            target: Arc::new(target),
            q_sender: new_async_channel(queue_length),
        }
    }
}

impl WithAttributes for OutputQueue {
    fn get_attributes(&self) -> &Attributes { &self.attributes }
    fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}

impl cache_in::CachedInput for OutputQueue {}

impl Input for OutputQueue {
    type SCOPE = OutputQueueScope;

    /// Wrap new scopes with an asynchronous metric write & flush dispatcher.
    fn input(&self) -> Self::SCOPE {
        let target_scope = UnsafeScope::new(self.target.output_dyn());
        OutputQueueScope {
            attributes: self.attributes.clone(),
            sender: self.q_sender.clone(),
            target: Arc::new(target_scope),
        }
    }

}

/// This is only `pub` because `error` module needs to know about it.
/// Async commands should be of no concerns to applications.
pub enum OutputQueueCmd {
    /// Send metric write
    Write(Arc<OutputMetric>, MetricValue, Labels),
    /// Send metric flush
    Flush(Arc<UnsafeScope>),
}

/// A scope wrapper that sends writes & flushes over a Rust sync channel.
/// Commands are executed by a background thread.
#[derive(Clone)]
pub struct OutputQueueScope {
    attributes: Attributes,
    sender: Arc<mpsc::SyncSender<OutputQueueCmd>>,
    target: Arc<UnsafeScope>,
}

impl WithAttributes for OutputQueueScope {
    fn get_attributes(&self) -> &Attributes { &self.attributes }
    fn mut_attributes(&mut self) -> &mut Attributes { &mut self.attributes }
}

impl InputScope for OutputQueueScope {
    fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
        let name = self.prefix_append(name);
        let target_metric = Arc::new(self.target.new_metric(name, kind));
        let sender = self.sender.clone();
        InputMetric::new(move |value, mut labels| {
            labels.save_context();
            if let Err(e) = sender.send(OutputQueueCmd::Write(target_metric.clone(), value, labels)) {
                metrics::SEND_FAILED.mark();
                debug!("Failed to send async metrics: {}", e);
            }
        })
    }
}

impl Flush for OutputQueueScope {

    fn flush(&self) -> error::Result<()> {
        if let Err(e) = self.sender.send(OutputQueueCmd::Flush(self.target.clone())) {
            metrics::SEND_FAILED.mark();
            debug!("Failed to flush async metrics: {}", e);
            Err(e.into())
        } else {
            Ok(())
        }
    }
}

/// Wrap an OutputScope to make it Send + Sync, allowing it to travel the world of threads.
/// Obviously, it should only still be used from a single thread or dragons may occur.
#[derive(Clone)]
pub struct UnsafeScope(Rc<OutputScope + 'static> );

unsafe impl Send for UnsafeScope {}
unsafe impl Sync for UnsafeScope {}

impl UnsafeScope {
    /// Wrap a dynamic RawScope to make it Send + Sync.
    pub fn new(scope: Rc<OutputScope + 'static>) -> Self {
        UnsafeScope(scope)
    }
}

impl ops::Deref for UnsafeScope {
    type Target = OutputScope + 'static;
    fn deref(&self) -> &Self::Target {
        Rc::as_ref(&self.0)
    }
}


impl fmt::Debug for OutputMetric {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Box<Fn(Value)>")
    }
}

unsafe impl Send for OutputMetric {}
unsafe impl Sync for OutputMetric {}