influxdb_dispatcher/
lib.rs

1#![doc = include_str!("../README.md")]
2
3#[cfg(feature = "util")]
4pub mod util;
5
6use std::{future::Future, time::Duration};
7
8use futures::{stream::FuturesUnordered, StreamExt};
9use influxdb::InfluxDbWriteable;
10use tokio::{sync::mpsc, time::MissedTickBehavior};
11
12/// Convert a metric to an [influxdb] query using the type name.
13pub trait IntoNamedQuery: InfluxDbWriteable + Sized {
14    fn into_named_query(self) -> influxdb::WriteQuery {
15        let type_name = std::any::type_name::<Self>();
16
17        let name = type_name
18            .rsplit_once("::")
19            .map(|(_, name)| name)
20            .unwrap_or(type_name);
21
22        InfluxDbWriteable::into_query(self, name)
23    }
24}
25
26impl<T: InfluxDbWriteable> IntoNamedQuery for T {}
27
28/// Dispatch a single metric to the database.
29/// Will emmit a log record if an error occurs.
30pub async fn dispatch(client: &influxdb::Client, metric: influxdb::WriteQuery) {
31    if let Err(error) = client.query(metric).await {
32        tracing::error!("Failed to submit metric: {}", error);
33    }
34}
35
36/// Dispatch many metrics to the database.
37/// These will be dispatched concurrently.
38pub async fn dispatch_many<I>(client: &influxdb::Client, metrics: I)
39where
40    I: IntoIterator<Item = influxdb::WriteQuery>,
41{
42    metrics
43        .into_iter()
44        .map(|metric| dispatch(client, metric))
45        .collect::<FuturesUnordered<_>>()
46        .collect::<()>()
47        .await;
48}
49
50/// Aggregator for metrics.
51/// An aggregator should collect metrics so they can be batch dispatched.
52pub trait MetricsConsumer {
53    /// The metrics type.
54    type Metric;
55
56    /// Create a new instance for the given client.
57    fn new(client: influxdb::Client) -> Self;
58
59    /// Consume a metric.
60    fn accept(&mut self, metric: Self::Metric);
61
62    /// Flush all consumed metrics to the database.
63    fn flush(&mut self) -> impl Future<Output = ()> + Send;
64}
65
66/// A handle to the InfluxDb metrics recorder.
67/// Aborts the submission task when dropped.
68#[derive(Debug)]
69pub struct InfluxDbHandle<M> {
70    /// The channel for submitting metrics.
71    channel: mpsc::Sender<M>,
72    /// The metrics task, which consumes the metrics in the channel and submits them in an
73    /// infinite loop.
74    metrics_task: tokio::task::JoinHandle<()>,
75}
76
77impl<M> Drop for InfluxDbHandle<M> {
78    fn drop(&mut self) {
79        self.metrics_task.abort(); // Prevent the task from leaking.
80    }
81}
82
83impl<M> InfluxDbHandle<M>
84where
85    M: Send + 'static,
86{
87    /// Start the metrics task.
88    /// This task will run indefinitely, but will be aborted when the handle is dropped.
89    pub fn new<C>(consumer: C, push_interval: u64, buffer_size: usize) -> Self
90    where
91        C: MetricsConsumer<Metric = M> + Send + 'static,
92    {
93        let (tx, rx) = mpsc::channel(buffer_size);
94
95        let task = Self::push_loop(consumer, rx, push_interval);
96
97        Self {
98            channel: tx,
99            metrics_task: tokio::task::spawn(task),
100        }
101    }
102
103    /// Submit a metric.
104    /// There is no strong guarantee that the metric will be recorded. It may actually be
105    /// discarded if we're struggling to dispatch all metrics.
106    pub fn submit(&self, metric: M) {
107        if let Err(error) = self.channel.try_send(metric) {
108            tracing::error!("Failed to submit metric: {}", error);
109        }
110    }
111
112    /// InfluxDb push loop.
113    /// This function will run indefinitely, so it must be placed inside a task so that it can
114    /// be aborted when we're done.
115    #[tracing::instrument(skip(consumer, channel))]
116    async fn push_loop<C>(mut consumer: C, mut channel: mpsc::Receiver<M>, push_interval: u64)
117    where
118        C: MetricsConsumer<Metric = M>,
119    {
120        let mut interval = tokio::time::interval(Duration::from_secs(push_interval));
121        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
122
123        tracing::info!("Starting InfluxDb metrics loop");
124
125        loop {
126            tokio::select! {
127                result = channel.recv() => match result {
128                    Some(metric) => consumer.accept(metric),
129                    None => { // Channel is closed, abort metrics task.
130                        tracing::info!("Shutting down metrics task.");
131                        break
132                    }
133                },
134
135                _ = interval.tick() => consumer.flush().await,
136            }
137        }
138    }
139}