devboy-mcp 0.28.1

MCP (Model Context Protocol) server for devboy-tools — JSON-RPC 2.0 over stdio, exposing every devboy provider as MCP tools to AI agents.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
//! Telemetry pipeline — records every tool invocation and ships batches to a
//! configurable HTTP endpoint, even when the call was executed locally.
//!
//! # Why on the proxy side?
//!
//! With transparent routing, some tool calls never reach an upstream MCP server.
//! Operators who aggregate usage elsewhere (usage dashboards, billing, audit) still
//! want those invocations to show up somewhere, so the proxy forwards a minimal event
//! payload (no tool arguments, no responses) to the endpoint configured under
//! `[proxy.telemetry].endpoint`. When no endpoint is configured, the pipeline still
//! buffers events locally but never attempts an upload.
//!
//! # Back pressure
//!
//! Events live in a single bounded in-memory queue with drop-oldest semantics: once the
//! queue is full, each new event evicts the oldest one. We deliberately avoid SQLite
//! here — the queue exists to ride out short connectivity hiccups, not to persist across
//! restarts. Durable telemetry would mean running the proxy as a supervised process with
//! a writable working directory and a `JSONL` sink (a planned future extension, not
//! implemented yet).
//!
//! # Scheduling
//!
//! The background uploader flushes whenever either:
//! - the buffer reaches `batch_size` events, or
//! - the periodic ticker fires, every `batch_interval_secs` (the ticker keeps a fixed
//!   cadence and is not reset by size-triggered flushes).
//!
//! When the endpoint is unset, or telemetry is disabled in config, no uploader is
//! started: the buffer still accepts events (helpful for CLI debug commands), but
//! nothing is ever uploaded.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use devboy_core::config::ProxyTelemetryConfig;
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle;
use tracing::{debug, warn};

/// Outcome of a single tool invocation.
///
/// Serialized in `snake_case`, i.e. as `"success"` or `"error"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TelemetryStatus {
    /// The invocation completed without error.
    Success,
    /// The invocation failed.
    Error,
}

/// Minimal event payload — intentionally excludes tool arguments and responses.
///
/// Serialized as a flat JSON object. Optional fields are left out of the output when
/// unset (see the per-field `serde` attributes) and default when missing on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryEvent {
    /// Unprefixed tool name (`get_issues`).
    pub tool: String,
    /// Routing decision label (`strategy_remote`, `override_rule`, …).
    pub routing_decision: String,
    /// Optional detail for `override_rule` (the pattern that matched).
    /// Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub routing_detail: Option<String>,
    /// Upstream prefix when the call went remote (`cloud`); `None` for local.
    /// Omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub upstream: Option<String>,
    /// Whether the invocation succeeded (`"success"` / `"error"` on the wire).
    pub status: TelemetryStatus,
    /// Wall-clock latency observed by the proxy.
    pub latency_ms: u64,
    /// Unix epoch seconds at the time the decision was made.
    pub timestamp_secs: u64,
    /// Optional fallback marker — true if the primary executor failed and we retried.
    /// Only serialized when `true`; defaults to `false` when missing on input.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub was_fallback: bool,
}

impl TelemetryEvent {
    /// Build an event for `tool` stamped with the current Unix time.
    ///
    /// The event starts out as a successful, non-fallback call with zero latency, no
    /// routing detail and no upstream. Callers overwrite whichever fields they know.
    pub fn now(tool: impl Into<String>, routing_decision: impl Into<String>) -> Self {
        let tool = tool.into();
        let routing_decision = routing_decision.into();
        Self {
            timestamp_secs: unix_now(),
            tool,
            routing_decision,
            status: TelemetryStatus::Success,
            latency_ms: 0,
            was_fallback: false,
            routing_detail: None,
            upstream: None,
        }
    }
}

/// Whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before 1970.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}

/// Authentication credentials used when uploading batches.
///
/// When `bearer_token` is set, the uploader sends it as `Authorization: Bearer <token>`.
#[derive(Clone, Default)]
pub struct TelemetryAuth {
    /// Optional bearer token for the telemetry endpoint.
    pub bearer_token: Option<secrecy::SecretString>,
}

// Hand-written rather than derived so the token can never leak through `{:?}` (logs,
// panic messages). It only reveals whether a token is configured.
impl std::fmt::Debug for TelemetryAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TelemetryAuth")
            .field("bearer_token", &self.bearer_token.as_ref().map(|_| "[REDACTED]"))
            .finish()
    }
}

