Skip to main content

starla_results/
lib.rs

//! Results management and upload for the RIPE Atlas probe
//!
//! This crate handles:
//! - Queueing measurement results for upload (in-memory queue)
//! - Batching multiple results for efficient upload
//! - Retry logic with exponential backoff
//! - RIPE Atlas result format wrapping

9pub mod format;
10pub mod host_telemetry;
11mod queue;
12mod system_status;
13mod time_sync;
14mod uploader;
15
16pub use format::{AtlasResult, ResultBundle};
17pub use host_telemetry::{HostTelemetry, HostTelemetryKind};
18pub use queue::{QueueStats, QueuedResult, ResultQueue};
19pub use time_sync::TimeSyncTracker;
20pub use uploader::{ResultUploader, UploadStream, UploadTransport, UploaderConfig};
21
22use starla_common::MeasurementResult;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::Mutex;
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, info, warn};
28
/// Tuning knobs for the result handler's upload, retry, and cleanup cycles.
#[derive(Clone, Debug)]
pub struct ResultHandlerConfig {
    /// How often the upload loop tries to flush queued results.
    pub upload_interval: Duration,
    /// Age limit, in seconds, after which cleanup discards a queued result.
    pub max_result_age_secs: i64,
    /// Upload attempts a result may consume before it is discarded.
    pub max_attempts: u32,
    /// How often expired and over-retried results are purged from the queue.
    pub cleanup_interval: Duration,
    /// Capacity handed to the in-memory result queue.
    pub max_queue_size: usize,
}

