use std::{fmt::Debug, sync::Arc};
use anyhow::Context;
use tokio::{
runtime,
sync::{
mpsc::{self, Receiver},
oneshot, RwLock, RwLockReadGuard,
},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use super::{
def::{Metric, RawMetricId},
duplicate::DuplicateReaction,
error::MetricCreationError,
registry::MetricRegistry,
};
use crate::{metrics::duplicate::DuplicateCriteria, pipeline::naming::namespace::Namespace2};
use listener::{ListenerName, MetricListener, MetricListenerBuilder};
pub enum ControlMessage {
RegisterMetrics {
metrics: Vec<Metric>,
duplicate_criteria: DuplicateCriteria,
on_duplicate: DuplicateReaction,
reply_to: Option<oneshot::Sender<Vec<Result<RawMetricId, MetricCreationError>>>>,
},
Subscribe(ListenerName, Box<dyn listener::MetricListenerBuilder + Send>),
}
pub(crate) struct MetricRegistryControl {
registry: Arc<RwLock<MetricRegistry>>,
listeners: Vec<(ListenerName, Box<dyn MetricListener>)>,
}
pub mod listener {
use crate::metrics::def::{Metric, RawMetricId};
pub trait MetricListener: FnMut(Vec<(RawMetricId, Metric)>) -> anyhow::Result<()> + Send {}
impl<F> MetricListener for F where F: FnMut(Vec<(RawMetricId, Metric)>) -> anyhow::Result<()> + Send {}
pub(super) struct BuildContext<'a> {
pub(super) rt: &'a tokio::runtime::Handle,
}
pub trait MetricListenerBuildContext {
fn async_runtime(&self) -> &tokio::runtime::Handle;
}
impl MetricListenerBuildContext for BuildContext<'_> {
fn async_runtime(&self) -> &tokio::runtime::Handle {
self.rt
}
}
pub trait MetricListenerBuilder:
FnOnce(&mut dyn MetricListenerBuildContext) -> anyhow::Result<Box<dyn MetricListener>>
{
}
impl<F> MetricListenerBuilder for F where
F: FnOnce(&mut dyn MetricListenerBuildContext) -> anyhow::Result<Box<dyn MetricListener>>
{
}
pub struct ListenerName {
pub plugin: String,
pub name: String,
}
}
impl MetricRegistryControl {
pub fn new(registry: MetricRegistry) -> Self {
Self {
registry: Arc::new(RwLock::new(registry)),
listeners: Vec::new(),
}
}
pub fn create_listeners(
&mut self,
builders: Namespace2<Box<dyn MetricListenerBuilder>>,
rt: &tokio::runtime::Handle,
) -> anyhow::Result<()> {
self.listeners.reserve_exact(builders.total_count());
for ((plugin_name, listener_name), builder) in builders {
let mut ctx = listener::BuildContext { rt };
let listener = builder(&mut ctx)
.with_context(|| format!("failed to create listener {plugin_name}/{listener_name}"))?;
let full_name = ListenerName {
plugin: plugin_name,
name: listener_name,
};
self.listeners.push((full_name, listener));
}
Ok(())
}
pub fn start(
self,
shutdown: CancellationToken,
on: &runtime::Handle,
) -> (MetricSender, MetricAccess, JoinHandle<()>) {
let (tx, rx) = mpsc::channel(256);
let reader = MetricAccess {
inner: self.registry.clone(),
};
let sender = MetricSender(tx);
let task = self.run(shutdown.clone(), rx);
let task_handle = on.spawn(task);
(sender, reader, task_handle)
}
async fn handle_message(&mut self, msg: ControlMessage) {
fn call_listener(name: &ListenerName, listener: &mut dyn MetricListener, metrics: Vec<(RawMetricId, Metric)>) {
let n = metrics.len();
if let Err(e) = listener(metrics) {
let plugin = &name.plugin;
let listener = &name.name;
log::error!("Error in metric listener {plugin}/{listener} (called on {n} metrics): {e}",);
}
}
match msg {
ControlMessage::RegisterMetrics {
metrics,
duplicate_criteria,
on_duplicate,
reply_to,
} => {
let n = metrics.len();
let mut copy = (*self.registry.read().await).clone();
let res = copy.register_many(metrics, duplicate_criteria, on_duplicate);
let mut new_metrics = Vec::with_capacity(n);
let mut results = Vec::with_capacity(n);
for individual_res in res {
if let Ok(metric_id) = individual_res {
let metric_def = copy
.by_id(&metric_id)
.expect("metric has just been registered and should exist");
new_metrics.push((metric_id, metric_def.to_owned()));
}
results.push(individual_res);
}
*self.registry.write().await = copy;
for (name, listener) in &mut self.listeners {
call_listener(name, listener, new_metrics.clone());
}
if let Some(tx) = reply_to {
if let Err(e) = tx.send(results) {
log::error!("Failed to send reply to metric registration message: {e:?}");
}
}
}
ControlMessage::Subscribe(name, listener) => {
let rt = tokio::runtime::Handle::current();
let plugin_name = name.plugin.clone();
let mut ns = Namespace2::new();
ns.add(name.plugin, name.name, listener as Box<dyn MetricListenerBuilder>)
.expect("the namespace is empty, there cannot be any duplicate");
if let Err(e) = self.create_listeners(ns, &rt) {
log::error!("Error while building a metric listener for plugin {plugin_name}: {e:?}");
}
}
}
}
pub async fn run(mut self, shutdown: CancellationToken, mut rx: Receiver<ControlMessage>) {
loop {
tokio::select! {
_ = shutdown.cancelled() => {
break;
},
message = rx.recv() => {
match message {
Some(msg) => self.handle_message(msg).await,
None => todo!("registry_control_loop#rx channel closed")
}
}
}
}
}
}
#[derive(Clone)]
pub struct MetricAccess {
inner: Arc<RwLock<MetricRegistry>>,
}
#[derive(Clone)]
pub struct MetricReader(MetricAccess);
#[derive(Clone)]
pub struct MetricSender(mpsc::Sender<ControlMessage>);
#[derive(thiserror::Error)]
pub enum SendError {
#[error("the control channel is full")]
ChannelFull(ControlMessage),
#[error("the pipeline has been shut down")]
Shutdown,
}
#[derive(thiserror::Error, Debug)]
pub enum SendWithReplyError {
#[error("message could not be sent")]
Send(SendError),
#[error("could not get a response from the registry")]
Recv(oneshot::error::RecvError),
}
impl Debug for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ChannelFull(msg) => {
let msg_short: &dyn Debug = &match msg {
ControlMessage::RegisterMetrics { .. } => "ControlMessage::RegisterMetrics(...)",
ControlMessage::Subscribe(_, _) => "ControlMessage::Subscribe(...)",
};
f.debug_tuple("ChannelFull").field(msg_short).finish()
}
Self::Shutdown => write!(f, "Shutdown"),
}
}
}
impl MetricAccess {
pub async fn read(&self) -> RwLockReadGuard<MetricRegistry> {
self.inner.read().await
}
pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<MetricRegistry> {
self.inner.write().await
}
pub fn into_read_only(self) -> MetricReader {
MetricReader(self)
}
}
impl MetricReader {
pub async fn read(&self) -> RwLockReadGuard<MetricRegistry> {
self.0.read().await
}
pub(crate) fn blocking_read(&self) -> RwLockReadGuard<MetricRegistry> {
self.0.inner.blocking_read()
}
}
impl MetricSender {
pub async fn send(&self, message: ControlMessage) -> Result<(), SendError> {
self.0.send(message).await.map_err(|_| SendError::Shutdown)
}
pub fn try_send(&self, message: ControlMessage) -> Result<(), SendError> {
match self.0.try_send(message) {
Ok(_) => Ok(()),
Err(mpsc::error::TrySendError::Full(m)) => Err(SendError::ChannelFull(m)),
Err(mpsc::error::TrySendError::Closed(_)) => Err(SendError::Shutdown),
}
}
pub async fn create_metrics(
&self,
metrics: Vec<Metric>,
on_duplicate: DuplicateReaction,
) -> Result<Vec<Result<RawMetricId, MetricCreationError>>, SendWithReplyError> {
let (tx, rx) = oneshot::channel();
let message = ControlMessage::RegisterMetrics {
metrics,
duplicate_criteria: DuplicateCriteria::Incompatible,
on_duplicate,
reply_to: Some(tx),
};
self.send(message).await.map_err(|e| SendWithReplyError::Send(e))?;
let result = rx.await.map_err(|e| SendWithReplyError::Recv(e))?;
Ok(result)
}
pub fn try_subscribe<F: MetricListenerBuilder + Send + 'static>(
&self,
name: ListenerName,
listener_builder: F,
) -> Result<(), SendError> {
self.try_send(ControlMessage::Subscribe(name, Box::new(listener_builder)))
}
pub async fn subscribe<F: MetricListenerBuilder + Send + 'static>(
&self,
name: ListenerName,
listener_builder: F,
) -> Result<(), SendError> {
self.send(ControlMessage::Subscribe(name, Box::new(listener_builder)))
.await
}
}