Skip to main content

aura_agent/runtime/services/
local_health_observer.rs

1//! Runtime-owned local health observer.
2//!
3//! Derives smoothed local-only health snapshots for adaptive privacy policy.
4#![allow(dead_code)]
5
6use super::config_profiles::impl_service_config_profiles;
7use super::traits::{RuntimeService, RuntimeServiceContext, ServiceError, ServiceHealth};
8use async_trait::async_trait;
9use aura_core::service::{LocalHealthSnapshot, ProviderCandidate};
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
/// Command kinds for the local health observer, one per family of
/// observations the service accepts.
///
/// The type is named as the `command` in the `actor_owned` attribute on
/// `LocalHealthObserverService`; nothing in this file constructs a variant
/// directly, hence the `dead_code` allowance. Every variant shares the
/// `Observe` prefix on purpose, which is why `clippy::enum_variant_names`
/// is silenced.
// NOTE(review): the variants appear to mirror the `observe_*` methods on the
// service — confirm against the `actor_owned` macro expansion.
#[allow(dead_code, clippy::enum_variant_names)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LocalHealthObserverCommand {
    ObserveProviderSet,
    ObserveRtt,
    ObserveLoss,
    ObserveTraffic,
    ObserveChurn,
    ObserveQueuePressure,
    ObserveHoldOutcome,
    ObserveSyncOpportunity,
}
25
/// Tuning knobs for the local health observer's smoothing and rate limiting.
#[derive(Debug, Clone)]
pub struct LocalHealthObserverConfig {
    /// Numerator of the weight given to a new sample when smoothing
    /// (the sample contributes `ema_numerator / ema_denominator`).
    pub ema_numerator: u32,
    /// Denominator of the smoothing weight; the previous value contributes
    /// `(ema_denominator - ema_numerator) / ema_denominator`.
    pub ema_denominator: u32,
    /// Upper bound applied to queue-pressure samples before they are smoothed.
    pub queue_pressure_cap: u32,
    /// Small EMA deltas below these thresholds are suppressed to avoid thrash.
    /// This one applies to the smoothed RTT, in milliseconds.
    pub rtt_hysteresis_ms: u32,
    /// Suppression threshold for the smoothed loss rate, in basis points.
    pub loss_hysteresis_bps: u32,
    /// Suppression threshold for the smoothed queue pressure.
    pub queue_pressure_hysteresis: u32,
    /// Minimum interval between noisy smoothed updates unless the change is large.
    pub min_smoothed_update_interval_ms: u64,
}
38
39impl Default for LocalHealthObserverConfig {
40    fn default() -> Self {
41        Self {
42            ema_numerator: 1,
43            ema_denominator: 4,
44            queue_pressure_cap: 100,
45            rtt_hysteresis_ms: 10,
46            loss_hysteresis_bps: 50,
47            queue_pressure_hysteresis: 2,
48            min_smoothed_update_interval_ms: 100,
49        }
50    }
51}
52
// Declares the named configuration profiles for `LocalHealthObserverConfig`
// via the `impl_service_config_profiles!` macro from `super::config_profiles`.
// The tests in this file call `LocalHealthObserverConfig::for_testing()`, so
// the profile is reachable as an associated function.
// NOTE(review): the macro body is not visible here — confirm whether it emits
// anything beyond the functions listed below.
impl_service_config_profiles!(LocalHealthObserverConfig {
    // Test profile: compared with `Default`, a new sample weighs 1/2 instead
    // of 1/4, the queue-pressure cap drops from 100 to 16, every hysteresis
    // threshold is tighter, and the rate-limit window is 25 ms instead of 100 ms.
    pub fn for_testing() -> Self {
        Self {
            ema_numerator: 1,
            ema_denominator: 2,
            queue_pressure_cap: 16,
            rtt_hysteresis_ms: 5,
            loss_hysteresis_bps: 10,
            queue_pressure_hysteresis: 1,
            min_smoothed_update_interval_ms: 25,
        }
    }
});
66
/// Mutable state owned by the observer and guarded by the service's lock.
#[derive(Debug)]
struct LocalHealthObserverState {
    /// Most recently stored snapshot; `None` until the first observation.
    snapshot: Option<LocalHealthSnapshot>,
    /// Count of hold outcomes reported as successful.
    hold_successes: u32,
    /// Count of hold outcomes reported as failed.
    hold_failures: u32,
    /// Timestamp (ms) of the last accepted smoothed update. A single value is
    /// shared by the RTT, loss and queue-pressure signals.
    last_smoothed_update_ms: Option<u64>,
    /// Lifecycle status written by `start`/`stop` and returned by `health`.
    lifecycle: ServiceHealth,
}
75
76impl Default for LocalHealthObserverState {
77    fn default() -> Self {
78        Self {
79            snapshot: None,
80            hold_successes: 0,
81            hold_failures: 0,
82            last_smoothed_update_ms: None,
83            lifecycle: ServiceHealth::NotStarted,
84        }
85    }
86}
87
/// Runtime service that accumulates local health observations into a
/// `LocalHealthSnapshot`.
///
/// The state sits behind `Arc<RwLock<_>>`, so clones of the service share the
/// same underlying state.
// NOTE(review): `aura_macros::actor_owned` is opaque from this file; the
// attribute arguments below presumably register ownership/ingress metadata
// for the service — confirm against the macro's documentation.
#[aura_macros::actor_owned(
    owner = "local_health_observer",
    domain = "adaptive_privacy_health",
    gate = "local_health_command_ingress",
    command = LocalHealthObserverCommand,
    capacity = 64,
    category = "actor_owned"
)]
#[derive(Default)]
pub struct LocalHealthObserverService {
    /// Smoothing and rate-limit tuning, fixed at construction.
    config: LocalHealthObserverConfig,
    /// Shared mutable observer state (snapshot, counters, lifecycle).
    state: Arc<RwLock<LocalHealthObserverState>>,
}
101
102impl LocalHealthObserverService {
103    pub fn new(config: LocalHealthObserverConfig) -> Self {
104        Self {
105            config,
106            state: Arc::new(RwLock::new(LocalHealthObserverState::default())),
107        }
108    }
109
110    pub async fn snapshot(&self) -> LocalHealthSnapshot {
111        self.state
112            .read()
113            .await
114            .snapshot
115            .clone()
116            .unwrap_or(LocalHealthSnapshot {
117                generated_at_ms: 0,
118                reachable_provider_count: 0,
119                ema_rtt_ms: 0,
120                ema_loss_bps: 0,
121                traffic_volume_bytes: 0,
122                sync_blended_retrieval_bytes: 0,
123                accountability_reply_bytes: 0,
124                churn_events: 0,
125                observed_route_diversity: 0,
126                queue_pressure: 0,
127                hold_success_bps: 10_000,
128                sync_opportunity_count: 0,
129            })
130    }
131
132    pub async fn observe_provider_set(
133        &self,
134        candidates: &[ProviderCandidate],
135        route_diversity: u8,
136        now_ms: u64,
137    ) -> LocalHealthSnapshot {
138        let mut state = self.state.write().await;
139        let mut snapshot = state.snapshot.clone().unwrap_or(LocalHealthSnapshot {
140            generated_at_ms: now_ms,
141            reachable_provider_count: 0,
142            ema_rtt_ms: 0,
143            ema_loss_bps: 0,
144            traffic_volume_bytes: 0,
145            sync_blended_retrieval_bytes: 0,
146            accountability_reply_bytes: 0,
147            churn_events: 0,
148            observed_route_diversity: route_diversity,
149            queue_pressure: 0,
150            hold_success_bps: 10_000,
151            sync_opportunity_count: 0,
152        });
153        snapshot.generated_at_ms = now_ms;
154        snapshot.reachable_provider_count = candidates
155            .iter()
156            .filter(|candidate| candidate.reachable)
157            .count() as u32;
158        snapshot.observed_route_diversity = route_diversity;
159        state.snapshot = Some(snapshot.clone());
160        snapshot
161    }
162
163    pub async fn observe_rtt_ms(&self, rtt_ms: u32, now_ms: u64) -> LocalHealthSnapshot {
164        self.update_snapshot(now_ms, |snapshot, config, _state| {
165            snapshot.ema_rtt_ms = apply_smoothed_value(
166                snapshot.ema_rtt_ms,
167                rtt_ms,
168                config.rtt_hysteresis_ms,
169                now_ms,
170                config,
171                _state,
172            );
173        })
174        .await
175    }
176
177    pub async fn observe_loss_bps(&self, loss_bps: u32, now_ms: u64) -> LocalHealthSnapshot {
178        self.update_snapshot(now_ms, |snapshot, config, _state| {
179            snapshot.ema_loss_bps = apply_smoothed_value(
180                snapshot.ema_loss_bps,
181                loss_bps,
182                config.loss_hysteresis_bps,
183                now_ms,
184                config,
185                _state,
186            );
187        })
188        .await
189    }
190
191    pub async fn observe_traffic_volume(
192        &self,
193        traffic_bytes: u64,
194        now_ms: u64,
195    ) -> LocalHealthSnapshot {
196        self.update_snapshot(now_ms, |snapshot, _config, _state| {
197            snapshot.traffic_volume_bytes =
198                snapshot.traffic_volume_bytes.saturating_add(traffic_bytes);
199        })
200        .await
201    }
202
203    pub async fn observe_sync_blended_retrieval_volume(
204        &self,
205        retrieval_bytes: u64,
206        now_ms: u64,
207    ) -> LocalHealthSnapshot {
208        self.update_snapshot(now_ms, |snapshot, _config, _state| {
209            snapshot.sync_blended_retrieval_bytes = snapshot
210                .sync_blended_retrieval_bytes
211                .saturating_add(retrieval_bytes);
212        })
213        .await
214    }
215
216    pub async fn observe_accountability_reply_volume(
217        &self,
218        reply_bytes: u64,
219        now_ms: u64,
220    ) -> LocalHealthSnapshot {
221        self.update_snapshot(now_ms, |snapshot, _config, _state| {
222            snapshot.accountability_reply_bytes = snapshot
223                .accountability_reply_bytes
224                .saturating_add(reply_bytes);
225        })
226        .await
227    }
228
229    pub async fn observe_churn(&self, churn_events: u32, now_ms: u64) -> LocalHealthSnapshot {
230        self.update_snapshot(now_ms, |snapshot, _config, _state| {
231            snapshot.churn_events = snapshot.churn_events.saturating_add(churn_events);
232        })
233        .await
234    }
235
236    pub async fn observe_queue_pressure(&self, pressure: u32, now_ms: u64) -> LocalHealthSnapshot {
237        self.update_snapshot(now_ms, |snapshot, config, _state| {
238            snapshot.queue_pressure = apply_smoothed_value(
239                snapshot.queue_pressure,
240                pressure.min(config.queue_pressure_cap),
241                config.queue_pressure_hysteresis,
242                now_ms,
243                config,
244                _state,
245            );
246        })
247        .await
248    }
249
250    pub async fn observe_hold_outcome(&self, success: bool, now_ms: u64) -> LocalHealthSnapshot {
251        self.update_snapshot(now_ms, |snapshot, _config, state| {
252            if success {
253                state.hold_successes = state.hold_successes.saturating_add(1);
254            } else {
255                state.hold_failures = state.hold_failures.saturating_add(1);
256            }
257            let total = state
258                .hold_successes
259                .saturating_add(state.hold_failures)
260                .max(1);
261            snapshot.hold_success_bps = (state.hold_successes.saturating_mul(10_000)) / total;
262        })
263        .await
264    }
265
266    pub async fn observe_sync_opportunity(&self, now_ms: u64) -> LocalHealthSnapshot {
267        self.update_snapshot(now_ms, |snapshot, _config, _state| {
268            snapshot.sync_opportunity_count = snapshot.sync_opportunity_count.saturating_add(1);
269        })
270        .await
271    }
272
273    async fn update_snapshot<F>(&self, now_ms: u64, mut update: F) -> LocalHealthSnapshot
274    where
275        F: FnMut(
276            &mut LocalHealthSnapshot,
277            &LocalHealthObserverConfig,
278            &mut LocalHealthObserverState,
279        ),
280    {
281        let mut state = self.state.write().await;
282        let mut snapshot = state.snapshot.clone().unwrap_or(LocalHealthSnapshot {
283            generated_at_ms: now_ms,
284            reachable_provider_count: 0,
285            ema_rtt_ms: 0,
286            ema_loss_bps: 0,
287            traffic_volume_bytes: 0,
288            sync_blended_retrieval_bytes: 0,
289            accountability_reply_bytes: 0,
290            churn_events: 0,
291            observed_route_diversity: 0,
292            queue_pressure: 0,
293            hold_success_bps: 10_000,
294            sync_opportunity_count: 0,
295        });
296        snapshot.generated_at_ms = now_ms;
297        update(&mut snapshot, &self.config, &mut state);
298        state.snapshot = Some(snapshot.clone());
299        snapshot
300    }
301}
302
303impl Clone for LocalHealthObserverService {
304    fn clone(&self) -> Self {
305        Self {
306            config: self.config.clone(),
307            state: self.state.clone(),
308        }
309    }
310}
311
312fn ema(previous: u32, sample: u32, config: &LocalHealthObserverConfig) -> u32 {
313    if previous == 0 {
314        return sample;
315    }
316    let numerator = previous
317        .saturating_mul(config.ema_denominator.saturating_sub(config.ema_numerator))
318        .saturating_add(sample.saturating_mul(config.ema_numerator));
319    numerator / config.ema_denominator.max(1)
320}
321
322fn apply_smoothed_value(
323    previous: u32,
324    sample: u32,
325    hysteresis: u32,
326    now_ms: u64,
327    config: &LocalHealthObserverConfig,
328    state: &mut LocalHealthObserverState,
329) -> u32 {
330    let candidate = ema(previous, sample, config);
331    let within_interval = state
332        .last_smoothed_update_ms
333        .map(|last| now_ms.saturating_sub(last) < config.min_smoothed_update_interval_ms)
334        .unwrap_or(false);
335    if within_interval && previous.abs_diff(candidate) < hysteresis {
336        return previous;
337    }
338    state.last_smoothed_update_ms = Some(now_ms);
339    candidate
340}
341
342#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
343#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
344impl RuntimeService for LocalHealthObserverService {
345    fn name(&self) -> &'static str {
346        "local_health_observer"
347    }
348
349    fn dependencies(&self) -> &[&'static str] {
350        &["rendezvous_manager", "move_manager", "hold_manager"]
351    }
352
353    async fn start(&self, _ctx: &RuntimeServiceContext) -> Result<(), ServiceError> {
354        self.state.write().await.lifecycle = ServiceHealth::Healthy;
355        Ok(())
356    }
357
358    async fn stop(&self) -> Result<(), ServiceError> {
359        self.state.write().await.lifecycle = ServiceHealth::Stopped;
360        Ok(())
361    }
362
363    async fn health(&self) -> ServiceHealth {
364        self.state.read().await.lifecycle.clone()
365    }
366}
367
// Unit tests for the observer. All of them use the `for_testing` profile:
// sample weight 1/2, queue-pressure cap 16, RTT hysteresis 5 ms, and a 25 ms
// rate-limit window.
#[cfg(test)]
mod tests {
    use super::*;
    use aura_core::service::{
        LinkEndpoint, LinkProtocol, ProviderCandidate, ProviderEvidence, ServiceFamily,
    };
    use aura_core::types::identifiers::AuthorityId;

