1pub mod format;
10pub mod host_telemetry;
11mod queue;
12mod system_status;
13mod time_sync;
14mod uploader;
15
16pub use format::{AtlasResult, ResultBundle};
17pub use host_telemetry::{HostTelemetry, HostTelemetryKind};
18pub use queue::{QueueStats, QueuedResult, ResultQueue};
19pub use time_sync::TimeSyncTracker;
20pub use uploader::{ResultUploader, UploadStream, UploadTransport, UploaderConfig};
21
22use starla_common::MeasurementResult;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::Mutex;
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, info, warn};
28
/// Tuning knobs for the result handler's queueing, upload and cleanup behaviour.
#[derive(Debug, Clone)]
pub struct ResultHandlerConfig {
    /// Period of the upload tick driven by `ResultHandler::run`.
    pub upload_interval: Duration,
    /// Age limit handed to the queue's expiry cleanup (seconds, per the field name).
    pub max_result_age_secs: i64,
    /// Results whose attempt count has reached this value are dropped rather than uploaded.
    pub max_attempts: u32,
    /// Period of the cleanup tick driven by `ResultHandler::run`.
    pub cleanup_interval: Duration,
    /// Capacity passed to the result queue when the handler is constructed.
    pub max_queue_size: usize,
}

impl Default for ResultHandlerConfig {
    /// One-minute uploads, five-minute cleanups, a one-hour result lifetime,
    /// five attempts per result and room for 10 000 queued results.
    fn default() -> Self {
        let minute = Duration::from_secs(60);
        Self {
            upload_interval: minute,
            cleanup_interval: minute * 5,
            max_result_age_secs: 60 * 60,
            max_attempts: 5,
            max_queue_size: 10_000,
        }
    }
}
55
56pub struct ResultHandler {
58 queue: Arc<Mutex<ResultQueue>>,
59 uploader: Arc<Mutex<ResultUploader>>,
60 config: ResultHandlerConfig,
61 time_sync: Arc<TimeSyncTracker>,
62 session_id: Arc<Mutex<Option<String>>>,
63 host_telemetry: Arc<HostTelemetry>,
64 metrics: starla_metrics::MetricsRegistry,
65}
66
67impl ResultHandler {
68 pub fn new(
70 transport: Box<dyn UploadTransport>,
71 uploader_config: UploaderConfig,
72 config: ResultHandlerConfig,
73 metrics: starla_metrics::MetricsRegistry,
74 ) -> Self {
75 let queue = ResultQueue::new(config.max_queue_size);
76 let host_telemetry = HostTelemetry::new();
77 let uploader = ResultUploader::new(
78 transport,
79 uploader_config,
80 metrics.clone(),
81 Arc::clone(&host_telemetry),
82 );
83
84 Self {
85 queue: Arc::new(Mutex::new(queue)),
86 uploader: Arc::new(Mutex::new(uploader)),
87 config,
88 time_sync: Arc::new(TimeSyncTracker::new()),
89 session_id: Arc::new(Mutex::new(None)),
90 host_telemetry,
91 metrics,
92 }
93 }
94
95 pub fn host_telemetry(&self) -> Arc<HostTelemetry> {
99 Arc::clone(&self.host_telemetry)
100 }
101
102 pub async fn set_endpoint_path(&self, path: String) -> anyhow::Result<()> {
104 let mut uploader = self.uploader.lock().await;
105 uploader.set_endpoint_path(path)?;
106 debug!("Result upload endpoint path set");
107 Ok(())
108 }
109
110 pub async fn set_session_id(&self, session_id: String) {
112 let mut sid = self.session_id.lock().await;
113 *sid = Some(session_id);
114 debug!("Session ID set for upload footer");
115 }
116
117 pub async fn has_endpoint(&self) -> bool {
119 let uploader = self.uploader.lock().await;
120 uploader.has_endpoint()
121 }
122
123 pub fn mark_time_synced(&self) {
125 self.time_sync.mark_synced();
126 }
127
128 pub fn time_sync(&self) -> Arc<TimeSyncTracker> {
130 Arc::clone(&self.time_sync)
131 }
132
133 pub fn current_lts(&self) -> i64 {
135 self.time_sync.lts()
136 }
137
138 pub async fn submit(&self, result: MeasurementResult) {
140 let mut queue = self.queue.lock().await;
141 let before = queue.stats().count;
142 queue.enqueue(result);
143 let after = queue.stats().count;
144
145 if after <= before && before > 0 {
146 self.metrics.record_queue_drop();
147 }
148
149 self.metrics.update_queue_depth(after as i64);
150 debug!("Result enqueued, queue: {}", after);
151 }
152
153 pub async fn queue_stats(&self) -> QueueStats {
155 let queue = self.queue.lock().await;
156 queue.stats()
157 }
158
159 pub async fn upload_pending(&self) -> anyhow::Result<usize> {
161 if !self.has_endpoint().await {
162 debug!("No upload endpoint configured, skipping upload");
163 return Ok(0);
164 }
165
166 let results = {
169 let mut queue = self.queue.lock().await;
170 let res = queue.drain_all();
171 self.metrics.update_queue_depth(0);
172 res
173 };
174
175 if results.is_empty() {
176 return Ok(0);
177 }
178
179 let (uploadable, failed): (Vec<_>, Vec<_>) = results
180 .into_iter()
181 .partition(|r| r.attempts < self.config.max_attempts);
182
183 if !failed.is_empty() {
184 warn!(
185 "Dropping {} results that exceeded max attempts",
186 failed.len()
187 );
188 for _ in 0..failed.len() {
189 self.metrics.record_queue_drop();
190 }
191 }
192
193 if uploadable.is_empty() {
194 return Ok(0);
195 }
196
197 let count = uploadable.len();
198 info!("Uploading {} results", count);
199
200 let lts = self.time_sync.lts();
201 let session_id = self.session_id.lock().await;
202 let session_id_ref = session_id.as_deref();
203
204 let uploader = self.uploader.lock().await;
205 match uploader
206 .upload_batch(&uploadable, lts, session_id_ref)
207 .await
208 {
209 Ok(()) => {
210 info!("Successfully uploaded {} results", count);
211 self.metrics.record_upload_success();
212 Ok(count)
213 }
214 Err(e) => {
215 warn!("Upload failed: {}", e);
216 self.metrics.record_upload_failure();
217 let mut queue = self.queue.lock().await;
218 queue.requeue_failed(uploadable);
219 self.metrics.update_queue_depth(queue.stats().count as i64);
220 Err(e)
221 }
222 }
223 }
224
225 pub async fn run_cleanup(&self) {
227 let mut queue = self.queue.lock().await;
228 let removed_expired = queue.cleanup_expired(self.config.max_result_age_secs);
229 let removed_failed = queue.cleanup_failed(self.config.max_attempts);
230
231 let total_removed = removed_expired + removed_failed;
232 if total_removed > 0 {
233 for _ in 0..total_removed {
234 self.metrics.record_queue_drop();
235 }
236 self.metrics.update_queue_depth(queue.stats().count as i64);
237 }
238 }
239
240 pub async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
242 info!("Result handler starting");
243
244 let mut upload_interval = tokio::time::interval(self.config.upload_interval);
245 let mut cleanup_interval = tokio::time::interval(self.config.cleanup_interval);
246
247 let mut consecutive_failures = 0u32;
248 let mut backoff = Duration::from_secs(1);
249 let max_backoff = Duration::from_secs(300);
250
251 loop {
252 tokio::select! {
253 _ = cancel.cancelled() => {
254 debug!("Result handler shutting down");
255 break;
256 }
257
258 _ = upload_interval.tick() => {
259 match self.upload_pending().await {
260 Ok(count) if count > 0 => {
261 consecutive_failures = 0;
262 backoff = Duration::from_secs(1);
263 }
264 Ok(_) => {}
265 Err(e) => {
266 consecutive_failures += 1;
267 let err_msg = e.to_string();
268 if err_msg.contains("rate limited") {
269 let retry_secs = err_msg
271 .strip_prefix("rate limited ")
272 .and_then(|s| s.parse::<u64>().ok())
273 .unwrap_or(60);
274 backoff = Duration::from_secs(retry_secs).max(Duration::from_secs(60));
275 warn!("Rate limited, retrying in {}s", backoff.as_secs());
276 tokio::time::sleep(backoff).await;
277 } else {
278 warn!("Upload failed (attempt {}): {}", consecutive_failures, e);
279 if consecutive_failures > 3 {
280 debug!("Applying backoff: {:?}", backoff);
281 tokio::time::sleep(backoff).await;
282 backoff = std::cmp::min(backoff * 2, max_backoff);
283 }
284 }
285 }
286 }
287 }
288
289 _ = cleanup_interval.tick() => {
290 self.run_cleanup().await;
291 }
292 }
293 }
294
295 Ok(())
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use starla_common::{MeasurementData, MeasurementId, MeasurementType, ProbeId, Timestamp};
    use std::net::IpAddr;

    /// Builds a minimal ICMP ping result for measurement `msm_id`.
    fn make_result(msm_id: u64) -> MeasurementResult {
        MeasurementResult {
            fw: 5120,
            measurement_type: MeasurementType::Ping,
            prb_id: ProbeId(12345),
            msm_id: MeasurementId(msm_id),
            timestamp: Timestamp::now(),
            af: 4,
            dst_addr: "8.8.8.8".parse::<IpAddr>().unwrap(),
            dst_name: None,
            src_addr: None,
            proto: Some("ICMP".to_string()),
            ttl: Some(64),
            size: Some(32),
            data: MeasurementData::Generic(serde_json::json!([{"rtt": 12.5}])),
        }
    }

    /// Transport whose `open` always fails, so tests never touch the network.
    struct NoopTransport;
    impl UploadTransport for NoopTransport {
        fn open(
            &self,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<Output = anyhow::Result<Box<dyn UploadStream>>> + Send + '_,
            >,
        > {
            Box::pin(async { anyhow::bail!("no transport") })
        }
    }

    /// Builds a handler with default settings on top of [`NoopTransport`].
    fn new_handler() -> ResultHandler {
        ResultHandler::new(
            Box::new(NoopTransport),
            UploaderConfig::default(),
            ResultHandlerConfig::default(),
            starla_metrics::MetricsRegistry::new().unwrap(),
        )
    }

    #[tokio::test]
    async fn test_result_handler_creation() {
        let handler = new_handler();

        let stats = handler.queue_stats().await;
        assert_eq!(stats.count, 0);
    }

    #[tokio::test]
    async fn test_submit_and_stats() {
        let handler = new_handler();

        handler.submit(make_result(1001)).await;

        let stats = handler.queue_stats().await;
        assert_eq!(stats.count, 1);
    }

    #[tokio::test]
    async fn test_run_returns_when_cancelled() {
        let handler = new_handler();
        let cancel = CancellationToken::new();
        cancel.cancel();

        // `run` must observe the cancelled token and exit without waiting
        // out a full upload or cleanup interval.
        let outcome = tokio::time::timeout(Duration::from_secs(5), handler.run(cancel)).await;
        assert!(
            matches!(outcome, Ok(Ok(()))),
            "run did not shut down promptly after cancellation"
        );
    }
}