1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use core::*;
use self_metrics::*;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
// Self-instrumentation for this module: `QUEUE_METRICS` is derived from
// `DIPSTICK_METRICS` with every metric name prefixed by "async_queue".
// NOTE(review): exact semantics of `mod_metrics!` / `mod_marker!` are defined
// elsewhere in the crate — confirm there.
mod_metrics!(Aggregate, QUEUE_METRICS = DIPSTICK_METRICS.with_prefix("async_queue"));
// `SEND_FAILED` ("send_failed") is marked each time a command cannot be handed
// to the queue's channel (see `with_async_queue`).
mod_marker!(Aggregate, QUEUE_METRICS, { SEND_FAILED: "send_failed" });
/// Extension trait for metric outputs that can be decoupled from their
/// callers through a command queue.
pub trait WithAsyncQueue: Sized {
    /// Return a new instance of `Self` whose commands go through a queue
    /// of `queue_size` slots instead of being applied directly.
    fn with_async_queue(&self, queue_size: usize) -> Self;
}
impl<M: Send + Sync + Clone + 'static> WithAsyncQueue for Chain<M> {
    /// Wrap this chain so that writes and flushes are queued on a bounded
    /// channel and applied to the wrapped scopes by a background thread.
    ///
    /// `queue_size` is the capacity of the underlying `mpsc::sync_channel`:
    /// once that many commands are pending, callers block in `send` until the
    /// worker thread catches up.
    ///
    /// The worker thread is detached. It runs until every sender (held by the
    /// returned chain and the scopes created from it) has been dropped, then
    /// terminates.
    fn with_async_queue(&self, queue_size: usize) -> Self {
        self.mod_scope(|next| {
            let (sender, receiver) = mpsc::sync_channel::<QueueCommand<M>>(queue_size);

            // `recv` only fails once all senders are gone, and keeps failing
            // from then on. Let the thread end at that point; wrapping this in
            // an outer `loop` would spin forever on the disconnected channel.
            thread::spawn(move || {
                while let Ok(cmd) = receiver.recv() {
                    match cmd {
                        // A metric/value pair is a write on the target scope.
                        QueueCommand {
                            cmd: Some((metric, value)),
                            next_scope,
                        } => next_scope.write(&metric, value),
                        // No payload means the target scope must be flushed.
                        QueueCommand {
                            cmd: None,
                            next_scope,
                        } => next_scope.flush(),
                    }
                }
            });

            Arc::new(move |buffered| {
                // Each new scope gets its own downstream scope and its own
                // handle on the shared queue.
                let next_scope: ControlScopeFn<M> = next(buffered);
                let sender = sender.clone();
                ControlScopeFn::new(move |cmd| {
                    let send_cmd = match cmd {
                        ScopeCmd::Write(metric, value) => {
                            // The metric is only borrowed here; clone it so it
                            // can be moved to the worker thread.
                            let metric: &M = metric;
                            Some((metric.clone(), value))
                        }
                        ScopeCmd::Flush => None,
                    };
                    let queued = QueueCommand {
                        cmd: send_cmd,
                        next_scope: next_scope.clone(),
                    };
                    // Sending fails only when the receiving side is gone;
                    // record it and carry on rather than panic in the caller.
                    if let Err(e) = sender.send(queued) {
                        SEND_FAILED.mark();
                        trace!("Async metrics could not be sent: {}", e);
                    }
                })
            })
        })
    }
}
#[deprecated(since = "0.5.0", note = "Use `with_async_queue` instead.")]
pub fn async<M, IC>(queue_size: usize, chain: IC) -> Chain<M>
where
M: Clone + Send + Sync + 'static,
IC: Into<Chain<M>>,
{
let chain = chain.into();
chain.with_async_queue(queue_size)
}
/// A unit of work sent over the async queue's channel: an operation together
/// with the scope it has to be applied to.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct QueueCommand<M> {
    /// `Some((metric, value))` requests a write of `value` for `metric`;
    /// `None` requests a flush of `next_scope`.
    cmd: Option<(M, Value)>,
    /// The scope the command is applied to. Left out of the derived `Debug`
    /// output.
    #[derivative(Debug = "ignore")]
    next_scope: ControlScopeFn<M>,
}