    // Builds an authority id from a repeated seed byte.
    fn authority(seed: u8) -> AuthorityId {
        AuthorityId::new_from_entropy([seed; 32])
    }

    // Builds a provider candidate whose only property of interest to these
    // tests is the `reachable` flag.
    fn candidate(seed: u8, reachable: bool) -> ProviderCandidate {
        ProviderCandidate {
            authority_id: authority(seed),
            device_id: None,
            family: ServiceFamily::Move,
            evidence: vec![ProviderEvidence::Neighborhood],
            link_endpoints: vec![LinkEndpoint::direct(
                LinkProtocol::Tcp,
                format!("127.0.0.1:{}", 8000 + seed as u16),
            )],
            route_layer_public_key: Some([seed; 32]),
            reachable,
        }
    }

    #[tokio::test]
    async fn local_health_observer_smooths_rtt_and_tracks_local_signals() {
        let observer = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
        // One of the two candidates is reachable.
        observer
            .observe_provider_set(&[candidate(1, true), candidate(2, false)], 2, 10)
            .await;
        // First RTT sample is adopted as-is (100); the second blends to
        // (100 + 200) / 2 = 150, a 50 ms change that exceeds the hysteresis.
        let first = observer.observe_rtt_ms(100, 11).await;
        let second = observer.observe_rtt_ms(200, 12).await;
        assert_eq!(first.reachable_provider_count, 1);
        assert_eq!(second.observed_route_diversity, 2);
        assert!(second.ema_rtt_ms > 100);
        assert!(second.ema_rtt_ms < 200);
    }