/// Request body shape expected by the backend: the JSON object `{"events": [...]}`
/// that [`TelemetryUploader::upload`] POSTs to the configured endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryBatch {
    /// Events in queue order (oldest first).
    pub events: Vec<TelemetryEvent>,
}

/// Shared event buffer. `record()` is cheap and lock-bound; clones are light (Arc).
///
/// All clones share the same queue, flush threshold and notifier.
///
/// Internally carries a `Notify` that the buffer signals when the queue length crosses
/// the `flush_threshold` — the uploader waits on this alongside its periodic ticker so
/// that "batch_size reached" triggers an immediate flush without waiting for the next
/// interval tick.
#[derive(Clone)]
pub struct TelemetryBuffer {
    /// The queue itself; the oldest event sits at the front.
    inner: Arc<Mutex<VecDeque<TelemetryEvent>>>,
    /// Maximum number of queued events; `record()` evicts from the front to stay within it.
    capacity: usize,
    /// Threshold at which `record()` wakes the flusher. Set by the pipeline owner. A
    /// freshly constructed buffer uses `capacity`, so until a real threshold is set the
    /// trigger only fires once the queue is full.
    flush_threshold: Arc<std::sync::atomic::AtomicUsize>,
    /// Signal for the background flush task to wake immediately.
    size_trigger: Arc<Notify>,
}

impl TelemetryBuffer {
    /// Create an empty buffer that holds at most `capacity` events.
    ///
    /// A `capacity` of `0` is bumped to `1`. With a zero bound, the eviction loop in
    /// [`Self::record`] could never bring the length below the bound: it would pop from
    /// an empty queue forever while holding the lock.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            // Don't pre-allocate more than 1024 slots; larger queues grow on demand.
            inner: Arc::new(Mutex::new(VecDeque::with_capacity(capacity.min(1024)))),
            capacity,
            // Until the owner sets a threshold, only a full queue wakes the flusher.
            flush_threshold: Arc::new(std::sync::atomic::AtomicUsize::new(capacity)),
            size_trigger: Arc::new(Notify::new()),
        }
    }

    /// Set the queue length at which `record()` wakes the flusher. Called by the
    /// pipeline once `batch_size` is known.
    ///
    /// The value is clamped to `1..=capacity`. The queue never holds more than
    /// `capacity` events, so a larger threshold could never be reached. The
    /// size-triggered flush would then never fire, and events would be evicted instead.
    pub fn set_flush_threshold(&self, threshold: usize) {
        // `capacity >= 1` is guaranteed by `new`, so this clamp range is never empty.
        self.flush_threshold.store(
            threshold.clamp(1, self.capacity),
            std::sync::atomic::Ordering::Relaxed,
        );
    }

    /// Notifier the background flusher waits on for size-triggered flushes.
    pub fn size_trigger(&self) -> Arc<Notify> {
        self.size_trigger.clone()
    }

    /// Push an event. When the queue is full, drops the oldest event to make room.
    /// Wakes the background flusher as soon as the queue length crosses
    /// [`Self::set_flush_threshold`].
    pub async fn record(&self, event: TelemetryEvent) {
        let (len_after, threshold) = {
            let mut guard = self.inner.lock().await;
            // `capacity >= 1` (see `new`), so every iteration evicts a real event.
            while guard.len() >= self.capacity {
                let dropped = guard.pop_front();
                debug!(
                    dropped = ?dropped.as_ref().map(|e| e.tool.as_str()),
                    "telemetry buffer full, dropping oldest event"
                );
            }
            guard.push_back(event);
            (
                guard.len(),
                self.flush_threshold
                    .load(std::sync::atomic::Ordering::Relaxed),
            )
        };
        // Signal after releasing the lock. `notify_one` stores a permit when nobody is
        // waiting, so a flusher that is busy right now still picks up this wake-up.
        if len_after >= threshold {
            self.size_trigger.notify_one();
        }
    }

    /// Drain up to `max` events from the front without releasing the lock between pops.
    pub async fn drain(&self, max: usize) -> Vec<TelemetryEvent> {
        let mut guard = self.inner.lock().await;
        let n = guard.len().min(max);
        guard.drain(..n).collect()
    }

    /// Return events to the front of the queue (after a failed upload), keeping their
    /// original order.
    ///
    /// When there is not enough room, events at the tail are dropped first. The newest
    /// queued events go before any of the retried ones. If the batch alone exceeds
    /// `capacity`, only its first `capacity` events survive.
    pub async fn requeue_front(&self, events: Vec<TelemetryEvent>) {
        let mut guard = self.inner.lock().await;
        for event in events.into_iter().rev() {
            if guard.len() >= self.capacity {
                // Full — drop the last event (tail) to keep the retried batch prioritized.
                guard.pop_back();
            }
            guard.push_front(event);
        }
    }

    /// Number of events currently queued.
    pub async fn len(&self) -> usize {
        self.inner.lock().await.len()
    }

    /// Whether the queue is currently empty.
    pub async fn is_empty(&self) -> bool {
        self.inner.lock().await.is_empty()
    }
}

