nexo_pairing/
plugin_metrics.rs1use std::time::Duration;
36
37use nexo_broker::{AnyBroker, BrokerHandle, Message};
38use serde_json::json;
39
/// Request timeout given to a descriptor by `PluginMetricsDescriptor::new`
/// until it is replaced through `with_timeout`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
41
/// Describes one plugin whose metrics can be requested over the broker.
#[derive(Clone, Debug)]
pub struct PluginMetricsDescriptor {
    /// Identifier of the plugin; reported in the warning logged when a
    /// scrape of this plugin fails.
    pub plugin_id: String,
    /// Prefix of the plugin's broker topics. The scrape request is sent to
    /// `<broker_topic_prefix>.metrics.scrape`.
    pub broker_topic_prefix: String,
    /// Timeout handed to the broker request made for this plugin.
    pub timeout: Duration,
}
49
50impl PluginMetricsDescriptor {
51 pub fn new(plugin_id: impl Into<String>, broker_topic_prefix: impl Into<String>) -> Self {
52 Self {
53 plugin_id: plugin_id.into(),
54 broker_topic_prefix: broker_topic_prefix.into(),
55 timeout: DEFAULT_TIMEOUT,
56 }
57 }
58
59 pub fn with_timeout(mut self, timeout: Duration) -> Self {
60 self.timeout = timeout;
61 self
62 }
63}
64
65pub async fn scrape_all(broker: &AnyBroker, descriptors: &[PluginMetricsDescriptor]) -> String {
74 if descriptors.is_empty() {
75 return String::new();
76 }
77 let mut results = Vec::with_capacity(descriptors.len());
83 for d in descriptors {
84 results.push(scrape_one(broker, d).await);
85 }
86 let mut out = String::new();
87 for (descriptor, result) in descriptors.iter().zip(results) {
88 match result {
89 Ok(text) if text.is_empty() => {}
90 Ok(text) => {
91 out.push_str(&text);
92 if !text.ends_with('\n') {
93 out.push('\n');
94 }
95 }
96 Err(err) => {
97 tracing::warn!(
98 plugin = %descriptor.plugin_id,
99 error = %err,
100 "plugin metrics scrape failed; skipping",
101 );
102 }
103 }
104 }
105 out
106}
107
108pub async fn scrape_one(
112 broker: &AnyBroker,
113 descriptor: &PluginMetricsDescriptor,
114) -> Result<String, PluginMetricsScrapeError> {
115 let topic = format!("{}.metrics.scrape", descriptor.broker_topic_prefix);
116 let msg = Message::new(topic.clone(), json!({}));
117 let reply = broker
118 .request(&topic, msg, descriptor.timeout)
119 .await
120 .map_err(|e| PluginMetricsScrapeError::Broker(e.to_string()))?;
121 let text = reply
122 .payload
123 .get("text")
124 .and_then(|v| v.as_str())
125 .unwrap_or("")
126 .to_string();
127 Ok(text)
128}
129
130#[derive(Debug, thiserror::Error)]
131pub enum PluginMetricsScrapeError {
132 #[error("broker error: {0}")]
133 Broker(String),
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh local broker; none of these tests subscribe to it.
    fn local_broker() -> AnyBroker {
        AnyBroker::Local(nexo_broker::LocalBroker::new())
    }

    /// Builds a descriptor whose timeout is `millis` milliseconds.
    fn descriptor_with_millis(
        plugin_id: &str,
        prefix: &str,
        millis: u64,
    ) -> PluginMetricsDescriptor {
        PluginMetricsDescriptor::new(plugin_id, prefix).with_timeout(Duration::from_millis(millis))
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_one_returns_broker_error_when_no_subscriber() {
        let broker = local_broker();
        let email = descriptor_with_millis("email", "plugin.email", 100);

        let outcome = scrape_one(&broker, &email).await;

        assert!(matches!(outcome, Err(PluginMetricsScrapeError::Broker(_))));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_all_returns_empty_string_when_no_descriptors() {
        let broker = local_broker();

        let combined = scrape_all(&broker, &[]).await;

        assert_eq!(combined, "");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_all_skips_failed_plugins_without_panicking() {
        let broker = local_broker();
        let plugins = vec![
            descriptor_with_millis("email", "plugin.email", 50),
            descriptor_with_millis("telegram", "plugin.telegram", 50),
        ];

        let combined = scrape_all(&broker, &plugins).await;

        // Nothing contributes text here, so the combined output stays empty.
        assert_eq!(combined, "");
    }

    #[test]
    fn descriptor_with_timeout_overrides_default() {
        let ten_seconds = Duration::from_secs(10);
        let descriptor =
            PluginMetricsDescriptor::new("email", "plugin.email").with_timeout(ten_seconds);
        assert_eq!(descriptor.timeout, Duration::from_secs(10));
    }

    #[test]
    fn descriptor_default_timeout_is_five_seconds() {
        let descriptor = PluginMetricsDescriptor::new("email", "plugin.email");
        assert_eq!(descriptor.timeout, DEFAULT_TIMEOUT);
    }
}