1use crate::CachedInput;
6use crate::attributes::{Attributes, MetricId, OnFlush, Prefixed, WithAttributes};
7use crate::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
8use crate::label::Labels;
9use crate::metrics;
10use crate::name::MetricName;
11use crate::{Flush, MetricValue};
12
13use std::sync::Arc;
14#[cfg(not(feature = "crossbeam-channel"))]
15use std::sync::mpsc;
16use std::{io, thread};
17
18#[cfg(feature = "crossbeam-channel")]
19use crossbeam_channel as crossbeam;
20
21pub trait QueuedInput: Input + Send + Sync + 'static + Sized {
25 fn queued(self, max_size: usize) -> InputQueue {
27 InputQueue::new(self, max_size)
28 }
29}
30
31#[cfg(not(feature = "crossbeam-channel"))]
35fn new_async_channel(length: usize) -> Arc<mpsc::SyncSender<InputQueueCmd>> {
36 let (sender, receiver) = mpsc::sync_channel::<InputQueueCmd>(length);
37
38 thread::Builder::new()
39 .name("dipstick-queue-in".to_string())
40 .spawn(move || {
41 let mut done = false;
42 while !done {
43 match receiver.recv() {
44 Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
45 Ok(InputQueueCmd::Flush(scope)) => {
46 if let Err(e) = scope.flush() {
47 debug!("Could not asynchronously flush metrics: {e}");
48 }
49 }
50 Err(e) => {
51 debug!("Async metrics receive loop terminated: {e}");
52 done = true
54 }
55 }
56 }
57 })
58 .unwrap(); Arc::new(sender)
60}
61
/// Creates the bounded command channel (crossbeam flavor) and starts the
/// thread that consumes it.
///
/// A thread named `dipstick-queue-in` receives commands one at a time:
/// * `Write` calls `metric.write(value, labels)`;
/// * `Flush` calls `scope.flush()` and logs at debug level if that fails.
///
/// The thread logs and exits the first time `recv()` returns an error.
///
/// `length` is the capacity handed to `crossbeam::bounded`.
///
/// # Panics
///
/// Panics if the worker thread cannot be spawned.
#[cfg(feature = "crossbeam-channel")]
fn new_async_channel(length: usize) -> Arc<crossbeam::Sender<InputQueueCmd>> {
    let (tx, rx) = crossbeam::bounded::<InputQueueCmd>(length);

    // Consumer loop: runs until the receiving end reports an error.
    let worker = move || loop {
        match rx.recv() {
            Ok(InputQueueCmd::Write(metric, value, labels)) => metric.write(value, labels),
            Ok(InputQueueCmd::Flush(scope)) => {
                if let Err(e) = scope.flush() {
                    debug!("Could not asynchronously flush metrics: {e}");
                }
            }
            Err(e) => {
                debug!("Async metrics receive loop terminated: {e}");
                break;
            }
        }
    };

    thread::Builder::new()
        .name("dipstick-queue-in".to_string())
        .spawn(worker)
        .unwrap();

    Arc::new(tx)
}
92
93#[derive(Clone)]
95pub struct InputQueue {
96 attributes: Attributes,
97 target: Arc<dyn InputDyn + Send + Sync + 'static>,
98 #[cfg(not(feature = "crossbeam-channel"))]
99 sender: Arc<mpsc::SyncSender<InputQueueCmd>>,
100 #[cfg(feature = "crossbeam-channel")]
101 sender: Arc<crossbeam::Sender<InputQueueCmd>>,
102}
103
104impl InputQueue {
105 pub fn new<OUT: Input + Send + Sync + 'static>(target: OUT, queue_length: usize) -> Self {
107 InputQueue {
108 attributes: Attributes::default(),
109 target: Arc::new(target),
110 sender: new_async_channel(queue_length),
111 }
112 }
113}
114
115impl CachedInput for InputQueue {}
116
117impl WithAttributes for InputQueue {
118 fn get_attributes(&self) -> &Attributes {
119 &self.attributes
120 }
121 fn mut_attributes(&mut self) -> &mut Attributes {
122 &mut self.attributes
123 }
124}
125
126impl Input for InputQueue {
127 type SCOPE = InputQueueScope;
128
129 fn metrics(&self) -> Self::SCOPE {
131 let target_scope = self.target.input_dyn();
132 InputQueueScope {
133 attributes: self.attributes.clone(),
134 sender: self.sender.clone(),
135 target: target_scope,
136 }
137 }
138}
139
140pub enum InputQueueCmd {
143 Write(InputMetric, MetricValue, Labels),
145 Flush(Arc<dyn InputScope + Send + Sync + 'static>),
147}
148
149#[derive(Clone)]
152pub struct InputQueueScope {
153 attributes: Attributes,
154 #[cfg(not(feature = "crossbeam-channel"))]
155 sender: Arc<mpsc::SyncSender<InputQueueCmd>>,
156 #[cfg(feature = "crossbeam-channel")]
157 sender: Arc<crossbeam::Sender<InputQueueCmd>>,
158 target: Arc<dyn InputScope + Send + Sync + 'static>,
159}
160
161impl InputQueueScope {
162 pub fn wrap<SC: InputScope + Send + Sync + 'static>(
164 target_scope: SC,
165 queue_length: usize,
166 ) -> Self {
167 InputQueueScope {
168 attributes: Attributes::default(),
169 sender: new_async_channel(queue_length),
170 target: Arc::new(target_scope),
171 }
172 }
173}
174
175impl WithAttributes for InputQueueScope {
176 fn get_attributes(&self) -> &Attributes {
177 &self.attributes
178 }
179 fn mut_attributes(&mut self) -> &mut Attributes {
180 &mut self.attributes
181 }
182}
183
184impl InputScope for InputQueueScope {
185 fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric {
186 let name = self.prefix_append(name);
187 let target_metric = self.target.new_metric(name.clone(), kind);
188 let sender = self.sender.clone();
189 InputMetric::new(MetricId::forge("queue", name), move |value, mut labels| {
190 labels.save_context();
191 if let Err(e) = sender.send(InputQueueCmd::Write(target_metric.clone(), value, labels))
192 {
193 metrics::SEND_FAILED.mark();
194 debug!("Failed to send async metrics: {e}");
195 }
196 })
197 }
198}
199
200impl Flush for InputQueueScope {
201 fn flush(&self) -> io::Result<()> {
202 self.notify_flush_listeners();
203 if let Err(e) = self.sender.send(InputQueueCmd::Flush(self.target.clone())) {
204 metrics::SEND_FAILED.mark();
205 debug!("Failed to flush async metrics: {e}");
206 Err(io::Error::other(e))
207 } else {
208 Ok(())
209 }
210 }
211}