Skip to main content

devboy_mcp/
telemetry.rs

1//! Telemetry pipeline — records every tool invocation and ships batches to a
2//! configurable HTTP endpoint, even when the call was executed locally.
3//!
4//! # Why on the proxy side?
5//!
6//! With transparent routing, some tool calls never reach an upstream MCP server.
7//! Operators who aggregate usage elsewhere (usage dashboards, billing, audit) still
8//! want those invocations to show up somewhere, so the proxy forwards a minimal event
9//! payload (no tool arguments, no responses) to the endpoint configured under
10//! `[proxy.telemetry].endpoint`. When no endpoint is configured, the pipeline still
11//! buffers events locally but never attempts an upload.
12//!
13//! # Back pressure
14//!
15//! A single bounded in-memory queue with drop-oldest semantics. We deliberately avoid
16//! SQLite here — the queue exists to ride out short connectivity hiccups, not to persist
17//! across restarts. If the operator wants durable telemetry they can run the proxy in a
18//! supervised process with a writable working directory and a `JSONL` sink (future
19//! extension).
20//!
21//! # Scheduling
22//!
//! The uploader flushes whenever either:
//! - the buffer reaches `batch_size`, or
//! - the periodic ticker fires, once every `batch_interval_secs` (the ticker runs on a
//!   fixed schedule and is not reset by size-triggered flushes).
26//!
27//! When the endpoint is unset the pipeline still records events (helpful for CLI debug
28//! commands) but never attempts an upload.
29
30use std::collections::VecDeque;
31use std::sync::Arc;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34use devboy_core::config::ProxyTelemetryConfig;
35use serde::{Deserialize, Serialize};
36use tokio::sync::{Mutex, Notify};
37use tokio::task::JoinHandle;
38use tracing::{debug, warn};
39
40/// Status of a single tool invocation.
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
42#[serde(rename_all = "snake_case")]
43pub enum TelemetryStatus {
44    Success,
45    Error,
46}
47
48/// Minimal event payload — intentionally excludes tool arguments and responses.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct TelemetryEvent {
51    /// Unprefixed tool name (`get_issues`).
52    pub tool: String,
53    /// Routing decision label (`strategy_remote`, `override_rule`, …).
54    pub routing_decision: String,
55    /// Optional detail for `override_rule` (the pattern that matched).
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub routing_detail: Option<String>,
58    /// Upstream prefix when the call went remote (`cloud`); `None` for local.
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub upstream: Option<String>,
61    pub status: TelemetryStatus,
62    /// Wall-clock latency observed by the proxy.
63    pub latency_ms: u64,
64    /// Unix epoch seconds at the time the decision was made.
65    pub timestamp_secs: u64,
66    /// Optional fallback marker — true if the primary executor failed and we retried.
67    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
68    pub was_fallback: bool,
69}
70
71impl TelemetryEvent {
72    pub fn now(tool: impl Into<String>, routing_decision: impl Into<String>) -> Self {
73        Self {
74            tool: tool.into(),
75            routing_decision: routing_decision.into(),
76            routing_detail: None,
77            upstream: None,
78            status: TelemetryStatus::Success,
79            latency_ms: 0,
80            timestamp_secs: unix_now(),
81            was_fallback: false,
82        }
83    }
84}
85
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
92
93/// Authentication credentials used when uploading batches.
94#[derive(Clone, Default)]
95pub struct TelemetryAuth {
96    pub bearer_token: Option<secrecy::SecretString>,
97}
98
99/// Request body shape expected by the backend (documented for the future endpoint).
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct TelemetryBatch {
102    pub events: Vec<TelemetryEvent>,
103}
104
105/// Shared event buffer. `record()` is cheap and lock-bound; clones are light (Arc).
106///
107/// Internally carries a `Notify` that the buffer signals when the queue length crosses
108/// the `flush_threshold` — the uploader waits on this alongside its periodic ticker so
109/// that "batch_size reached" triggers an immediate flush without waiting for the next
110/// interval tick.
111#[derive(Clone)]
112pub struct TelemetryBuffer {
113    inner: Arc<Mutex<VecDeque<TelemetryEvent>>>,
114    capacity: usize,
115    /// Threshold at which `record()` wakes the flusher. Set by the pipeline owner; a
116    /// freshly constructed buffer uses `capacity` (effectively "never size-triggered").
117    flush_threshold: Arc<std::sync::atomic::AtomicUsize>,
118    /// Signal for the background flush task to wake immediately.
119    size_trigger: Arc<Notify>,
120}
121
122impl TelemetryBuffer {
123    pub fn new(capacity: usize) -> Self {
124        Self {
125            inner: Arc::new(Mutex::new(VecDeque::with_capacity(capacity.min(1024)))),
126            capacity,
127            flush_threshold: Arc::new(std::sync::atomic::AtomicUsize::new(capacity)),
128            size_trigger: Arc::new(Notify::new()),
129        }
130    }
131
132    /// Set the queue length at which `record()` wakes the flusher. Called by the
133    /// pipeline once `batch_size` is known.
134    pub fn set_flush_threshold(&self, threshold: usize) {
135        self.flush_threshold
136            .store(threshold.max(1), std::sync::atomic::Ordering::Relaxed);
137    }
138
139    /// Notifier the background flusher waits on for size-triggered flushes.
140    pub fn size_trigger(&self) -> Arc<Notify> {
141        self.size_trigger.clone()
142    }
143
144    /// Push an event. When the queue is full, drops the oldest event to make room.
145    /// Wakes the background flusher as soon as the queue length crosses
146    /// [`Self::set_flush_threshold`].
147    pub async fn record(&self, event: TelemetryEvent) {
148        let (len_after, threshold) = {
149            let mut guard = self.inner.lock().await;
150            while guard.len() >= self.capacity {
151                let dropped = guard.pop_front();
152                debug!(
153                    dropped = ?dropped.as_ref().map(|e| e.tool.as_str()),
154                    "telemetry buffer full, dropping oldest event"
155                );
156            }
157            guard.push_back(event);
158            (
159                guard.len(),
160                self.flush_threshold
161                    .load(std::sync::atomic::Ordering::Relaxed),
162            )
163        };
164        if len_after >= threshold {
165            self.size_trigger.notify_one();
166        }
167    }
168
169    /// Drain up to `max` events without releasing the lock between pops.
170    pub async fn drain(&self, max: usize) -> Vec<TelemetryEvent> {
171        let mut guard = self.inner.lock().await;
172        let n = guard.len().min(max);
173        guard.drain(..n).collect()
174    }
175
176    /// Return events to the front of the queue (after a failed upload).
177    pub async fn requeue_front(&self, events: Vec<TelemetryEvent>) {
178        let mut guard = self.inner.lock().await;
179        for event in events.into_iter().rev() {
180            if guard.len() >= self.capacity {
181                // Full — drop the last event (tail) to keep the retried batch prioritized.
182                guard.pop_back();
183            }
184            guard.push_front(event);
185        }
186    }
187
188    pub async fn len(&self) -> usize {
189        self.inner.lock().await.len()
190    }
191
192    pub async fn is_empty(&self) -> bool {
193        self.inner.lock().await.is_empty()
194    }
195}
196
197/// Uploader — owns the HTTP client and knows how to ship a batch to `endpoint`.
198#[derive(Clone)]
199pub struct TelemetryUploader {
200    endpoint: String,
201    auth: TelemetryAuth,
202    http: reqwest::Client,
203}
204
205impl TelemetryUploader {
206    pub fn new(endpoint: String, auth: TelemetryAuth) -> devboy_core::Result<Self> {
207        let http = reqwest::Client::builder()
208            .timeout(Duration::from_secs(15))
209            .build()
210            .map_err(|e| devboy_core::Error::Http(format!("telemetry client build: {}", e)))?;
211        Ok(Self {
212            endpoint,
213            auth,
214            http,
215        })
216    }
217
218    /// POST a batch; returns Ok when the server accepted it (2xx).
219    pub async fn upload(&self, batch: &TelemetryBatch) -> devboy_core::Result<()> {
220        let mut req = self
221            .http
222            .post(&self.endpoint)
223            .header("content-type", "application/json");
224        if let Some(token) = &self.auth.bearer_token {
225            use secrecy::ExposeSecret;
226            req = req.header("authorization", format!("Bearer {}", token.expose_secret()));
227        }
228        let resp = req.json(batch).send().await.map_err(|e| {
229            devboy_core::Error::Http(format!("telemetry upload to {}: {}", self.endpoint, e))
230        })?;
231        let status = resp.status();
232        if !status.is_success() {
233            let body = resp.text().await.unwrap_or_default();
234            return Err(devboy_core::Error::Http(format!(
235                "telemetry upload rejected: HTTP {} — {}",
236                status, body
237            )));
238        }
239        Ok(())
240    }
241}
242
243/// The full pipeline — owns the buffer, an uploader, and a background flush task.
244///
245/// Callers get a cheap `TelemetryBuffer` clone via [`TelemetryPipeline::buffer`] so hot
246/// paths do not need to pay for the uploader machinery.
247pub struct TelemetryPipeline {
248    buffer: TelemetryBuffer,
249    config: ProxyTelemetryConfig,
250    uploader: Option<TelemetryUploader>,
251    task: Option<JoinHandle<()>>,
252    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
253}
254
255impl TelemetryPipeline {
256    pub fn new(config: ProxyTelemetryConfig) -> Self {
257        let capacity = config.offline_queue_max.max(16);
258        let buffer = TelemetryBuffer::new(capacity);
259        Self {
260            buffer,
261            config,
262            uploader: None,
263            task: None,
264            shutdown_tx: None,
265        }
266    }
267
268    pub fn buffer(&self) -> TelemetryBuffer {
269        self.buffer.clone()
270    }
271
272    pub fn config(&self) -> &ProxyTelemetryConfig {
273        &self.config
274    }
275
276    /// Wire up the uploader and start the flush task. No-op when telemetry is disabled
277    /// or when `endpoint` is not set.
278    pub fn start(&mut self, auth: TelemetryAuth) -> devboy_core::Result<()> {
279        if !self.config.enabled {
280            debug!("telemetry disabled in config; skipping uploader");
281            return Ok(());
282        }
283        let Some(endpoint) = self.config.endpoint.clone() else {
284            debug!("telemetry endpoint unset; events buffered but not uploaded");
285            return Ok(());
286        };
287
288        let uploader = TelemetryUploader::new(endpoint, auth)?;
289        self.uploader = Some(uploader.clone());
290
291        let buffer = self.buffer.clone();
292        let batch_size = self.config.batch_size.max(1);
293        let interval = Duration::from_secs(self.config.batch_interval_secs.max(1));
294        // Arm the buffer so `record()` pokes the flusher the moment the queue reaches
295        // `batch_size` — satisfies the "batch_size reached OR interval elapsed"
296        // contract without waiting up to `batch_interval_secs` every time.
297        buffer.set_flush_threshold(batch_size);
298        let size_trigger = buffer.size_trigger();
299        let (tx, mut rx) = tokio::sync::oneshot::channel();
300        self.shutdown_tx = Some(tx);
301
302        let task = tokio::spawn(async move {
303            let mut ticker = tokio::time::interval(interval);
304            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
305            loop {
306                tokio::select! {
307                    _ = &mut rx => {
308                        // Best-effort flush on shutdown.
309                        let events = buffer.drain(usize::MAX).await;
310                        if !events.is_empty() {
311                            let _ = uploader.upload(&TelemetryBatch { events }).await;
312                        }
313                        break;
314                    }
315                    _ = ticker.tick() => {
316                        flush_once(&buffer, &uploader, batch_size).await;
317                    }
318                    _ = size_trigger.notified() => {
319                        flush_once(&buffer, &uploader, batch_size).await;
320                    }
321                }
322            }
323        });
324
325        self.task = Some(task);
326        Ok(())
327    }
328
329    /// Force an immediate flush. Returns the number of events uploaded.
330    pub async fn flush(&self) -> devboy_core::Result<usize> {
331        let Some(uploader) = &self.uploader else {
332            return Ok(0);
333        };
334        let events = self.buffer.drain(usize::MAX).await;
335        let n = events.len();
336        if !events.is_empty()
337            && let Err(e) = uploader
338                .upload(&TelemetryBatch {
339                    events: events.clone(),
340                })
341                .await
342        {
343            // Requeue on failure to preserve at-least-once semantics.
344            self.buffer.requeue_front(events).await;
345            return Err(e);
346        }
347        Ok(n)
348    }
349
350    /// Gracefully stop the background task, doing one final flush attempt.
351    pub async fn shutdown(&mut self) {
352        if let Some(tx) = self.shutdown_tx.take() {
353            let _ = tx.send(());
354        }
355        if let Some(handle) = self.task.take() {
356            let _ = handle.await;
357        }
358    }
359}
360
361async fn flush_once(buffer: &TelemetryBuffer, uploader: &TelemetryUploader, batch_size: usize) {
362    loop {
363        let events = buffer.drain(batch_size).await;
364        if events.is_empty() {
365            return;
366        }
367        match uploader
368            .upload(&TelemetryBatch {
369                events: events.clone(),
370            })
371            .await
372        {
373            Ok(_) => {
374                debug!(count = events.len(), "telemetry batch uploaded");
375                if events.len() < batch_size {
376                    return;
377                }
378            }
379            Err(e) => {
380                warn!(error = %e, "telemetry upload failed, retrying later");
381                buffer.requeue_front(events).await;
382                return;
383            }
384        }
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;

    /// A `strategy_remote` event for `tool` with a fixed latency of 42 ms.
    fn sample_event(tool: &str) -> TelemetryEvent {
        TelemetryEvent {
            latency_ms: 42,
            ..TelemetryEvent::now(tool, "strategy_remote")
        }
    }

    /// Tool names of `events`, in order.
    fn tool_names(events: &[TelemetryEvent]) -> Vec<&str> {
        events.iter().map(|e| e.tool.as_str()).collect()
    }

    /// `drain(n)` hands out the oldest events first and leaves the rest queued.
    #[tokio::test]
    async fn test_buffer_record_and_drain() {
        let buffer = TelemetryBuffer::new(10);
        for tool in ["a", "b"] {
            buffer.record(sample_event(tool)).await;
        }
        assert_eq!(buffer.len().await, 2);

        let taken = buffer.drain(1).await;
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].tool, "a");
        assert_eq!(buffer.len().await, 1);
    }

    /// Recording into a full buffer evicts the oldest event.
    #[tokio::test]
    async fn test_buffer_drop_oldest_when_full() {
        let buffer = TelemetryBuffer::new(2);
        for tool in ["a", "b", "c"] {
            buffer.record(sample_event(tool)).await;
        }

        let taken = buffer.drain(10).await;
        assert_eq!(taken.len(), 2);
        assert_eq!(taken[0].tool, "b");
        assert_eq!(taken[1].tool, "c");
    }

    /// The size trigger stays silent below the threshold and fires once it is reached.
    #[tokio::test]
    async fn test_size_trigger_fires_when_threshold_reached() {
        let buffer = TelemetryBuffer::new(10);
        buffer.set_flush_threshold(3);
        let trigger = buffer.size_trigger();

        // Two events stay below the threshold — notify must not fire yet.
        for tool in ["a", "b"] {
            buffer.record(sample_event(tool)).await;
        }
        let below = tokio::time::timeout(Duration::from_millis(50), trigger.notified()).await;
        assert!(
            below.is_err(),
            "size_trigger should not fire below threshold"
        );

        // Third event lands on the threshold — notify must fire.
        buffer.record(sample_event("c")).await;
        let at_threshold =
            tokio::time::timeout(Duration::from_millis(100), trigger.notified()).await;
        assert!(
            at_threshold.is_ok(),
            "size_trigger must fire when queue reaches threshold"
        );
    }

    /// Requeued events go in front of newer ones and keep their own order.
    #[tokio::test]
    async fn test_buffer_requeue_front() {
        let buffer = TelemetryBuffer::new(5);
        buffer.record(sample_event("new")).await;

        let retried = vec![sample_event("old1"), sample_event("old2")];
        buffer.requeue_front(retried).await;

        let taken = buffer.drain(10).await;
        assert_eq!(tool_names(&taken), vec!["old1", "old2", "new"]);
    }

    /// The uploader sends the bearer token and a JSON body containing the event.
    #[tokio::test]
    async fn test_uploader_sends_bearer_header_and_payload() {
        let server = MockServer::start_async().await;
        let expectation = server
            .mock_async(|when, then| {
                when.method(POST)
                    .path("/api/telemetry/tool-invocations")
                    .header("authorization", "Bearer my-token")
                    .body_includes(r#""tool":"get_issues""#);
                then.status(202).body("");
            })
            .await;

        let endpoint = format!("{}/api/telemetry/tool-invocations", server.base_url());
        let auth = TelemetryAuth {
            bearer_token: Some("my-token".into()),
        };
        let uploader = TelemetryUploader::new(endpoint, auth).unwrap();

        let batch = TelemetryBatch {
            events: vec![sample_event("get_issues")],
        };
        uploader.upload(&batch).await.unwrap();
        expectation.assert_async().await;
    }

    /// A 5xx reply becomes an error that carries the status code and the body.
    #[tokio::test]
    async fn test_uploader_reports_error_on_5xx() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(500).body("boom");
            })
            .await;

        let endpoint = format!("{}/tele", server.base_url());
        let uploader = TelemetryUploader::new(endpoint, TelemetryAuth::default()).unwrap();

        let batch = TelemetryBatch {
            events: vec![sample_event("x")],
        };
        let message = uploader.upload(&batch).await.unwrap_err().to_string();
        assert!(message.contains("500"));
        assert!(message.contains("boom"));
    }

    /// A manual flush uploads everything queued and reports how many events went out.
    #[tokio::test]
    async fn test_pipeline_flush_uploads_all_and_returns_count() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(200).body("");
            })
            .await;

        let config = ProxyTelemetryConfig {
            endpoint: Some(format!("{}/t", server.base_url())),
            ..Default::default()
        };

        let mut pipeline = TelemetryPipeline::new(config);
        pipeline.start(TelemetryAuth::default()).unwrap();
        for tool in ["a", "b"] {
            pipeline.buffer().record(sample_event(tool)).await;
        }

        let uploaded = pipeline.flush().await.unwrap();
        assert_eq!(uploaded, 2);
        pipeline.shutdown().await;
    }

    /// With telemetry disabled no uploader exists, so flushing uploads nothing.
    #[tokio::test]
    async fn test_pipeline_disabled_is_noop() {
        let config = ProxyTelemetryConfig {
            enabled: false,
            ..Default::default()
        };
        let mut pipeline = TelemetryPipeline::new(config);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("x")).await;

        // Flush is a no-op — uploader not wired.
        let uploaded = pipeline.flush().await.unwrap();
        assert_eq!(uploaded, 0);
        pipeline.shutdown().await;
    }

    /// Without an endpoint, events are still buffered locally.
    #[tokio::test]
    async fn test_pipeline_without_endpoint_still_buffers_but_does_not_upload() {
        let config = ProxyTelemetryConfig {
            enabled: true,
            endpoint: None,
            ..Default::default()
        };
        let mut pipeline = TelemetryPipeline::new(config);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("x")).await;
        assert_eq!(pipeline.buffer().len().await, 1);
        pipeline.shutdown().await;
    }

    /// A failed manual flush returns the error and puts the events back in the queue.
    #[tokio::test]
    async fn test_flush_requeues_on_failure() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(500).body("");
            })
            .await;

        let config = ProxyTelemetryConfig {
            endpoint: Some(format!("{}/t", server.base_url())),
            ..Default::default()
        };

        let mut pipeline = TelemetryPipeline::new(config);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("a")).await;

        let outcome = pipeline.flush().await;
        assert!(outcome.is_err());
        assert_eq!(pipeline.buffer().len().await, 1);
        pipeline.shutdown().await;
    }
}
595}