Skip to main content

nexo_pairing/
plugin_metrics.rs

1//! Phase 81.33.b.real Stage 5 — daemon-side plugin metrics
2//! scrape.
3//!
4//! Plugins declare `[plugin.metrics] prometheus = true` +
5//! `broker_topic_prefix = "..."`. The daemon's `/metrics`
6//! handler iterates the registered scrape descriptors on every
7//! request, issues `<broker_topic_prefix>.metrics.scrape`
8//! broker RPC, and concatenates the returned Prometheus text
9//! into the aggregate response.
10//!
11//! Replaces the previous pattern where each plugin's metrics
12//! call was hardcoded inside `run_metrics_server` (e.g. the
13//! legacy `nexo_plugin_email::metrics::render_prometheus(...)`
14//! direct call).
15//!
16//! ## Wire format
17//!
18//! Daemon → plugin on `<broker_topic_prefix>.metrics.scrape`:
19//!
20//! ```json
21//! {}
22//! ```
23//!
24//! Plugin replies:
25//!
26//! ```json
27//! { "text": "# HELP <metric_name> ...\n# TYPE ...\n<metric> <value>\n..." }
28//! ```
29//!
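//! Empty / missing `text` (including a `text` value that is not a
//! JSON string) is treated as a successful scrape with no metrics.
//! Plugins that fail to respond (timeout, broker error, malformed
//! reply) are skipped with a warn-level log. Plugins are scraped
//! one after another, so a slow plugin delays the aggregate
//! response by at most its own timeout but never fails it.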
34
35use std::time::Duration;
36
37use nexo_broker::{AnyBroker, BrokerHandle, Message};
38use serde_json::json;
39
/// Per-plugin scrape timeout applied by [`PluginMetricsDescriptor::new`]
/// unless overridden with [`PluginMetricsDescriptor::with_timeout`].
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Describes one plugin that has opted into metrics scraping.
///
/// One descriptor exists per plugin; the scrape functions in this
/// module read it to know which broker topic to query and how long
/// to wait for the reply.
#[derive(Clone, Debug)]
pub struct PluginMetricsDescriptor {
    /// Identifier of the plugin; attached as the `plugin` field of the
    /// warn-level log emitted when a scrape fails.
    pub plugin_id: String,
    /// Topic prefix of the plugin; the scrape request is sent on
    /// `<broker_topic_prefix>.metrics.scrape`.
    pub broker_topic_prefix: String,
    /// Timeout handed to the broker request for this plugin's scrape.
    pub timeout: Duration,
}
49
50impl PluginMetricsDescriptor {
51    pub fn new(plugin_id: impl Into<String>, broker_topic_prefix: impl Into<String>) -> Self {
52        Self {
53            plugin_id: plugin_id.into(),
54            broker_topic_prefix: broker_topic_prefix.into(),
55            timeout: DEFAULT_TIMEOUT,
56        }
57    }
58
59    pub fn with_timeout(mut self, timeout: Duration) -> Self {
60        self.timeout = timeout;
61        self
62    }
63}
64
65/// Scrape every declaring plugin in parallel and return the
66/// concatenated Prometheus text. Plugins that fail (timeout,
67/// broker error, malformed reply) contribute the empty string
68/// + emit a warn-level log; one slow plugin does not stall the
69/// whole `/metrics` request.
70///
71/// Returns a single `String` with newline-separated plugin
72/// outputs. Caller appends to its own `/metrics` body.
73pub async fn scrape_all(broker: &AnyBroker, descriptors: &[PluginMetricsDescriptor]) -> String {
74    if descriptors.is_empty() {
75        return String::new();
76    }
77    // Issue every scrape sequentially. Concurrent join_all would
78    // be marginally faster but pulls a `futures` dep edge that
79    // this crate avoids; scrape volumes are small (≤10 plugins
80    // typical, 5s timeout each) so serialised dispatch stays
81    // well under any realistic Prometheus scrape budget.
82    let mut results = Vec::with_capacity(descriptors.len());
83    for d in descriptors {
84        results.push(scrape_one(broker, d).await);
85    }
86    let mut out = String::new();
87    for (descriptor, result) in descriptors.iter().zip(results) {
88        match result {
89            Ok(text) if text.is_empty() => {}
90            Ok(text) => {
91                out.push_str(&text);
92                if !text.ends_with('\n') {
93                    out.push('\n');
94                }
95            }
96            Err(err) => {
97                tracing::warn!(
98                    plugin = %descriptor.plugin_id,
99                    error = %err,
100                    "plugin metrics scrape failed; skipping",
101                );
102            }
103        }
104    }
105    out
106}
107
108/// Scrape a single plugin. Used by [`scrape_all`] internally;
109/// exposed for callers that want fine-grained control (e.g.
110/// admin RPC `/metrics/per_plugin` follow-up).
111pub async fn scrape_one(
112    broker: &AnyBroker,
113    descriptor: &PluginMetricsDescriptor,
114) -> Result<String, PluginMetricsScrapeError> {
115    let topic = format!("{}.metrics.scrape", descriptor.broker_topic_prefix);
116    let msg = Message::new(topic.clone(), json!({}));
117    let reply = broker
118        .request(&topic, msg, descriptor.timeout)
119        .await
120        .map_err(|e| PluginMetricsScrapeError::Broker(e.to_string()))?;
121    let text = reply
122        .payload
123        .get("text")
124        .and_then(|v| v.as_str())
125        .unwrap_or("")
126        .to_string();
127    Ok(text)
128}
129
130#[derive(Debug, thiserror::Error)]
131pub enum PluginMetricsScrapeError {
132    #[error("broker error: {0}")]
133    Broker(String),
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// A scrape against a local broker with no subscriber on the
    /// plugin's topic must surface as a `Broker` error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_one_returns_broker_error_when_no_subscriber() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let d = PluginMetricsDescriptor::new("email", "plugin.email")
            .with_timeout(Duration::from_millis(100));
        let result = scrape_one(&broker, &d).await;
        assert!(matches!(result, Err(PluginMetricsScrapeError::Broker(_))));
    }

    /// With no descriptors there is nothing to scrape and the
    /// aggregate is the empty string.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_all_returns_empty_string_when_no_descriptors() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let out = scrape_all(&broker, &[]).await;
        assert_eq!(out, "");
    }

    /// Failed scrapes are skipped rather than propagated.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn scrape_all_skips_failed_plugins_without_panicking() {
        let broker = AnyBroker::Local(nexo_broker::LocalBroker::new());
        let descriptors = vec![
            PluginMetricsDescriptor::new("email", "plugin.email")
                .with_timeout(Duration::from_millis(50)),
            PluginMetricsDescriptor::new("telegram", "plugin.telegram")
                .with_timeout(Duration::from_millis(50)),
        ];
        // Neither plugin has a subscriber, so both scrapes fail.
        // Aggregate returns empty string but does NOT panic; the
        // warn-log path is exercised.
        let out = scrape_all(&broker, &descriptors).await;
        assert_eq!(out, "");
    }

    /// `with_timeout` replaces the default timeout.
    #[test]
    fn descriptor_with_timeout_overrides_default() {
        let d = PluginMetricsDescriptor::new("email", "plugin.email")
            .with_timeout(Duration::from_secs(10));
        assert_eq!(d.timeout, Duration::from_secs(10));
    }

    /// Pins the documented default to a literal five seconds, so an
    /// accidental change to `DEFAULT_TIMEOUT` fails this test.
    #[test]
    fn descriptor_default_timeout_is_five_seconds() {
        let d = PluginMetricsDescriptor::new("email", "plugin.email");
        assert_eq!(d.timeout, Duration::from_secs(5));
    }
}