1#![allow(dead_code)]
5
6use super::config_profiles::impl_service_config_profiles;
7use super::traits::{RuntimeService, RuntimeServiceContext, ServiceError, ServiceHealth};
8use async_trait::async_trait;
9use aura_core::service::{LocalHealthSnapshot, ProviderCandidate};
10use std::sync::Arc;
11use tokio::sync::RwLock;
12
// Command vocabulary named by the `command = ...` argument of the `actor_owned` attribute on
// `LocalHealthObserverService`. Nothing in this file constructs the variants directly and all
// of them deliberately share the `Observe` prefix, hence the two lint allowances below.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[allow(dead_code, clippy::enum_variant_names)]
enum LocalHealthObserverCommand {
    /// Provider-set observation (cf. `observe_provider_set`).
    ObserveProviderSet,
    /// Round-trip-time observation (cf. `observe_rtt_ms`).
    ObserveRtt,
    /// Loss observation (cf. `observe_loss_bps`).
    ObserveLoss,
    /// Traffic-volume observation.
    ObserveTraffic,
    /// Churn observation (cf. `observe_churn`).
    ObserveChurn,
    /// Queue-pressure observation (cf. `observe_queue_pressure`).
    ObserveQueuePressure,
    /// Hold-outcome observation (cf. `observe_hold_outcome`).
    ObserveHoldOutcome,
    /// Sync-opportunity observation (cf. `observe_sync_opportunity`).
    ObserveSyncOpportunity,
}
25
/// Tuning knobs for the local health observer's smoothing and clamping.
#[derive(Clone, Debug)]
pub struct LocalHealthObserverConfig {
    /// Numerator of the weight given to a new sample in the exponential moving average.
    pub ema_numerator: u32,
    /// Denominator of the EMA weight; the remaining `denominator - numerator` share goes to
    /// the previous value.
    pub ema_denominator: u32,
    /// Upper bound applied to raw queue-pressure samples before they are smoothed.
    pub queue_pressure_cap: u32,
    /// Smoothed RTT changes smaller than this (in ms) are suppressed while inside the
    /// minimum update interval.
    pub rtt_hysteresis_ms: u32,
    /// Smoothed loss changes smaller than this (in basis points) are suppressed while inside
    /// the minimum update interval.
    pub loss_hysteresis_bps: u32,
    /// Smoothed queue-pressure changes smaller than this are suppressed while inside the
    /// minimum update interval.
    pub queue_pressure_hysteresis: u32,
    /// Window (in ms) after an accepted smoothed update during which sub-hysteresis changes
    /// are ignored.
    pub min_smoothed_update_interval_ms: u64,
}
38
39impl Default for LocalHealthObserverConfig {
40 fn default() -> Self {
41 Self {
42 ema_numerator: 1,
43 ema_denominator: 4,
44 queue_pressure_cap: 100,
45 rtt_hysteresis_ms: 10,
46 loss_hysteresis_bps: 50,
47 queue_pressure_hysteresis: 2,
48 min_smoothed_update_interval_ms: 100,
49 }
50 }
51}
52
53impl_service_config_profiles!(LocalHealthObserverConfig {
54 pub fn for_testing() -> Self {
55 Self {
56 ema_numerator: 1,
57 ema_denominator: 2,
58 queue_pressure_cap: 16,
59 rtt_hysteresis_ms: 5,
60 loss_hysteresis_bps: 10,
61 queue_pressure_hysteresis: 1,
62 min_smoothed_update_interval_ms: 25,
63 }
64 }
65});
66
67#[derive(Debug)]
68struct LocalHealthObserverState {
69 snapshot: Option<LocalHealthSnapshot>,
70 hold_successes: u32,
71 hold_failures: u32,
72 last_smoothed_update_ms: Option<u64>,
73 lifecycle: ServiceHealth,
74}
75
76impl Default for LocalHealthObserverState {
77 fn default() -> Self {
78 Self {
79 snapshot: None,
80 hold_successes: 0,
81 hold_failures: 0,
82 last_smoothed_update_ms: None,
83 lifecycle: ServiceHealth::NotStarted,
84 }
85 }
86}
87
88#[aura_macros::actor_owned(
89 owner = "local_health_observer",
90 domain = "adaptive_privacy_health",
91 gate = "local_health_command_ingress",
92 command = LocalHealthObserverCommand,
93 capacity = 64,
94 category = "actor_owned"
95)]
96#[derive(Default)]
97pub struct LocalHealthObserverService {
98 config: LocalHealthObserverConfig,
99 state: Arc<RwLock<LocalHealthObserverState>>,
100}
101
102impl LocalHealthObserverService {
103 pub fn new(config: LocalHealthObserverConfig) -> Self {
104 Self {
105 config,
106 state: Arc::new(RwLock::new(LocalHealthObserverState::default())),
107 }
108 }
109
110 pub async fn snapshot(&self) -> LocalHealthSnapshot {
111 self.state
112 .read()
113 .await
114 .snapshot
115 .clone()
116 .unwrap_or(LocalHealthSnapshot {
117 generated_at_ms: 0,
118 reachable_provider_count: 0,
119 ema_rtt_ms: 0,
120 ema_loss_bps: 0,
121 traffic_volume_bytes: 0,
122 sync_blended_retrieval_bytes: 0,
123 accountability_reply_bytes: 0,
124 churn_events: 0,
125 observed_route_diversity: 0,
126 queue_pressure: 0,
127 hold_success_bps: 10_000,
128 sync_opportunity_count: 0,
129 })
130 }
131
132 pub async fn observe_provider_set(
133 &self,
134 candidates: &[ProviderCandidate],
135 route_diversity: u8,
136 now_ms: u64,
137 ) -> LocalHealthSnapshot {
138 let mut state = self.state.write().await;
139 let mut snapshot = state.snapshot.clone().unwrap_or(LocalHealthSnapshot {
140 generated_at_ms: now_ms,
141 reachable_provider_count: 0,
142 ema_rtt_ms: 0,
143 ema_loss_bps: 0,
144 traffic_volume_bytes: 0,
145 sync_blended_retrieval_bytes: 0,
146 accountability_reply_bytes: 0,
147 churn_events: 0,
148 observed_route_diversity: route_diversity,
149 queue_pressure: 0,
150 hold_success_bps: 10_000,
151 sync_opportunity_count: 0,
152 });
153 snapshot.generated_at_ms = now_ms;
154 snapshot.reachable_provider_count = candidates
155 .iter()
156 .filter(|candidate| candidate.reachable)
157 .count() as u32;
158 snapshot.observed_route_diversity = route_diversity;
159 state.snapshot = Some(snapshot.clone());
160 snapshot
161 }
162
163 pub async fn observe_rtt_ms(&self, rtt_ms: u32, now_ms: u64) -> LocalHealthSnapshot {
164 self.update_snapshot(now_ms, |snapshot, config, _state| {
165 snapshot.ema_rtt_ms = apply_smoothed_value(
166 snapshot.ema_rtt_ms,
167 rtt_ms,
168 config.rtt_hysteresis_ms,
169 now_ms,
170 config,
171 _state,
172 );
173 })
174 .await
175 }
176
177 pub async fn observe_loss_bps(&self, loss_bps: u32, now_ms: u64) -> LocalHealthSnapshot {
178 self.update_snapshot(now_ms, |snapshot, config, _state| {
179 snapshot.ema_loss_bps = apply_smoothed_value(
180 snapshot.ema_loss_bps,
181 loss_bps,
182 config.loss_hysteresis_bps,
183 now_ms,
184 config,
185 _state,
186 );
187 })
188 .await
189 }
190
191 pub async fn observe_traffic_volume(
192 &self,
193 traffic_bytes: u64,
194 now_ms: u64,
195 ) -> LocalHealthSnapshot {
196 self.update_snapshot(now_ms, |snapshot, _config, _state| {
197 snapshot.traffic_volume_bytes =
198 snapshot.traffic_volume_bytes.saturating_add(traffic_bytes);
199 })
200 .await
201 }
202
203 pub async fn observe_sync_blended_retrieval_volume(
204 &self,
205 retrieval_bytes: u64,
206 now_ms: u64,
207 ) -> LocalHealthSnapshot {
208 self.update_snapshot(now_ms, |snapshot, _config, _state| {
209 snapshot.sync_blended_retrieval_bytes = snapshot
210 .sync_blended_retrieval_bytes
211 .saturating_add(retrieval_bytes);
212 })
213 .await
214 }
215
216 pub async fn observe_accountability_reply_volume(
217 &self,
218 reply_bytes: u64,
219 now_ms: u64,
220 ) -> LocalHealthSnapshot {
221 self.update_snapshot(now_ms, |snapshot, _config, _state| {
222 snapshot.accountability_reply_bytes = snapshot
223 .accountability_reply_bytes
224 .saturating_add(reply_bytes);
225 })
226 .await
227 }
228
229 pub async fn observe_churn(&self, churn_events: u32, now_ms: u64) -> LocalHealthSnapshot {
230 self.update_snapshot(now_ms, |snapshot, _config, _state| {
231 snapshot.churn_events = snapshot.churn_events.saturating_add(churn_events);
232 })
233 .await
234 }
235
236 pub async fn observe_queue_pressure(&self, pressure: u32, now_ms: u64) -> LocalHealthSnapshot {
237 self.update_snapshot(now_ms, |snapshot, config, _state| {
238 snapshot.queue_pressure = apply_smoothed_value(
239 snapshot.queue_pressure,
240 pressure.min(config.queue_pressure_cap),
241 config.queue_pressure_hysteresis,
242 now_ms,
243 config,
244 _state,
245 );
246 })
247 .await
248 }
249
250 pub async fn observe_hold_outcome(&self, success: bool, now_ms: u64) -> LocalHealthSnapshot {
251 self.update_snapshot(now_ms, |snapshot, _config, state| {
252 if success {
253 state.hold_successes = state.hold_successes.saturating_add(1);
254 } else {
255 state.hold_failures = state.hold_failures.saturating_add(1);
256 }
257 let total = state
258 .hold_successes
259 .saturating_add(state.hold_failures)
260 .max(1);
261 snapshot.hold_success_bps = (state.hold_successes.saturating_mul(10_000)) / total;
262 })
263 .await
264 }
265
266 pub async fn observe_sync_opportunity(&self, now_ms: u64) -> LocalHealthSnapshot {
267 self.update_snapshot(now_ms, |snapshot, _config, _state| {
268 snapshot.sync_opportunity_count = snapshot.sync_opportunity_count.saturating_add(1);
269 })
270 .await
271 }
272
273 async fn update_snapshot<F>(&self, now_ms: u64, mut update: F) -> LocalHealthSnapshot
274 where
275 F: FnMut(
276 &mut LocalHealthSnapshot,
277 &LocalHealthObserverConfig,
278 &mut LocalHealthObserverState,
279 ),
280 {
281 let mut state = self.state.write().await;
282 let mut snapshot = state.snapshot.clone().unwrap_or(LocalHealthSnapshot {
283 generated_at_ms: now_ms,
284 reachable_provider_count: 0,
285 ema_rtt_ms: 0,
286 ema_loss_bps: 0,
287 traffic_volume_bytes: 0,
288 sync_blended_retrieval_bytes: 0,
289 accountability_reply_bytes: 0,
290 churn_events: 0,
291 observed_route_diversity: 0,
292 queue_pressure: 0,
293 hold_success_bps: 10_000,
294 sync_opportunity_count: 0,
295 });
296 snapshot.generated_at_ms = now_ms;
297 update(&mut snapshot, &self.config, &mut state);
298 state.snapshot = Some(snapshot.clone());
299 snapshot
300 }
301}
302
303impl Clone for LocalHealthObserverService {
304 fn clone(&self) -> Self {
305 Self {
306 config: self.config.clone(),
307 state: self.state.clone(),
308 }
309 }
310}
311
312fn ema(previous: u32, sample: u32, config: &LocalHealthObserverConfig) -> u32 {
313 if previous == 0 {
314 return sample;
315 }
316 let numerator = previous
317 .saturating_mul(config.ema_denominator.saturating_sub(config.ema_numerator))
318 .saturating_add(sample.saturating_mul(config.ema_numerator));
319 numerator / config.ema_denominator.max(1)
320}
321
322fn apply_smoothed_value(
323 previous: u32,
324 sample: u32,
325 hysteresis: u32,
326 now_ms: u64,
327 config: &LocalHealthObserverConfig,
328 state: &mut LocalHealthObserverState,
329) -> u32 {
330 let candidate = ema(previous, sample, config);
331 let within_interval = state
332 .last_smoothed_update_ms
333 .map(|last| now_ms.saturating_sub(last) < config.min_smoothed_update_interval_ms)
334 .unwrap_or(false);
335 if within_interval && previous.abs_diff(candidate) < hysteresis {
336 return previous;
337 }
338 state.last_smoothed_update_ms = Some(now_ms);
339 candidate
340}
341
342#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
343#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
344impl RuntimeService for LocalHealthObserverService {
345 fn name(&self) -> &'static str {
346 "local_health_observer"
347 }
348
349 fn dependencies(&self) -> &[&'static str] {
350 &["rendezvous_manager", "move_manager", "hold_manager"]
351 }
352
353 async fn start(&self, _ctx: &RuntimeServiceContext) -> Result<(), ServiceError> {
354 self.state.write().await.lifecycle = ServiceHealth::Healthy;
355 Ok(())
356 }
357
358 async fn stop(&self) -> Result<(), ServiceError> {
359 self.state.write().await.lifecycle = ServiceHealth::Stopped;
360 Ok(())
361 }
362
363 async fn health(&self) -> ServiceHealth {
364 self.state.read().await.lifecycle.clone()
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use aura_core::service::{
        LinkEndpoint, LinkProtocol, ProviderCandidate, ProviderEvidence, ServiceFamily,
    };
    use aura_core::types::identifiers::AuthorityId;

    /// Deterministic authority id derived from a repeated seed byte.
    fn authority(seed: u8) -> AuthorityId {
        let entropy = [seed; 32];
        AuthorityId::new_from_entropy(entropy)
    }

    /// Provider candidate whose address and key are derived from `seed`.
    fn candidate(seed: u8, reachable: bool) -> ProviderCandidate {
        let port = 8000 + u16::from(seed);
        let endpoint = LinkEndpoint::direct(LinkProtocol::Tcp, format!("127.0.0.1:{}", port));
        ProviderCandidate {
            authority_id: authority(seed),
            device_id: None,
            family: ServiceFamily::Move,
            evidence: vec![ProviderEvidence::Neighborhood],
            link_endpoints: vec![endpoint],
            route_layer_public_key: Some([seed; 32]),
            reachable,
        }
    }

    /// Observer configured with the testing profile.
    fn testing_observer() -> LocalHealthObserverService {
        LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing())
    }

    #[tokio::test]
    async fn local_health_observer_smooths_rtt_and_tracks_local_signals() {
        let observer = testing_observer();
        let providers = [candidate(1, true), candidate(2, false)];
        observer.observe_provider_set(&providers, 2, 10).await;

        let first = observer.observe_rtt_ms(100, 11).await;
        let second = observer.observe_rtt_ms(200, 12).await;

        // Provider-set fields survive later RTT observations.
        assert_eq!(first.reachable_provider_count, 1);
        assert_eq!(second.observed_route_diversity, 2);
        // The second sample is blended rather than adopted outright.
        assert!(second.ema_rtt_ms > 100);
        assert!(second.ema_rtt_ms < 200);
    }

    #[tokio::test]
    async fn local_health_observer_tracks_hold_success_ratio_and_sync_opportunities() {
        let observer = testing_observer();
        observer.observe_hold_outcome(true, 10).await;
        observer.observe_hold_outcome(false, 11).await;

        let snapshot = observer.observe_sync_opportunity(12).await;

        // One success out of two outcomes is 50%, i.e. 5000 basis points.
        assert_eq!(snapshot.hold_success_bps, 5000);
        assert_eq!(snapshot.sync_opportunity_count, 1);
    }

    #[tokio::test]
    async fn local_health_observer_tracks_loss_traffic_churn_and_queue_pressure() {
        let observer = testing_observer();
        observer.observe_loss_bps(120, 10).await;
        observer.observe_traffic_volume(2048, 11).await;
        observer.observe_sync_blended_retrieval_volume(512, 12).await;
        observer.observe_accountability_reply_volume(256, 13).await;
        observer.observe_churn(2, 14).await;

        let snapshot = observer.observe_queue_pressure(99, 15).await;

        assert_eq!(snapshot.traffic_volume_bytes, 2048);
        assert_eq!(snapshot.sync_blended_retrieval_bytes, 512);
        assert_eq!(snapshot.accountability_reply_bytes, 256);
        assert_eq!(snapshot.churn_events, 2);
        assert_eq!(snapshot.ema_loss_bps, 120);
        // The testing profile caps queue pressure at 16.
        assert!(snapshot.queue_pressure <= 16);
    }

    #[tokio::test]
    async fn local_health_observer_applies_hysteresis_and_rate_limit_to_smoothed_updates() {
        let observer = testing_observer();

        let first = observer.observe_rtt_ms(100, 10).await;
        // 10 ms later with a tiny change: inside the interval and below hysteresis.
        let second = observer.observe_rtt_ms(102, 20).await;
        // 30 ms after the accepted update: outside the 25 ms testing interval.
        let third = observer.observe_rtt_ms(140, 40).await;

        assert_eq!(first.ema_rtt_ms, 100);
        assert_eq!(second.ema_rtt_ms, 100);
        assert!(third.ema_rtt_ms > 100);
    }
}
448}