impl Default for ResultHandlerConfig {
    /// One-minute uploads, a one-hour result lifetime, five attempts per
    /// result, five-minute cleanup sweeps, and room for 10 000 queued results.
    fn default() -> Self {
        const MINUTE_SECS: u64 = 60;

        let upload_interval = Duration::from_secs(MINUTE_SECS);
        let cleanup_interval = Duration::from_secs(5 * MINUTE_SECS);
        let max_result_age_secs: i64 = 60 * 60;

        Self {
            upload_interval,
            max_result_age_secs,
            max_attempts: 5,
            cleanup_interval,
            max_queue_size: 10_000,
        }
    }
}
55
56/// Main result handler that coordinates queuing and uploading
57pub struct ResultHandler {
58    queue: Arc<Mutex<ResultQueue>>,
59    uploader: Arc<Mutex<ResultUploader>>,
60    config: ResultHandlerConfig,
61    time_sync: Arc<TimeSyncTracker>,
62    session_id: Arc<Mutex<Option<String>>>,
63    host_telemetry: Arc<HostTelemetry>,
64    metrics: starla_metrics::MetricsRegistry,
65}
66
67impl ResultHandler {
68    /// Create a new result handler
69    pub fn new(
70        transport: Box<dyn UploadTransport>,
71        uploader_config: UploaderConfig,
72        config: ResultHandlerConfig,
73        metrics: starla_metrics::MetricsRegistry,
74    ) -> Self {
75        let queue = ResultQueue::new(config.max_queue_size);
76        let host_telemetry = HostTelemetry::new();
77        let uploader = ResultUploader::new(
78            transport,
79            uploader_config,
80            metrics.clone(),
81            Arc::clone(&host_telemetry),
82        );
83
84        Self {
85            queue: Arc::new(Mutex::new(queue)),
86            uploader: Arc::new(Mutex::new(uploader)),
87            config,
88            time_sync: Arc::new(TimeSyncTracker::new()),
89            session_id: Arc::new(Mutex::new(None)),
90            host_telemetry,
91            metrics,
92        }
93    }
94
95    /// Get a handle to the host-telemetry registry.
96    /// Returned `Arc` is shared with the uploader; callers schedule reports
97    /// via [`HostTelemetry::schedule_buddyinfo`] etc.
98    pub fn host_telemetry(&self) -> Arc<HostTelemetry> {
99        Arc::clone(&self.host_telemetry)
100    }
101
102    /// Set the upload endpoint path (call after controller connection)
103    pub async fn set_endpoint_path(&self, path: String) -> anyhow::Result<()> {
104        let mut uploader = self.uploader.lock().await;
105        uploader.set_endpoint_path(path)?;
106        debug!("Result upload endpoint path set");
107        Ok(())
108    }
109
110    /// Set the session ID (for upload body footer)
111    pub async fn set_session_id(&self, session_id: String) {
112        let mut sid = self.session_id.lock().await;
113        *sid = Some(session_id);
114        debug!("Session ID set for upload footer");
115    }
116
117    /// Check if an endpoint is configured
118    pub async fn has_endpoint(&self) -> bool {
119        let uploader = self.uploader.lock().await;
120        uploader.has_endpoint()
121    }
122
123    /// Mark time as synchronized
124    pub fn mark_time_synced(&self) {
125        self.time_sync.mark_synced();
126    }
127
128    /// Get the time sync tracker
129    pub fn time_sync(&self) -> Arc<TimeSyncTracker> {
130        Arc::clone(&self.time_sync)
131    }
132
133    /// Get current lts value
134    pub fn current_lts(&self) -> i64 {
135        self.time_sync.lts()
136    }
137
138    /// Submit a result for upload
139    pub async fn submit(&self, result: MeasurementResult) {
140        let mut queue = self.queue.lock().await;
141        let before = queue.stats().count;
142        queue.enqueue(result);
143        let after = queue.stats().count;
144
145        if after <= before && before > 0 {
146            self.metrics.record_queue_drop();
147        }
148
149        self.metrics.update_queue_depth(after as i64);
150        debug!("Result enqueued, queue: {}", after);
151    }
152
153    /// Get current queue statistics
154    pub async fn queue_stats(&self) -> QueueStats {
155        let queue = self.queue.lock().await;
156        queue.stats()
157    }
158
159    /// Try to upload pending results
160    pub async fn upload_pending(&self) -> anyhow::Result<usize> {
161        if !self.has_endpoint().await {
162            debug!("No upload endpoint configured, skipping upload");
163            return Ok(0);
164        }
165
166        // Drain all available results: upload everything each cycle,
167        // matching the official probe's httppost behavior
168        let results = {
169            let mut queue = self.queue.lock().await;
170            let res = queue.drain_all();
171            self.metrics.update_queue_depth(0);
172            res
173        };
174
175        if results.is_empty() {
176            return Ok(0);
177        }
178
179        let (uploadable, failed): (Vec<_>, Vec<_>) = results
180            .into_iter()
181            .partition(|r| r.attempts < self.config.max_attempts);
182
183        if !failed.is_empty() {
184            warn!(
185                "Dropping {} results that exceeded max attempts",
186                failed.len()
187            );
188            for _ in 0..failed.len() {
189                self.metrics.record_queue_drop();
190            }
191        }
192
193        if uploadable.is_empty() {
194            return Ok(0);
195        }
196
197        let count = uploadable.len();
198        info!("Uploading {} results", count);
199
200        let lts = self.time_sync.lts();
201        let session_id = self.session_id.lock().await;
202        let session_id_ref = session_id.as_deref();
203
204        let uploader = self.uploader.lock().await;
205        match uploader
206            .upload_batch(&uploadable, lts, session_id_ref)
207            .await
208        {
209            Ok(()) => {
210                info!("Successfully uploaded {} results", count);
211                self.metrics.record_upload_success();
212                Ok(count)
213            }
214            Err(e) => {
215                warn!("Upload failed: {}", e);
216                self.metrics.record_upload_failure();
217                let mut queue = self.queue.lock().await;
218                queue.requeue_failed(uploadable);
219                self.metrics.update_queue_depth(queue.stats().count as i64);
220                Err(e)
221            }
222        }
223    }
224
225    /// Run cleanup to remove expired and failed results
226    pub async fn run_cleanup(&self) {
227        let mut queue = self.queue.lock().await;
228        let removed_expired = queue.cleanup_expired(self.config.max_result_age_secs);
229        let removed_failed = queue.cleanup_failed(self.config.max_attempts);
230
231        let total_removed = removed_expired + removed_failed;
232        if total_removed > 0 {
233            for _ in 0..total_removed {
234                self.metrics.record_queue_drop();
235            }
236            self.metrics.update_queue_depth(queue.stats().count as i64);
237        }
238    }
239
240    /// Run the upload loop
241    pub async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
242        info!("Result handler starting");
243
244        let mut upload_interval = tokio::time::interval(self.config.upload_interval);
245        let mut cleanup_interval = tokio::time::interval(self.config.cleanup_interval);
246
247        let mut consecutive_failures = 0u32;
248        let mut backoff = Duration::from_secs(1);
249        let max_backoff = Duration::from_secs(300);
250
251        loop {
252            tokio::select! {
253                _ = cancel.cancelled() => {
254                    debug!("Result handler shutting down");
255                    break;
256                }
257
258                _ = upload_interval.tick() => {
259                    match self.upload_pending().await {
260                        Ok(count) if count > 0 => {
261                            consecutive_failures = 0;
262                            backoff = Duration::from_secs(1);
263                        }
264                        Ok(_) => {}
265                        Err(e) => {
266                            consecutive_failures += 1;
267                            let err_msg = e.to_string();
268                            if err_msg.contains("rate limited") {
269                                // Parse "rate limited N" for Retry-After seconds
270                                let retry_secs = err_msg
271                                    .strip_prefix("rate limited ")
272                                    .and_then(|s| s.parse::<u64>().ok())
273                                    .unwrap_or(60);
274                                backoff = Duration::from_secs(retry_secs).max(Duration::from_secs(60));
275                                warn!("Rate limited, retrying in {}s", backoff.as_secs());
276                                tokio::time::sleep(backoff).await;
277                            } else {
278                                warn!("Upload failed (attempt {}): {}", consecutive_failures, e);
279                                if consecutive_failures > 3 {
280                                    debug!("Applying backoff: {:?}", backoff);
281                                    tokio::time::sleep(backoff).await;
282                                    backoff = std::cmp::min(backoff * 2, max_backoff);
283                                }
284                            }
285                        }
286                    }
287                }
288
289                _ = cleanup_interval.tick() => {
290                    self.run_cleanup().await;
291                }
292            }
293        }
294
295        Ok(())
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use starla_common::{MeasurementData, MeasurementId, MeasurementType, ProbeId, Timestamp};
    use std::net::IpAddr;

    /// Build a minimal ping result for measurement `msm_id`.
    fn make_result(msm_id: u64) -> MeasurementResult {
        MeasurementResult {
            fw: 5120,
            measurement_type: MeasurementType::Ping,
            prb_id: ProbeId(12345),
            msm_id: MeasurementId(msm_id),
            timestamp: Timestamp::now(),
            af: 4,
            dst_addr: "8.8.8.8".parse::<IpAddr>().unwrap(),
            dst_name: None,
            src_addr: None,
            proto: Some("ICMP".to_string()),
            ttl: Some(64),
            size: Some(32),
            data: MeasurementData::Generic(serde_json::json!([{"rtt": 12.5}])),
        }
    }

    /// Transport whose every connection attempt fails.
    struct NoopTransport;
    impl UploadTransport for NoopTransport {
        fn open(
            &self,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<Output = anyhow::Result<Box<dyn UploadStream>>> + Send + '_,
            >,
        > {
            Box::pin(async { anyhow::bail!("no transport") })
        }
    }

    /// Handler with default configuration over a failing transport.
    fn new_handler() -> ResultHandler {
        ResultHandler::new(
            Box::new(NoopTransport),
            UploaderConfig::default(),
            ResultHandlerConfig::default(),
            starla_metrics::MetricsRegistry::new().unwrap(),
        )
    }

    #[tokio::test]
    async fn test_result_handler_creation() {
        let handler = new_handler();

        let stats = handler.queue_stats().await;
        assert_eq!(stats.count, 0);
    }

    #[tokio::test]
    async fn test_submit_and_stats() {
        let handler = new_handler();

        handler.submit(make_result(1001)).await;

        let stats = handler.queue_stats().await;
        assert_eq!(stats.count, 1);
    }

    #[tokio::test]
    async fn test_run_returns_after_cancel() {
        let handler = new_handler();
        let cancel = CancellationToken::new();
        cancel.cancel();

        // Guard against a hang: the loop must observe cancellation promptly.
        let outcome = tokio::time::timeout(Duration::from_secs(5), handler.run(cancel)).await;
        assert!(
            matches!(outcome, Ok(Ok(()))),
            "run() should return Ok promptly once cancelled"
        );
    }
}