/// Uploader — owns the HTTP client and knows how to ship a batch to `endpoint`.
///
/// Cloned by the pipeline on start: one copy is kept for manual flushes and another is
/// moved into the background flush task.
#[derive(Clone)]
pub struct TelemetryUploader {
    /// Full URL that batches are POSTed to.
    endpoint: String,
    /// Optional bearer credentials attached to every upload.
    auth: TelemetryAuth,
    /// HTTP client built with a 15-second per-request timeout (see `new`).
    http: reqwest::Client,
}

impl TelemetryUploader {
    pub fn new(endpoint: String, auth: TelemetryAuth) -> devboy_core::Result<Self> {
        let http = reqwest::Client::builder()
            .timeout(Duration::from_secs(15))
            .build()
            .map_err(|e| devboy_core::Error::Http(format!("telemetry client build: {}", e)))?;
        Ok(Self {
            endpoint,
            auth,
            http,
        })
    }

    /// POST a batch; returns Ok when the server accepted it (2xx).
    pub async fn upload(&self, batch: &TelemetryBatch) -> devboy_core::Result<()> {
        let mut req = self
            .http
            .post(&self.endpoint)
            .header("content-type", "application/json");
        if let Some(token) = &self.auth.bearer_token {
            use secrecy::ExposeSecret;
            req = req.header("authorization", format!("Bearer {}", token.expose_secret()));
        }
        let resp = req.json(batch).send().await.map_err(|e| {
            devboy_core::Error::Http(format!("telemetry upload to {}: {}", self.endpoint, e))
        })?;
        let status = resp.status();
        if !status.is_success() {
            let body = resp.text().await.unwrap_or_default();
            return Err(devboy_core::Error::Http(format!(
                "telemetry upload rejected: HTTP {}{}",
                status, body
            )));
        }
        Ok(())
    }
}

/// The full pipeline — owns the buffer, an uploader, and a background flush task.
///
/// Callers get a cheap `TelemetryBuffer` clone via [`TelemetryPipeline::buffer`] so hot
/// paths do not need to pay for the uploader machinery.
pub struct TelemetryPipeline {
    /// Shared event queue; handed out to callers via `buffer()`.
    buffer: TelemetryBuffer,
    /// Telemetry settings this pipeline was created with.
    config: ProxyTelemetryConfig,
    /// `Some` only after `start()` ran with telemetry enabled and an endpoint set.
    uploader: Option<TelemetryUploader>,
    /// Handle of the spawned background flush task, while one exists.
    task: Option<JoinHandle<()>>,
    /// One-shot signal telling the flush task to do its final flush and exit.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}

impl TelemetryPipeline {
    pub fn new(config: ProxyTelemetryConfig) -> Self {
        let capacity = config.offline_queue_max.max(16);
        let buffer = TelemetryBuffer::new(capacity);
        Self {
            buffer,
            config,
            uploader: None,
            task: None,
            shutdown_tx: None,
        }
    }

    pub fn buffer(&self) -> TelemetryBuffer {
        self.buffer.clone()
    }

    pub fn config(&self) -> &ProxyTelemetryConfig {
        &self.config
    }

