1use std::collections::VecDeque;
31use std::sync::Arc;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34use devboy_core::config::ProxyTelemetryConfig;
35use serde::{Deserialize, Serialize};
36use tokio::sync::{Mutex, Notify};
37use tokio::task::JoinHandle;
38use tracing::{debug, warn};
39
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
42#[serde(rename_all = "snake_case")]
43pub enum TelemetryStatus {
44 Success,
45 Error,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct TelemetryEvent {
51 pub tool: String,
53 pub routing_decision: String,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub routing_detail: Option<String>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub upstream: Option<String>,
61 pub status: TelemetryStatus,
62 pub latency_ms: u64,
64 pub timestamp_secs: u64,
66 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
68 pub was_fallback: bool,
69}
70
71impl TelemetryEvent {
72 pub fn now(tool: impl Into<String>, routing_decision: impl Into<String>) -> Self {
73 Self {
74 tool: tool.into(),
75 routing_decision: routing_decision.into(),
76 routing_detail: None,
77 upstream: None,
78 status: TelemetryStatus::Success,
79 latency_ms: 0,
80 timestamp_secs: unix_now(),
81 was_fallback: false,
82 }
83 }
84}
85
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
92
93#[derive(Clone, Default)]
95pub struct TelemetryAuth {
96 pub bearer_token: Option<secrecy::SecretString>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct TelemetryBatch {
102 pub events: Vec<TelemetryEvent>,
103}
104
105#[derive(Clone)]
112pub struct TelemetryBuffer {
113 inner: Arc<Mutex<VecDeque<TelemetryEvent>>>,
114 capacity: usize,
115 flush_threshold: Arc<std::sync::atomic::AtomicUsize>,
118 size_trigger: Arc<Notify>,
120}
121
122impl TelemetryBuffer {
123 pub fn new(capacity: usize) -> Self {
124 Self {
125 inner: Arc::new(Mutex::new(VecDeque::with_capacity(capacity.min(1024)))),
126 capacity,
127 flush_threshold: Arc::new(std::sync::atomic::AtomicUsize::new(capacity)),
128 size_trigger: Arc::new(Notify::new()),
129 }
130 }
131
132 pub fn set_flush_threshold(&self, threshold: usize) {
135 self.flush_threshold
136 .store(threshold.max(1), std::sync::atomic::Ordering::Relaxed);
137 }
138
139 pub fn size_trigger(&self) -> Arc<Notify> {
141 self.size_trigger.clone()
142 }
143
144 pub async fn record(&self, event: TelemetryEvent) {
148 let (len_after, threshold) = {
149 let mut guard = self.inner.lock().await;
150 while guard.len() >= self.capacity {
151 let dropped = guard.pop_front();
152 debug!(
153 dropped = ?dropped.as_ref().map(|e| e.tool.as_str()),
154 "telemetry buffer full, dropping oldest event"
155 );
156 }
157 guard.push_back(event);
158 (
159 guard.len(),
160 self.flush_threshold
161 .load(std::sync::atomic::Ordering::Relaxed),
162 )
163 };
164 if len_after >= threshold {
165 self.size_trigger.notify_one();
166 }
167 }
168
169 pub async fn drain(&self, max: usize) -> Vec<TelemetryEvent> {
171 let mut guard = self.inner.lock().await;
172 let n = guard.len().min(max);
173 guard.drain(..n).collect()
174 }
175
176 pub async fn requeue_front(&self, events: Vec<TelemetryEvent>) {
178 let mut guard = self.inner.lock().await;
179 for event in events.into_iter().rev() {
180 if guard.len() >= self.capacity {
181 guard.pop_back();
183 }
184 guard.push_front(event);
185 }
186 }
187
188 pub async fn len(&self) -> usize {
189 self.inner.lock().await.len()
190 }
191
192 pub async fn is_empty(&self) -> bool {
193 self.inner.lock().await.is_empty()
194 }
195}
196
197#[derive(Clone)]
199pub struct TelemetryUploader {
200 endpoint: String,
201 auth: TelemetryAuth,
202 http: reqwest::Client,
203}
204
205impl TelemetryUploader {
206 pub fn new(endpoint: String, auth: TelemetryAuth) -> devboy_core::Result<Self> {
207 let http = reqwest::Client::builder()
208 .timeout(Duration::from_secs(15))
209 .build()
210 .map_err(|e| devboy_core::Error::Http(format!("telemetry client build: {}", e)))?;
211 Ok(Self {
212 endpoint,
213 auth,
214 http,
215 })
216 }
217
218 pub async fn upload(&self, batch: &TelemetryBatch) -> devboy_core::Result<()> {
220 let mut req = self
221 .http
222 .post(&self.endpoint)
223 .header("content-type", "application/json");
224 if let Some(token) = &self.auth.bearer_token {
225 use secrecy::ExposeSecret;
226 req = req.header("authorization", format!("Bearer {}", token.expose_secret()));
227 }
228 let resp = req.json(batch).send().await.map_err(|e| {
229 devboy_core::Error::Http(format!("telemetry upload to {}: {}", self.endpoint, e))
230 })?;
231 let status = resp.status();
232 if !status.is_success() {
233 let body = resp.text().await.unwrap_or_default();
234 return Err(devboy_core::Error::Http(format!(
235 "telemetry upload rejected: HTTP {} — {}",
236 status, body
237 )));
238 }
239 Ok(())
240 }
241}
242
243pub struct TelemetryPipeline {
248 buffer: TelemetryBuffer,
249 config: ProxyTelemetryConfig,
250 uploader: Option<TelemetryUploader>,
251 task: Option<JoinHandle<()>>,
252 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
253}
254
255impl TelemetryPipeline {
256 pub fn new(config: ProxyTelemetryConfig) -> Self {
257 let capacity = config.offline_queue_max.max(16);
258 let buffer = TelemetryBuffer::new(capacity);
259 Self {
260 buffer,
261 config,
262 uploader: None,
263 task: None,
264 shutdown_tx: None,
265 }
266 }
267
268 pub fn buffer(&self) -> TelemetryBuffer {
269 self.buffer.clone()
270 }
271
272 pub fn config(&self) -> &ProxyTelemetryConfig {
273 &self.config
274 }
275
276 pub fn start(&mut self, auth: TelemetryAuth) -> devboy_core::Result<()> {
279 if !self.config.enabled {
280 debug!("telemetry disabled in config; skipping uploader");
281 return Ok(());
282 }
283 let Some(endpoint) = self.config.endpoint.clone() else {
284 debug!("telemetry endpoint unset; events buffered but not uploaded");
285 return Ok(());
286 };
287
288 let uploader = TelemetryUploader::new(endpoint, auth)?;
289 self.uploader = Some(uploader.clone());
290
291 let buffer = self.buffer.clone();
292 let batch_size = self.config.batch_size.max(1);
293 let interval = Duration::from_secs(self.config.batch_interval_secs.max(1));
294 buffer.set_flush_threshold(batch_size);
298 let size_trigger = buffer.size_trigger();
299 let (tx, mut rx) = tokio::sync::oneshot::channel();
300 self.shutdown_tx = Some(tx);
301
302 let task = tokio::spawn(async move {
303 let mut ticker = tokio::time::interval(interval);
304 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
305 loop {
306 tokio::select! {
307 _ = &mut rx => {
308 let events = buffer.drain(usize::MAX).await;
310 if !events.is_empty() {
311 let _ = uploader.upload(&TelemetryBatch { events }).await;
312 }
313 break;
314 }
315 _ = ticker.tick() => {
316 flush_once(&buffer, &uploader, batch_size).await;
317 }
318 _ = size_trigger.notified() => {
319 flush_once(&buffer, &uploader, batch_size).await;
320 }
321 }
322 }
323 });
324
325 self.task = Some(task);
326 Ok(())
327 }
328
329 pub async fn flush(&self) -> devboy_core::Result<usize> {
331 let Some(uploader) = &self.uploader else {
332 return Ok(0);
333 };
334 let events = self.buffer.drain(usize::MAX).await;
335 let n = events.len();
336 if !events.is_empty()
337 && let Err(e) = uploader
338 .upload(&TelemetryBatch {
339 events: events.clone(),
340 })
341 .await
342 {
343 self.buffer.requeue_front(events).await;
345 return Err(e);
346 }
347 Ok(n)
348 }
349
350 pub async fn shutdown(&mut self) {
352 if let Some(tx) = self.shutdown_tx.take() {
353 let _ = tx.send(());
354 }
355 if let Some(handle) = self.task.take() {
356 let _ = handle.await;
357 }
358 }
359}
360
361async fn flush_once(buffer: &TelemetryBuffer, uploader: &TelemetryUploader, batch_size: usize) {
362 loop {
363 let events = buffer.drain(batch_size).await;
364 if events.is_empty() {
365 return;
366 }
367 match uploader
368 .upload(&TelemetryBatch {
369 events: events.clone(),
370 })
371 .await
372 {
373 Ok(_) => {
374 debug!(count = events.len(), "telemetry batch uploaded");
375 if events.len() < batch_size {
376 return;
377 }
378 }
379 Err(e) => {
380 warn!(error = %e, "telemetry upload failed, retrying later");
381 buffer.requeue_front(events).await;
382 return;
383 }
384 }
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;

    /// Builds an event for `tool` with a fixed routing label and a non-zero latency.
    fn sample_event(tool: &str) -> TelemetryEvent {
        let mut ev = TelemetryEvent::now(tool, "strategy_remote");
        ev.latency_ms = 42;
        ev
    }

    #[tokio::test]
    async fn test_buffer_record_and_drain() {
        let buf = TelemetryBuffer::new(10);
        buf.record(sample_event("a")).await;
        buf.record(sample_event("b")).await;
        assert_eq!(buf.len().await, 2);

        // Draining is FIFO and bounded by `max`.
        let drained = buf.drain(1).await;
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].tool, "a");
        assert_eq!(buf.len().await, 1);
    }

    #[tokio::test]
    async fn test_buffer_drop_oldest_when_full() {
        // Capacity 2: recording a third event evicts the oldest one ("a").
        let buf = TelemetryBuffer::new(2);
        buf.record(sample_event("a")).await;
        buf.record(sample_event("b")).await;
        buf.record(sample_event("c")).await;

        let drained = buf.drain(10).await;
        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].tool, "b");
        assert_eq!(drained[1].tool, "c");
    }

    #[tokio::test]
    async fn test_size_trigger_fires_when_threshold_reached() {
        let buf = TelemetryBuffer::new(10);
        buf.set_flush_threshold(3);
        let notify = buf.size_trigger();

        buf.record(sample_event("a")).await;
        // Two queued events are still below the threshold of three: no signal yet.
        buf.record(sample_event("b")).await;
        let early = tokio::time::timeout(Duration::from_millis(50), notify.notified()).await;
        assert!(
            early.is_err(),
            "size_trigger should not fire below threshold"
        );

        buf.record(sample_event("c")).await;
        // `notify_one` stores a permit when nobody is waiting, so this completes.
        let fired = tokio::time::timeout(Duration::from_millis(100), notify.notified()).await;
        assert!(
            fired.is_ok(),
            "size_trigger must fire when queue reaches threshold"
        );
    }

    #[tokio::test]
    async fn test_buffer_requeue_front() {
        // Requeued events go ahead of newer ones and keep their relative order.
        let buf = TelemetryBuffer::new(5);
        buf.record(sample_event("new")).await;
        buf.requeue_front(vec![sample_event("old1"), sample_event("old2")])
            .await;
        let drained = buf.drain(10).await;
        assert_eq!(
            drained.iter().map(|e| e.tool.as_str()).collect::<Vec<_>>(),
            vec!["old1", "old2", "new"]
        );
    }

    #[tokio::test]
    async fn test_uploader_sends_bearer_header_and_payload() {
        // The mock only matches if both the bearer header and the JSON payload are present.
        let server = MockServer::start_async().await;
        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .path("/api/telemetry/tool-invocations")
                    .header("authorization", "Bearer my-token")
                    .body_includes(r#""tool":"get_issues""#);
                then.status(202).body("");
            })
            .await;

        let uploader = TelemetryUploader::new(
            format!("{}/api/telemetry/tool-invocations", server.base_url()),
            TelemetryAuth {
                bearer_token: Some("my-token".into()),
            },
        )
        .unwrap();

        let batch = TelemetryBatch {
            events: vec![sample_event("get_issues")],
        };
        uploader.upload(&batch).await.unwrap();
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_uploader_reports_error_on_5xx() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(500).body("boom");
            })
            .await;

        let uploader = TelemetryUploader::new(
            format!("{}/tele", server.base_url()),
            TelemetryAuth::default(),
        )
        .unwrap();

        // The error message must surface both the status code and the response body.
        let err = uploader
            .upload(&TelemetryBatch {
                events: vec![sample_event("x")],
            })
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("500"));
        assert!(msg.contains("boom"));
    }

    #[tokio::test]
    async fn test_pipeline_flush_uploads_all_and_returns_count() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(200).body("");
            })
            .await;

        let cfg = ProxyTelemetryConfig {
            endpoint: Some(format!("{}/t", server.base_url())),
            ..Default::default()
        };

        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("a")).await;
        pipeline.buffer().record(sample_event("b")).await;

        // An explicit flush sends everything queued and reports how many events went out.
        let n = pipeline.flush().await.unwrap();
        assert_eq!(n, 2);
        pipeline.shutdown().await;
    }

    #[tokio::test]
    async fn test_pipeline_disabled_is_noop() {
        let cfg = ProxyTelemetryConfig {
            enabled: false,
            ..Default::default()
        };
        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("x")).await;

        let n = pipeline.flush().await.unwrap();
        // A disabled pipeline never creates an uploader, so flush reports nothing sent.
        assert_eq!(n, 0);
        pipeline.shutdown().await;
    }

    #[tokio::test]
    async fn test_pipeline_without_endpoint_still_buffers_but_does_not_upload() {
        let cfg = ProxyTelemetryConfig {
            enabled: true,
            endpoint: None,
            ..Default::default()
        };
        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("x")).await;
        assert_eq!(pipeline.buffer().len().await, 1);
        pipeline.shutdown().await;
    }

    #[tokio::test]
    async fn test_flush_requeues_on_failure() {
        let server = MockServer::start_async().await;
        server
            .mock_async(|when, then| {
                when.method(POST);
                then.status(500).body("");
            })
            .await;

        let cfg = ProxyTelemetryConfig {
            endpoint: Some(format!("{}/t", server.base_url())),
            ..Default::default()
        };

        let mut pipeline = TelemetryPipeline::new(cfg);
        pipeline.start(TelemetryAuth::default()).unwrap();
        pipeline.buffer().record(sample_event("a")).await;

        // A failed upload surfaces the error and puts the event back in the buffer.
        assert!(pipeline.flush().await.is_err());
        assert_eq!(pipeline.buffer().len().await, 1);
        pipeline.shutdown().await;
    }
}