Skip to main content

atomcode_telemetry/sender/
mod.rs

1//! Background sender task: drains rolled segments and POSTs them.
2
3pub mod http;
4
5use crate::queue::Queue;
6use crate::runtime::Counters;
7use http::{HttpSender, SendError};
8use std::path::PathBuf;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13use tokio::time::sleep;
14use tracing::warn;
15
16pub struct SenderRuntime {
17    queue: Arc<Mutex<Queue>>,
18    http: HttpSender,
19    counters: Arc<Counters>,
20    health_path: PathBuf,
21}
22
23impl SenderRuntime {
24    pub fn new(
25        queue: Arc<Mutex<Queue>>,
26        http: HttpSender,
27        counters: Arc<Counters>,
28        health_path: PathBuf,
29    ) -> Self {
30        Self {
31            queue,
32            http,
33            counters,
34            health_path,
35        }
36    }
37
38    /// Process one pending segment (oldest). Returns `Ok(None)` if queue empty.
39    pub async fn flush_one(&self) -> Result<Option<PathBuf>, SendError> {
40        let (seg, dropped) = {
41            let q = self.queue.lock().await;
42            (
43                q.claim_oldest_segment().map_err(|_| SendError::Other)?,
44                q.dropped,
45            )
46        };
47        let seg = match seg {
48            Some(s) => s,
49            None => return Ok(None),
50        };
51
52        // Mirror queue's disk-eviction counter into our atomics (absolute value).
53        self.counters
54            .events_dropped_disk
55            .store(dropped, Ordering::Relaxed);
56
57        // Read segment size before deleting it.
58        let bytes = std::fs::metadata(&seg).map(|m| m.len()).unwrap_or(0);
59
60        match self.http.send_segment(&seg, dropped).await {
61            Ok(()) => {}
62            Err(SendError::BadRequest) => {
63                let q = self.queue.lock().await;
64                q.complete_claim(&seg).map_err(|_| SendError::Other)?;
65                return Err(SendError::BadRequest);
66            }
67            Err(e) => {
68                let q = self.queue.lock().await;
69                if let Err(restore_err) = q.restore_claim(&seg) {
70                    warn!(?restore_err, segment = %seg.display(), "telemetry segment restore failed");
71                }
72                return Err(e);
73            }
74        }
75
76        {
77            let q = self.queue.lock().await;
78            if let Err(e) = q.complete_claim(&seg) {
79                warn!(?e, "delete segment failed");
80            }
81        }
82
83        self.counters
84            .segments_posted
85            .fetch_add(1, Ordering::Relaxed);
86        self.counters
87            .bytes_sent
88            .fetch_add(bytes, Ordering::Relaxed);
89        let now_ms = std::time::SystemTime::now()
90            .duration_since(std::time::UNIX_EPOCH)
91            .map(|d| d.as_millis() as i64)
92            .unwrap_or(0);
93        self.counters
94            .last_post_unix_ms
95            .store(now_ms, Ordering::Relaxed);
96
97        tracing::info!(segment = %seg.display(), bytes, "telemetry segment posted");
98
99        // Persist health snapshot (best-effort).
100        self.persist_health();
101
102        Ok(Some(seg))
103    }
104
105    /// Backoff schedule per spec §6: 2s, 8s, 30s, 120s, 300s (capped).
106    pub fn backoff(attempt: u32) -> Duration {
107        match attempt {
108            0 => Duration::from_secs(2),
109            1 => Duration::from_secs(8),
110            2 => Duration::from_secs(30),
111            3 => Duration::from_secs(120),
112            _ => Duration::from_secs(300),
113        }
114    }
115
116    /// Drain-loop step. Caller owns the tick-or-shutdown select.
117    pub async fn drain_with_backoff(&self) {
118        let mut attempt = 0u32;
119        loop {
120            match self.flush_one().await {
121                Ok(None) => break,
122                Ok(Some(_)) => {
123                    attempt = 0;
124                }
125                Err(SendError::Unauthorized) => {
126                    warn!("telemetry unauthorized — holding for 1h");
127                    sleep(Duration::from_secs(3600)).await;
128                    attempt = 0;
129                }
130                Err(SendError::BadRequest) => {
131                    // Corrupt segment was already claimed and deleted by flush_one.
132                    warn!("telemetry 400 — dropped claimed segment");
133                    attempt = 0;
134                }
135                Err(SendError::RateLimited(Some(d))) => {
136                    sleep(d).await;
137                    attempt += 1;
138                }
139                Err(e) => {
140                    warn!(?e, "telemetry send error; backing off");
141                    sleep(Self::backoff(attempt)).await;
142                    attempt = attempt.saturating_add(1);
143                }
144            }
145        }
146    }
147
148    fn persist_health(&self) {
149        let snap = self.counters.snapshot();
150        if let Ok(json) = serde_json::to_string(&snap) {
151            if let Some(parent) = self.health_path.parent() {
152                let _ = std::fs::create_dir_all(parent);
153            }
154            let _ = std::fs::write(&self.health_path, json);
155        }
156    }
157}