    /// Wire up the uploader and start the flush task. No-op when telemetry is disabled
    /// or when `endpoint` is not set.
    pub fn start(&mut self, auth: TelemetryAuth) -> devboy_core::Result<()> {
        if !self.config.enabled {
            debug!("telemetry disabled in config; skipping uploader");
            return Ok(());
        }
        let Some(endpoint) = self.config.endpoint.clone() else {
            debug!("telemetry endpoint unset; events buffered but not uploaded");
            return Ok(());
        };

        let uploader = TelemetryUploader::new(endpoint, auth)?;
        self.uploader = Some(uploader.clone());

        let buffer = self.buffer.clone();
        let batch_size = self.config.batch_size.max(1);
        let interval = Duration::from_secs(self.config.batch_interval_secs.max(1));
        // Arm the buffer so `record()` pokes the flusher the moment the queue reaches
        // `batch_size` — satisfies the "batch_size reached OR interval elapsed"
        // contract without waiting up to `batch_interval_secs` every time.
        buffer.set_flush_threshold(batch_size);
        let size_trigger = buffer.size_trigger();
        let (tx, mut rx) = tokio::sync::oneshot::channel();
        self.shutdown_tx = Some(tx);

        let task = tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                tokio::select! {
                    _ = &mut rx => {
                        // Best-effort flush on shutdown.
                        let events = buffer.drain(usize::MAX).await;
                        if !events.is_empty() {
                            let _ = uploader.upload(&TelemetryBatch { events }).await;
                        }
                        break;
                    }
                    _ = ticker.tick() => {
                        flush_once(&buffer, &uploader, batch_size).await;
                    }
                    _ = size_trigger.notified() => {
                        flush_once(&buffer, &uploader, batch_size).await;
                    }
                }
            }
        });

        self.task = Some(task);
        Ok(())
    }

    /// Force an immediate flush. Returns the number of events uploaded.
    pub async fn flush(&self) -> devboy_core::Result<usize> {
        let Some(uploader) = &self.uploader else {
            return Ok(0);
        };
        let events = self.buffer.drain(usize::MAX).await;
        let n = events.len();
        if !events.is_empty()
            && let Err(e) = uploader
                .upload(&TelemetryBatch {
                    events: events.clone(),
                })
                .await
        {
            // Requeue on failure to preserve at-least-once semantics.
            self.buffer.requeue_front(events).await;
            return Err(e);
        }
        Ok(n)
    }

    /// Gracefully stop the background task, doing one final flush attempt.
    pub async fn shutdown(&mut self) {
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }
        if let Some(handle) = self.task.take() {
            let _ = handle.await;
        }
    }
}

/// Upload queued events in chunks of `batch_size` until the buffer is drained.
///
/// Stops after a short chunk (the queue was emptied by that round) or on the first
/// failed upload. On failure the chunk is requeued at the front of the buffer and a
/// warning is logged. This function never returns an error: its callers are flush
/// loops that simply try again on their next wake-up.
async fn flush_once(buffer: &TelemetryBuffer, uploader: &TelemetryUploader, batch_size: usize) {
    // `drain(0)` would always come back empty; make sure each round can make progress.
    let batch_size = batch_size.max(1);
    loop {
        let events = buffer.drain(batch_size).await;
        if events.is_empty() {
            return;
        }
        let count = events.len();
        // Move the events into the batch; take them back only if the upload fails.
        let batch = TelemetryBatch { events };
        if let Err(e) = uploader.upload(&batch).await {
            warn!(error = %e, "telemetry upload failed, retrying later");
            buffer.requeue_front(batch.events).await;
            return;
        }
        debug!(count, "telemetry batch uploaded");
        if count < batch_size {
            return;
        }
    }
}

