atomcode_telemetry/sender/
mod.rs1pub mod http;
4
5use crate::queue::Queue;
6use crate::runtime::Counters;
7use http::{HttpSender, SendError};
8use std::path::PathBuf;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13use tokio::time::sleep;
14use tracing::warn;
15
16pub struct SenderRuntime {
17 queue: Arc<Mutex<Queue>>,
18 http: HttpSender,
19 counters: Arc<Counters>,
20 health_path: PathBuf,
21}
22
23impl SenderRuntime {
24 pub fn new(
25 queue: Arc<Mutex<Queue>>,
26 http: HttpSender,
27 counters: Arc<Counters>,
28 health_path: PathBuf,
29 ) -> Self {
30 Self {
31 queue,
32 http,
33 counters,
34 health_path,
35 }
36 }
37
38 pub async fn flush_one(&self) -> Result<Option<PathBuf>, SendError> {
40 let (seg, dropped) = {
41 let q = self.queue.lock().await;
42 (
43 q.claim_oldest_segment().map_err(|_| SendError::Other)?,
44 q.dropped,
45 )
46 };
47 let seg = match seg {
48 Some(s) => s,
49 None => return Ok(None),
50 };
51
52 self.counters
54 .events_dropped_disk
55 .store(dropped, Ordering::Relaxed);
56
57 let bytes = std::fs::metadata(&seg).map(|m| m.len()).unwrap_or(0);
59
60 match self.http.send_segment(&seg, dropped).await {
61 Ok(()) => {}
62 Err(SendError::BadRequest) => {
63 let q = self.queue.lock().await;
64 q.complete_claim(&seg).map_err(|_| SendError::Other)?;
65 return Err(SendError::BadRequest);
66 }
67 Err(e) => {
68 let q = self.queue.lock().await;
69 if let Err(restore_err) = q.restore_claim(&seg) {
70 warn!(?restore_err, segment = %seg.display(), "telemetry segment restore failed");
71 }
72 return Err(e);
73 }
74 }
75
76 {
77 let q = self.queue.lock().await;
78 if let Err(e) = q.complete_claim(&seg) {
79 warn!(?e, "delete segment failed");
80 }
81 }
82
83 self.counters
84 .segments_posted
85 .fetch_add(1, Ordering::Relaxed);
86 self.counters
87 .bytes_sent
88 .fetch_add(bytes, Ordering::Relaxed);
89 let now_ms = std::time::SystemTime::now()
90 .duration_since(std::time::UNIX_EPOCH)
91 .map(|d| d.as_millis() as i64)
92 .unwrap_or(0);
93 self.counters
94 .last_post_unix_ms
95 .store(now_ms, Ordering::Relaxed);
96
97 tracing::info!(segment = %seg.display(), bytes, "telemetry segment posted");
98
99 self.persist_health();
101
102 Ok(Some(seg))
103 }
104
105 pub fn backoff(attempt: u32) -> Duration {
107 match attempt {
108 0 => Duration::from_secs(2),
109 1 => Duration::from_secs(8),
110 2 => Duration::from_secs(30),
111 3 => Duration::from_secs(120),
112 _ => Duration::from_secs(300),
113 }
114 }
115
116 pub async fn drain_with_backoff(&self) {
118 let mut attempt = 0u32;
119 loop {
120 match self.flush_one().await {
121 Ok(None) => break,
122 Ok(Some(_)) => {
123 attempt = 0;
124 }
125 Err(SendError::Unauthorized) => {
126 warn!("telemetry unauthorized — holding for 1h");
127 sleep(Duration::from_secs(3600)).await;
128 attempt = 0;
129 }
130 Err(SendError::BadRequest) => {
131 warn!("telemetry 400 — dropped claimed segment");
133 attempt = 0;
134 }
135 Err(SendError::RateLimited(Some(d))) => {
136 sleep(d).await;
137 attempt += 1;
138 }
139 Err(e) => {
140 warn!(?e, "telemetry send error; backing off");
141 sleep(Self::backoff(attempt)).await;
142 attempt = attempt.saturating_add(1);
143 }
144 }
145 }
146 }
147
148 fn persist_health(&self) {
149 let snap = self.counters.snapshot();
150 if let Ok(json) = serde_json::to_string(&snap) {
151 if let Some(parent) = self.health_path.parent() {
152 let _ = std::fs::create_dir_all(parent);
153 }
154 let _ = std::fs::write(&self.health_path, json);
155 }
156 }
157}