#![doc = include_str!("../README.md")]
#[cfg(feature = "util")]
pub mod util;
use std::{future::Future, time::Duration};
use futures::{stream::FuturesUnordered, StreamExt};
use influxdb::InfluxDbWriteable;
use tokio::{sync::mpsc, time::MissedTickBehavior};
/// Turns an [`InfluxDbWriteable`] value into a write query named after its own type.
pub trait IntoNamedQuery: InfluxDbWriteable + Sized {
    /// Builds a [`influxdb::WriteQuery`] whose name is the unqualified name of `Self`.
    ///
    /// The name is derived from [`std::any::type_name`]: generic arguments are dropped and only
    /// the last path segment is kept, so both `my_crate::metrics::Latency` and
    /// `my_crate::metrics::Latency<my_crate::Http>` produce `Latency`.
    fn into_named_query(self) -> influxdb::WriteQuery {
        let type_name = std::any::type_name::<Self>();
        // Cut generic arguments off first: they contain `::` themselves, so splitting the full
        // string on the last `::` would otherwise return a fragment such as `Http>`.
        let base = type_name
            .split_once('<')
            .map_or(type_name, |(base, _)| base);
        let name = base.rsplit_once("::").map_or(base, |(_, name)| name);
        InfluxDbWriteable::into_query(self, name)
    }
}
/// Every [`InfluxDbWriteable`] type gets [`IntoNamedQuery`] through its default method.
impl<T> IntoNamedQuery for T where T: InfluxDbWriteable {}
/// Sends a single write query through `client`.
///
/// A failed query is not returned to the caller; it is only reported with `tracing::error!`.
pub async fn dispatch(client: &influxdb::Client, metric: influxdb::WriteQuery) {
    match client.query(metric).await {
        // The successful response is not used.
        Ok(_) => {}
        Err(error) => {
            tracing::error!("Failed to submit metric: {}", error);
        }
    }
}
/// Sends every query in `metrics` through `client`, driving all submissions concurrently.
///
/// Each query goes through [`dispatch`], so individual failures are logged rather than returned.
/// The function completes once every submission has finished.
pub async fn dispatch_many<I>(client: &influxdb::Client, metrics: I)
where
    I: IntoIterator<Item = influxdb::WriteQuery>,
{
    let mut pending: FuturesUnordered<_> = metrics
        .into_iter()
        .map(|metric| dispatch(client, metric))
        .collect();

    // Drain the set; each finished future yields `()`, so there is nothing to keep.
    while pending.next().await.is_some() {}
}
/// A sink for metrics that is constructed from an InfluxDB client.
///
/// [`InfluxDbHandle`] drives implementations from its background task: `accept` is called for
/// every metric received from the handle's channel, and `flush` is awaited on every tick of the
/// push interval.
pub trait MetricsConsumer {
/// The kind of metric this consumer takes in.
type Metric;
/// Creates a consumer from the given client.
fn new(client: influxdb::Client) -> Self;
/// Hands one metric to the consumer.
///
/// This is synchronous; presumably implementations buffer or aggregate here and do the
/// actual sending in `flush` (confirm against the implementors).
fn accept(&mut self, metric: Self::Metric);
/// Returns a future that completes once the consumer has finished flushing.
///
/// The returned future must be `Send`.
fn flush(&mut self) -> impl Future<Output = ()> + Send;
}
/// Handle to a background task that feeds submitted metrics of type `M` to a
/// [`MetricsConsumer`].
///
/// Dropping the handle aborts the background task (see the `Drop` implementation).
#[derive(Debug)]
pub struct InfluxDbHandle<M> {
/// Sending half of the bounded channel read by the background task.
channel: mpsc::Sender<M>,
/// Join handle of the spawned background task; used to abort it on drop.
metrics_task: tokio::task::JoinHandle<()>,
}
/// Stops the background metrics task when the handle goes away.
impl<M> Drop for InfluxDbHandle<M> {
    /// Requests cancellation of the spawned task through its join handle.
    fn drop(&mut self) {
        let Self { metrics_task, .. } = self;
        metrics_task.abort();
    }
}
impl<M> InfluxDbHandle<M>
where
    M: Send + 'static,
{
    /// Spawns the background metrics task and returns a handle for submitting metrics to it.
    ///
    /// * `consumer` - receives every submitted metric through `accept` and is flushed
    ///   periodically.
    /// * `push_interval` - seconds between flushes; `0` is treated as `1`.
    /// * `buffer_size` - capacity of the bounded channel between the handle and the task.
    ///
    /// # Panics
    ///
    /// Panics if `buffer_size` is `0` (a requirement of `tokio::sync::mpsc::channel`), or if
    /// called outside of a Tokio runtime, because the task is started with
    /// `tokio::task::spawn`.
    pub fn new<C>(consumer: C, push_interval: u64, buffer_size: usize) -> Self
    where
        C: MetricsConsumer<Metric = M> + Send + 'static,
    {
        let (tx, rx) = mpsc::channel(buffer_size);
        let task = Self::push_loop(consumer, rx, push_interval);
        Self {
            channel: tx,
            metrics_task: tokio::task::spawn(task),
        }
    }

    /// Queues `metric` for the background task without waiting.
    ///
    /// If the channel is full or the task is no longer running, the metric is dropped and the
    /// failure is reported with `tracing::error!`.
    pub fn submit(&self, metric: M) {
        if let Err(error) = self.channel.try_send(metric) {
            tracing::error!("Failed to submit metric: {}", error);
        }
    }

    /// Body of the background task: forwards received metrics to `consumer` and flushes it
    /// every `push_interval` seconds until the channel is closed.
    #[tracing::instrument(skip(consumer, channel))]
    async fn push_loop<C>(mut consumer: C, mut channel: mpsc::Receiver<M>, push_interval: u64)
    where
        C: MetricsConsumer<Metric = M>,
    {
        if push_interval == 0 {
            tracing::warn!("Push interval of 0 seconds is not supported, using 1 second instead");
        }
        // `tokio::time::interval` panics on a zero period. That panic would happen inside the
        // spawned task, where nobody observes it, and leave the handle permanently unable to
        // deliver metrics. Clamp to one second instead.
        let period = Duration::from_secs(push_interval.max(1));
        let mut interval = tokio::time::interval(period);
        // If a flush takes longer than one period, skip the missed ticks instead of firing
        // them back to back.
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        tracing::info!("Starting InfluxDb metrics loop");
        loop {
            tokio::select! {
                result = channel.recv() => match result {
                    Some(metric) => consumer.accept(metric),
                    // Every sender is gone, so no further metric can arrive.
                    None => {
                        tracing::info!("Shutting down metrics task.");
                        break;
                    }
                },
                // While the flush is awaited nothing is received; metrics submitted in the
                // meantime wait in the channel buffer.
                _ = interval.tick() => consumer.flush().await,
            }
        }
    }
}