influxdb_dispatcher/
lib.rs1#![doc = include_str!("../README.md")]
2
3#[cfg(feature = "util")]
4pub mod util;
5
6use std::{future::Future, time::Duration};
7
8use futures::{stream::FuturesUnordered, StreamExt};
9use influxdb::InfluxDbWriteable;
10use tokio::{sync::mpsc, time::MissedTickBehavior};
11
12pub trait IntoNamedQuery: InfluxDbWriteable + Sized {
14 fn into_named_query(self) -> influxdb::WriteQuery {
15 let type_name = std::any::type_name::<Self>();
16
17 let name = type_name
18 .rsplit_once("::")
19 .map(|(_, name)| name)
20 .unwrap_or(type_name);
21
22 InfluxDbWriteable::into_query(self, name)
23 }
24}
25
26impl<T: InfluxDbWriteable> IntoNamedQuery for T {}
27
28pub async fn dispatch(client: &influxdb::Client, metric: influxdb::WriteQuery) {
31 if let Err(error) = client.query(metric).await {
32 tracing::error!("Failed to submit metric: {}", error);
33 }
34}
35
36pub async fn dispatch_many<I>(client: &influxdb::Client, metrics: I)
39where
40 I: IntoIterator<Item = influxdb::WriteQuery>,
41{
42 metrics
43 .into_iter()
44 .map(|metric| dispatch(client, metric))
45 .collect::<FuturesUnordered<_>>()
46 .collect::<()>()
47 .await;
48}
49
50pub trait MetricsConsumer {
53 type Metric;
55
56 fn new(client: influxdb::Client) -> Self;
58
59 fn accept(&mut self, metric: Self::Metric);
61
62 fn flush(&mut self) -> impl Future<Output = ()> + Send;
64}
65
66#[derive(Debug)]
69pub struct InfluxDbHandle<M> {
70 channel: mpsc::Sender<M>,
72 metrics_task: tokio::task::JoinHandle<()>,
75}
76
77impl<M> Drop for InfluxDbHandle<M> {
78 fn drop(&mut self) {
79 self.metrics_task.abort(); }
81}
82
83impl<M> InfluxDbHandle<M>
84where
85 M: Send + 'static,
86{
87 pub fn new<C>(consumer: C, push_interval: u64, buffer_size: usize) -> Self
90 where
91 C: MetricsConsumer<Metric = M> + Send + 'static,
92 {
93 let (tx, rx) = mpsc::channel(buffer_size);
94
95 let task = Self::push_loop(consumer, rx, push_interval);
96
97 Self {
98 channel: tx,
99 metrics_task: tokio::task::spawn(task),
100 }
101 }
102
103 pub fn submit(&self, metric: M) {
107 if let Err(error) = self.channel.try_send(metric) {
108 tracing::error!("Failed to submit metric: {}", error);
109 }
110 }
111
112 #[tracing::instrument(skip(consumer, channel))]
116 async fn push_loop<C>(mut consumer: C, mut channel: mpsc::Receiver<M>, push_interval: u64)
117 where
118 C: MetricsConsumer<Metric = M>,
119 {
120 let mut interval = tokio::time::interval(Duration::from_secs(push_interval));
121 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
122
123 tracing::info!("Starting InfluxDb metrics loop");
124
125 loop {
126 tokio::select! {
127 result = channel.recv() => match result {
128 Some(metric) => consumer.accept(metric),
129 None => { tracing::info!("Shutting down metrics task.");
131 break
132 }
133 },
134
135 _ = interval.tick() => consumer.flush().await,
136 }
137 }
138 }
139}