nexo_pairing/
plugin_metrics.rs1use std::time::Duration;
36
37use nexo_broker::{AnyBroker, BrokerHandle, Message};
38use serde_json::json;
39
40const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
41
42#[derive(Debug, Clone)]
44pub struct PluginMetricsDescriptor {
45 pub plugin_id: String,
46 pub broker_topic_prefix: String,
47 pub timeout: Duration,
48}
49
50impl PluginMetricsDescriptor {
51 pub fn new(plugin_id: impl Into<String>, broker_topic_prefix: impl Into<String>) -> Self {
52 Self {
53 plugin_id: plugin_id.into(),
54 broker_topic_prefix: broker_topic_prefix.into(),
55 timeout: DEFAULT_TIMEOUT,
56 }
57 }
58
59 pub fn with_timeout(mut self, timeout: Duration) -> Self {
60 self.timeout = timeout;
61 self
62 }
63}
64
65pub async fn scrape_all(
74 broker: &AnyBroker,
75 descriptors: &[PluginMetricsDescriptor],
76) -> String {
77 if descriptors.is_empty() {
78 return String::new();
79 }
80 let mut results = Vec::with_capacity(descriptors.len());
86 for d in descriptors {
87 results.push(scrape_one(broker, d).await);
88 }
89 let mut out = String::new();
90 for (descriptor, result) in descriptors.iter().zip(results.into_iter()) {
91 match result {
92 Ok(text) if text.is_empty() => {}
93 Ok(text) => {
94 out.push_str(&text);
95 if !text.ends_with('\n') {
96 out.push('\n');
97 }
98 }
99 Err(err) => {
100 tracing::warn!(
101 plugin = %descriptor.plugin_id,
102 error = %err,
103 "plugin metrics scrape failed; skipping",
104 );
105 }
106 }
107 }
108 out
109}
110
111pub async fn scrape_one(
115 broker: &AnyBroker,
116 descriptor: &PluginMetricsDescriptor,
117) -> Result<String, PluginMetricsScrapeError> {
118 let topic = format!("{}.metrics.scrape", descriptor.broker_topic_prefix);
119 let msg = Message::new(topic.clone(), json!({}));
120 let reply = broker
121 .request(&topic, msg, descriptor.timeout)
122 .await
123 .map_err(|e| PluginMetricsScrapeError::Broker(e.to_string()))?;
124 let text = reply
125 .payload
126 .get("text")
127 .and_then(|v| v.as_str())
128 .unwrap_or("")
129 .to_string();
130 Ok(text)
131}
132
133#[derive(Debug, thiserror::Error)]
134pub enum PluginMetricsScrapeError {
135 #[error("broker error: {0}")]
136 Broker(String),
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142
143 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
144 async fn scrape_one_returns_broker_error_when_no_subscriber() {
145 let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
146 let d = PluginMetricsDescriptor::new("email", "plugin.email")
147 .with_timeout(Duration::from_millis(100));
148 let result = scrape_one(&broker, &d).await;
149 assert!(matches!(result, Err(PluginMetricsScrapeError::Broker(_))));
150 }
151
152 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
153 async fn scrape_all_returns_empty_string_when_no_descriptors() {
154 let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
155 let out = scrape_all(&broker, &[]).await;
156 assert_eq!(out, "");
157 }
158
159 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
160 async fn scrape_all_skips_failed_plugins_without_panicking() {
161 let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
162 let descriptors = vec![
163 PluginMetricsDescriptor::new("email", "plugin.email")
164 .with_timeout(Duration::from_millis(50)),
165 PluginMetricsDescriptor::new("telegram", "plugin.telegram")
166 .with_timeout(Duration::from_millis(50)),
167 ];
168 let out = scrape_all(&broker, &descriptors).await;
172 assert_eq!(out, "");
173 }
174
175 #[test]
176 fn descriptor_with_timeout_overrides_default() {
177 let d = PluginMetricsDescriptor::new("email", "plugin.email")
178 .with_timeout(Duration::from_secs(10));
179 assert_eq!(d.timeout, Duration::from_secs(10));
180 }
181
182 #[test]
183 fn descriptor_default_timeout_is_five_seconds() {
184 let d = PluginMetricsDescriptor::new("email", "plugin.email");
185 assert_eq!(d.timeout, DEFAULT_TIMEOUT);
186 }
187}