Skip to main content

dipstick/
queue.rs

//! Queue metrics so that they are written on a separate thread.
//! Metric definitions are still synchronous.
//! If the queue size is exceeded, calling code reverts to blocking.
4
5use crate::CachedInput;
6use crate::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
7use crate::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
8use crate::label::Labels;
9use crate::metrics;
10use crate::name::MetricName;
11use crate::{Flush, MetricValue};
12
13use std::sync::Arc;
14#[cfg(not(feature = "crossbeam-channel"))]
15use std::sync::mpsc;
16use std::{io, thread};
17
18#[cfg(feature = "crossbeam-channel")]
19use crossbeam_channel as crossbeam;
20
21/// Wrap this output behind an asynchronous metrics dispatch queue.
22/// This is not strictly required for multi threading since the provided scopes
23/// are already Send + Sync but might be desired to lower the latency
24pub trait QueuedInput: Input + Send + Sync + 'static + Sized {
25    /// Wrap this output with an asynchronous dispatch queue of specified length.
26    fn queued(self, max_size: usize) -> InputQueue {
27        InputQueue::new(self, max_size)
28    }
29}
30
31/// # Panics
32///
33/// Panics if the OS fails to create a thread.
34#[cfg(not(feature = "crossbeam-channel"))]
35fn new_async_channel(length: usize) -> Arc<mpsc::SyncSender<InputQueueCmd>> {
36    let (sender, receiver) = mpsc::sync_channel::<InputQueueCmd>(length);
37
38    thread::Builder::new()
39        .name("dipstick-queue-in".to_string())
40        .spawn(move || {
41            let mut done = false;
42            while !done {
43                match receiver.recv() {
44                    Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
45                    Ok(InputQueueCmd::Flush(scope)) => {
46                        if let Err(e) = scope.flush() {
47                            debug!("Could not asynchronously flush metrics: {e}");
48                        }
49                    }
50                    Err(e) => {
51                        debug!("Async metrics receive loop terminated: {e}");
52                        // cannot break from within match, use safety pin instead
53                        done = true
54                    }
55                }
56            }
57        })
58        .unwrap(); // TODO: Panic, change API to return Result?
59    Arc::new(sender)
60}
61
/// Create a bounded crossbeam command channel and spawn the worker thread
/// that drains it.
///
/// The worker thread is named `dipstick-queue-in`. It handles every received
/// [`InputQueueCmd`] in arrival order: a `Write` is applied to its metric and
/// a `Flush` is forwarded to its scope (a failed flush is only logged).
/// The worker exits the first time `recv` returns an error.
///
/// `length` is the capacity handed to `crossbeam::bounded`.
///
/// # Panics
///
/// Panics if the OS fails to create a thread.
#[cfg(feature = "crossbeam-channel")]
fn new_async_channel(length: usize) -> Arc<crossbeam::Sender<InputQueueCmd>> {
    let (tx, rx) = crossbeam::bounded::<InputQueueCmd>(length);

    let worker = move || loop {
        match rx.recv() {
            Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
            Ok(InputQueueCmd::Flush(scope)) => {
                if let Err(e) = scope.flush() {
                    debug!("Could not asynchronously flush metrics: {e}");
                }
            }
            Err(e) => {
                debug!("Async metrics receive loop terminated: {e}");
                break;
            }
        }
    };

    // The join handle is discarded on purpose: the worker runs detached.
    thread::Builder::new()
        .name("dipstick-queue-in".to_string())
        .spawn(worker)
        .unwrap(); // TODO: Panic, change API to return Result?

    Arc::new(tx)
}
92
93/// Wrap new scopes with an asynchronous metric write & flush dispatcher.
94#[derive(Clone)]
95pub struct InputQueue {
96    attributes: Attributes,
97    target: Arc<dyn InputDyn + Send + Sync + 'static>,
98    #[cfg(not(feature = "crossbeam-channel"))]
99    sender: Arc<mpsc::SyncSender<InputQueueCmd>>,
100    #[cfg(feature = "crossbeam-channel")]
101    sender: Arc<crossbeam::Sender<InputQueueCmd>>,
102}
103
104impl InputQueue {
105    /// Wrap new scopes with an asynchronous metric write & flush dispatcher.
106    pub fn new<OUT: Input + Send + Sync + 'static>(target: OUT, queue_length: usize) -> Self {
107        InputQueue {
108            attributes: Attributes::default(),
109            target: Arc::new(target),
110            sender: new_async_channel(queue_length),
111        }
112    }
113}
114
// Opt `InputQueue` into `CachedInput`. The impl body is empty, so nothing is
// overridden here and any behavior comes from the trait's own definitions.
impl CachedInput for InputQueue {}
116
117impl WithAttributes for InputQueue {
118    fn get_attributes(&self) -> &Attributes {
119        &self.attributes
120    }
121    fn mut_attributes(&mut self) -> &mut Attributes {
122        &mut self.attributes
123    }
124}
125
126impl Input for InputQueue {
127    type SCOPE = InputQueueScope;
128
129    /// Wrap new scopes with an asynchronous metric write & flush dispatcher.
130    fn metrics(&self) -> Self::SCOPE {
131        let target_scope = self.target.input_dyn();
132        InputQueueScope {
133            attributes: self.attributes.clone(),
134            sender: self.sender.clone(),
135            target: target_scope,
136        }
137    }
138}
139
140/// This is only `pub` because `error` module needs to know about it.
141/// Async commands should be of no concerns to applications.
142pub enum InputQueueCmd {
143    /// Send metric write
144    Write(InputMetric, MetricValue, Labels),
145    /// Send metric flush
146    Flush(Arc<dyn InputScope + Send + Sync + 'static>),
147}
148
149/// A metric scope wrapper that sends writes & flushes over a Rust sync channel.
150/// Commands are executed by a background thread.
151#[derive(Clone)]
152pub struct InputQueueScope {
153    attributes: Attributes,
154    #[cfg(not(feature = "crossbeam-channel"))]
155    sender: Arc<mpsc::SyncSender<InputQueueCmd>>,
156    #[cfg(feature = "crossbeam-channel")]
157    sender: Arc<crossbeam::Sender<InputQueueCmd>>,
158    target: Arc<dyn InputScope + Send + Sync + 'static>,
159}
160
161impl InputQueueScope {
162    /// Wrap new scopes with an asynchronous metric write & flush dispatcher.
163    pub fn wrap<SC: InputScope + Send + Sync + 'static>(
164        target_scope: SC,
165        queue_length: usize,
166    ) -> Self {
167        InputQueueScope {
168            attributes: Attributes::default(),
169            sender: new_async_channel(queue_length),
170            target: Arc::new(target_scope),
171        }
172    }
173}
174
175impl WithAttributes for InputQueueScope {
176    fn get_attributes(&self) -> &Attributes {
177        &self.attributes
178    }
179    fn mut_attributes(&mut self) -> &mut Attributes {
180        &mut self.attributes
181    }
182}
183
184impl InputScope for InputQueueScope {
185    fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
186        let name = self.prefix_append(name);
187        let target_metric = self.target.new_metric(name.clone(), kind);
188        let sender = self.sender.clone();
189        InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| {
190            labels.save_context();
191            if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels))
192            {
193                metrics::SEND_FAILED.mark();
194                debug!("Failed to send async metrics: {e}");
195            }
196        })
197    }
198}
199
200impl Flush for InputQueueScope {
201    fn flush(&self) -> io::Result<()> {
202        self.notify_flush_listeners();
203        if let Err(e) = self.sender.send(InputQueueCmd::Flush(self.target.clone())) {
204            metrics::SEND_FAILED.mark();
205            debug!("Failed to flush async metrics: {e}");
206            Err(io::Error::other(e))
207        } else {
208            Ok(())
209        }
210    }
211}