    #[tokio::test]
    async fn local_health_observer_tracks_hold_success_ratio_and_sync_opportunities() {
        let observer = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
        // One success and one failure give a 50% ratio, i.e. 5000 bps.
        observer.observe_hold_outcome(true, 10).await;
        observer.observe_hold_outcome(false, 11).await;
        let snapshot = observer.observe_sync_opportunity(12).await;
        assert_eq!(snapshot.hold_success_bps, 5000);
        assert_eq!(snapshot.sync_opportunity_count, 1);
    }

    #[tokio::test]
    async fn local_health_observer_tracks_loss_traffic_churn_and_queue_pressure() {
        let observer = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
        observer.observe_loss_bps(120, 10).await;
        observer.observe_traffic_volume(2048, 11).await;
        observer
            .observe_sync_blended_retrieval_volume(512, 12)
            .await;
        observer.observe_accountability_reply_volume(256, 13).await;
        observer.observe_churn(2, 14).await;
        // The 99 sample is clamped to the testing cap of 16 before smoothing.
        let snapshot = observer.observe_queue_pressure(99, 15).await;
        assert_eq!(snapshot.traffic_volume_bytes, 2048);
        assert_eq!(snapshot.sync_blended_retrieval_bytes, 512);
        assert_eq!(snapshot.accountability_reply_bytes, 256);
        assert_eq!(snapshot.churn_events, 2);
        assert_eq!(snapshot.ema_loss_bps, 120);
        assert!(snapshot.queue_pressure <= 16);
    }

    #[tokio::test]
    async fn local_health_observer_applies_hysteresis_and_rate_limit_to_smoothed_updates() {
        let observer = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
        // t=10: first sample, accepted (100).
        let first = observer.observe_rtt_ms(100, 10).await;
        // t=20: candidate is 101; 10 ms since the last update (< 25) and a
        // 1 ms delta (< 5), so the update is suppressed.
        let second = observer.observe_rtt_ms(102, 20).await;
        // t=40: 30 ms since the last accepted update, so the candidate 120
        // is accepted.
        let third = observer.observe_rtt_ms(140, 40).await;
        assert_eq!(first.ema_rtt_ms, 100);
        assert_eq!(second.ema_rtt_ms, 100);
        assert!(third.ema_rtt_ms > 100);
    }
}
448}