Skip to main content

nexo_pairing/
plugin_metrics.rs

//! Phase 81.33.b.real Stage 5 — daemon-side plugin metrics
//! scrape.
//!
//! Plugins declare `[plugin.metrics] prometheus = true` and
//! `broker_topic_prefix = "..."`. On every request, the daemon's
//! `/metrics` handler iterates the registered scrape descriptors,
//! issues a `<broker_topic_prefix>.metrics.scrape` broker RPC to
//! each plugin, and concatenates the returned Prometheus text into
//! the aggregate response.
//!
//! This replaces the previous pattern where each plugin's metrics
//! call was hardcoded inside `run_metrics_server` (e.g. the legacy
//! direct call to `nexo_plugin_email::metrics::render_prometheus(...)`).
//!
//! ## Wire format
//!
//! Daemon → plugin on `<broker_topic_prefix>.metrics.scrape`:
//!
//! ```json
//! {}
//! ```
//!
//! Plugin replies:
//!
//! ```json
//! { "text": "# HELP <metric_name> ...\n# TYPE ...\n<metric> <value>\n..." }
//! ```
//!
//! A reply whose `text` field is empty, missing, or not a string is
//! treated as a successful scrape with no metrics. Plugins whose
//! broker request fails (timeout, nothing subscribed to the topic, or
//! any other broker error) are skipped with a warn-level log. Each
//! broker request is issued with the plugin's own timeout, so an
//! unresponsive plugin cannot hang the aggregate response indefinitely.

use std::future::{poll_fn, Future};
use std::task::Poll;
use std::time::Duration;

use nexo_broker::{AnyBroker, BrokerHandle, Message};
use serde_json::json;

/// Scrape deadline applied by [`PluginMetricsDescriptor::new`] unless
/// overridden with [`PluginMetricsDescriptor::with_timeout`].
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(5_000);

/// One entry per plugin that has opted into the daemon's `/metrics`
/// scrape.
#[derive(Debug, Clone)]
pub struct PluginMetricsDescriptor {
    /// Plugin identifier; reported as the `plugin` field of the warn
    /// log emitted when this plugin's scrape fails.
    pub plugin_id: String,
    /// Topic prefix; the scrape request is sent to
    /// `<broker_topic_prefix>.metrics.scrape`.
    pub broker_topic_prefix: String,
    /// Timeout handed to the broker request for this plugin's scrape.
    pub timeout: Duration,
}

impl PluginMetricsDescriptor {
    /// Creates a descriptor for `plugin_id` whose scrape topic lives
    /// under `broker_topic_prefix`, using the default 5-second timeout.
    pub fn new(plugin_id: impl Into<String>, broker_topic_prefix: impl Into<String>) -> Self {
        let plugin_id = plugin_id.into();
        let broker_topic_prefix = broker_topic_prefix.into();
        Self {
            plugin_id,
            broker_topic_prefix,
            timeout: DEFAULT_TIMEOUT,
        }
    }

    /// Returns `self` with its scrape timeout replaced by `timeout`;
    /// every other field is carried over unchanged.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }
}

65/// Scrape every declaring plugin in parallel and return the
66/// concatenated Prometheus text. Plugins that fail (timeout,
67/// broker error, malformed reply) contribute the empty string
68/// + emit a warn-level log; one slow plugin does not stall the
69/// whole `/metrics` request.
70///
71/// Returns a single `String` with newline-separated plugin
72/// outputs. Caller appends to its own `/metrics` body.
73pub async fn scrape_all(
74    broker: &AnyBroker,
75    descriptors: &[PluginMetricsDescriptor],
76) -> String {
77    if descriptors.is_empty() {
78        return String::new();
79    }
80    // Issue every scrape sequentially. Concurrent join_all would
81    // be marginally faster but pulls a `futures` dep edge that
82    // this crate avoids; scrape volumes are small (≤10 plugins
83    // typical, 5s timeout each) so serialised dispatch stays
84    // well under any realistic Prometheus scrape budget.
85    let mut results = Vec::with_capacity(descriptors.len());
86    for d in descriptors {
87        results.push(scrape_one(broker, d).await);
88    }
89    let mut out = String::new();
90    for (descriptor, result) in descriptors.iter().zip(results.into_iter()) {
91        match result {
92            Ok(text) if text.is_empty() => {}
93            Ok(text) => {
94                out.push_str(&text);
95                if !text.ends_with('\n') {
96                    out.push('\n');
97                }
98            }
99            Err(err) => {
100                tracing::warn!(
101                    plugin = %descriptor.plugin_id,
102                    error = %err,
103                    "plugin metrics scrape failed; skipping",
104                );
105            }
106        }
107    }
108    out
109}
110
111/// Scrape a single plugin. Used by [`scrape_all`] internally;
112/// exposed for callers that want fine-grained control (e.g.
113/// admin RPC `/metrics/per_plugin` follow-up).
114pub async fn scrape_one(
115    broker: &AnyBroker,
116    descriptor: &PluginMetricsDescriptor,
117) -> Result<String, PluginMetricsScrapeError> {
118    let topic = format!("{}.metrics.scrape", descriptor.broker_topic_prefix);
119    let msg = Message::new(topic.clone(), json!({}));
120    let reply = broker
121        .request(&topic, msg, descriptor.timeout)
122        .await
123        .map_err(|e| PluginMetricsScrapeError::Broker(e.to_string()))?;
124    let text = reply
125        .payload
126        .get("text")
127        .and_then(|v| v.as_str())
128        .unwrap_or("")
129        .to_string();
130    Ok(text)
131}
132
133#[derive(Debug, thiserror::Error)]
134pub enum PluginMetricsScrapeError {
135    #[error("broker error: {0}")]
136    Broker(String),
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_one_returns_broker_error_when_no_subscriber() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let d = PluginMetricsDescriptor::new("email", "plugin.email")
            .with_timeout(Duration::from_millis(100));
        let result = scrape_one(&broker, &d).await;
        assert!(matches!(result, Err(PluginMetricsScrapeError::Broker(_))));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_all_returns_empty_string_when_no_descriptors() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let out = scrape_all(&broker, &[]).await;
        assert_eq!(out, "");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_all_skips_failed_plugins_without_panicking() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let descriptors = vec![
            PluginMetricsDescriptor::new("email", "plugin.email")
                .with_timeout(Duration::from_millis(50)),
            PluginMetricsDescriptor::new("telegram", "plugin.telegram")
                .with_timeout(Duration::from_millis(50)),
        ];
        // Nothing is subscribed to either scrape topic, so both scrapes
        // fail. The aggregate is the empty string and does NOT panic;
        // the warn-log path is exercised.
        let out = scrape_all(&broker, &descriptors).await;
        assert_eq!(out, "");
    }

    #[test]
    fn descriptor_with_timeout_overrides_default() {
        let d = PluginMetricsDescriptor::new("email", "plugin.email")
            .with_timeout(Duration::from_secs(10));
        assert_eq!(d.timeout, Duration::from_secs(10));
        // Overriding the timeout must leave the identifying fields intact.
        assert_eq!(d.plugin_id, "email");
        assert_eq!(d.broker_topic_prefix, "plugin.email");
    }

    #[test]
    fn descriptor_default_timeout_is_five_seconds() {
        let d = PluginMetricsDescriptor::new("email", "plugin.email");
        assert_eq!(d.timeout, DEFAULT_TIMEOUT);
        // Pin the literal value as well: comparing only against the
        // constant would pass whatever DEFAULT_TIMEOUT is set to.
        assert_eq!(d.timeout, Duration::from_secs(5));
    }
}