// Unit tests covering the buffer's queue semantics, the uploader against a local
// `httpmock` server, and the pipeline's start/flush/shutdown wiring.
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;

    /// Event for `tool`, routed as `strategy_remote`, with a fixed 42 ms latency.
    fn sample_event(tool: &str) -> TelemetryEvent {
        let mut ev = TelemetryEvent::now(tool, "strategy_remote");
        ev.latency_ms = 42;
        ev
    }

    // Events come out in FIFO order, and `drain(max)` leaves the remainder queued.
    #[tokio::test]
    async fn test_buffer_record_and_drain() {
        let buf = TelemetryBuffer::new(10);
        buf.record(sample_event("a")).await;
        buf.record(sample_event("b")).await;
        assert_eq!(buf.len().await, 2);

        let drained = buf.drain(1).await;
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].tool, "a");
        assert_eq!(buf.len().await, 1);
    }

    // Recording past capacity evicts the oldest event ("a") and keeps the newest two.
    #[tokio::test]
    async fn test_buffer_drop_oldest_when_full() {
        let buf = TelemetryBuffer::new(2);
        buf.record(sample_event("a")).await;
        buf.record(sample_event("b")).await;
        buf.record(sample_event("c")).await;

        let drained = buf.drain(10).await;
        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].tool, "b");
        assert_eq!(drained[1].tool, "c");
    }

    // The size trigger stays quiet below the threshold and fires once it is reached.
    #[tokio::test]
    async fn test_size_trigger_fires_when_threshold_reached() {
        let buf = TelemetryBuffer::new(10);
        buf.set_flush_threshold(3);
        let notify = buf.size_trigger();

        // Two events stay below the threshold — notify must not fire yet.
        buf.record(sample_event("a")).await;
        buf.record(sample_event("b")).await;
        let early = tokio::time::timeout(Duration::from_millis(50), notify.notified()).await;
        assert!(
            early.is_err(),
            "size_trigger should not fire below threshold"
        );

        // Third event lands on the threshold — notify must fire.
        // `record()` calls `notify_one` before anyone waits; Tokio stores that as a
        // permit, so the `notified()` created afterwards completes immediately.
        buf.record(sample_event("c")).await;
        let fired = tokio::time::timeout(Duration::from_millis(100), notify.notified()).await;
        assert!(
            fired.is_ok(),
            "size_trigger must fire when queue reaches threshold"
        );
    }

    // Requeued events go in front of newer ones, in their original order.
    #[tokio::test]
    async fn test_buffer_requeue_front() {
        let buf = TelemetryBuffer::new(5);
        buf.record(sample_event("new")).await;
        buf.requeue_front(vec![sample_event("old1"), sample_event("old2")])
            .await;
        let drained = buf.drain(10).await;
        assert_eq!(
            drained.iter().map(|e| e.tool.as_str()).collect::<Vec<_>>(),
            vec!["old1", "old2", "new"]
        );
    }

    // The uploader POSTs to the configured URL with the bearer token and a JSON body
    // containing the event's tool name; a 202 reply counts as success.
    #[tokio::test]
    async fn test_uploader_sends_bearer_header_and_payload() {
        let server = MockServer::start_async().await;
        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .path("/api/telemetry/tool-invocations")
                    .header("authorization", "Bearer my-token")
                    .body_includes(r#""tool":"get_issues""#);
                then.status(202).body("");
            })
            .await;

        let uploader = TelemetryUploader::new(
            format!("{}/api/telemetry/tool-invocations", server.base_url()),
            TelemetryAuth {
                bearer_token: Some("my-token".into()),
            },
        )
        .unwrap();

        let batch = TelemetryBatch {
            events: vec![sample_event("get_issues")],
        };
        uploader.upload(&batch).await.unwrap();
        // Fails the test unless the mock above matched exactly one request.
        mock.assert_async().await;
    }

    // A non-2xx reply is an error whose message carries both the status and the body.
    #[tokio::test]
    async fn test_uploader_reports_error_on_5xx() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(500).body("boom");
            })
            .await;

        let uploader = TelemetryUploader::new(
            format!("{}/tele", server.base_url()),
            TelemetryAuth::default(),
        )
        .unwrap();

        let err = uploader
            .upload(&TelemetryBatch {
                events: vec![sample_event("x")],
            })
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("500"));
        assert!(msg.contains("boom"));
    }

    // A manual flush uploads everything queued and reports how many events it sent.
    // NOTE(review): relies on `ProxyTelemetryConfig::default()` having `enabled = true`;
    // otherwise no uploader is wired and `flush()` returns 0. It also appears to rely on
    // the default current-thread test runtime: the spawned flusher cannot run before this
    // task first yields, so the manual `flush()` is the one that drains both events.
    #[tokio::test]
    async fn test_pipeline_flush_uploads_all_and_returns_count() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(200).body("");
            })
            .await;

        let cfg = ProxyTelemetryConfig {
            endpoint: Some(format!("{}/t", server.base_url())),
            ..Default::default()
        };

        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("a")).await;
        pipeline.buffer().record(sample_event("b")).await;

        let n = pipeline.flush().await.unwrap();
        assert_eq!(n, 2);
        pipeline.shutdown().await;
    }

    // With telemetry disabled, `start()` returns early, so `flush()` has no uploader.
    #[tokio::test]
    async fn test_pipeline_disabled_is_noop() {
        let cfg = ProxyTelemetryConfig {
            enabled: false,
            ..Default::default()
        };
        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("x")).await;

        // Flush is a no-op — uploader not wired.
        let n = pipeline.flush().await.unwrap();
        assert_eq!(n, 0);
        pipeline.shutdown().await;
    }

    // Without an endpoint no uploader is started, but events are still buffered.
    #[tokio::test]
    async fn test_pipeline_without_endpoint_still_buffers_but_does_not_upload() {
        let cfg = ProxyTelemetryConfig {
            enabled: true,
            endpoint: None,
            ..Default::default()
        };
        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("x")).await;
        assert_eq!(pipeline.buffer().len().await, 1);
        pipeline.shutdown().await;
    }

    // A rejected upload makes `flush()` fail and puts the event back in the buffer.
    #[tokio::test]
    async fn test_flush_requeues_on_failure() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(500).body("");
            })
            .await;

        let cfg = ProxyTelemetryConfig {
            endpoint: Some(format!("{}/t", server.base_url())),
            ..Default::default()
        };

        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("a")).await;

        assert!(pipeline.flush().await.is_err());
        assert_eq!(pipeline.buffer().len().await, 1);
        pipeline.shutdown().await;
    }
}