1use super::services::ceremony_runner::CeremonyRunner;
6use super::services::rendezvous_manager::RendezvousManagerError;
7use super::services::AnonymousPathManager;
8use super::services::{
9 AuthorityManager, AuthorityStatus, CeremonyTracker, ContextManager, CoverTrafficGenerator,
10 FlowBudgetManager, HoldManager, LanTransportListenerService, LanTransportService,
11 LocalHealthObserver, MoveManager, ReactivePipelineService, ReceiptManager,
12 ReconfigurationManager, RendezvousManager, RuntimeMaintenanceService, RuntimeService,
13 RuntimeServiceContext, SelectionManager, ServiceError, ServiceErrorKind, ServiceHealth,
14 SocialManager, SyncServiceManager, ThresholdSigningService,
15};
16use super::{
17 AuraEffectSystem, EffectContext, EffectExecutor, LifecycleManager, RuntimeDiagnosticSink,
18 RuntimeShutdownEvent, TaskSupervisor,
19};
20use crate::core::{AgentConfig, AuthorityContext};
21use crate::handlers::RendezvousHandler;
22#[cfg(not(target_arch = "wasm32"))]
23use crate::task_registry::TaskGroup;
24use crate::task_registry::TaskSupervisionError;
25#[cfg(not(target_arch = "wasm32"))]
26use aura_chat::{ChatFact, CHAT_FACT_TYPE_ID};
27use aura_core::effects::time::PhysicalTimeEffects;
28#[cfg(not(target_arch = "wasm32"))]
29use aura_core::effects::transport::TransportEnvelope;
30#[cfg(not(target_arch = "wasm32"))]
31use aura_core::effects::{AmpChannelEffects, ChannelCreateParams, ChannelJoinParams};
32use aura_core::types::identifiers::AuthorityId;
33#[cfg(not(target_arch = "wasm32"))]
34use aura_core::util::serialization::from_slice;
35use aura_core::DeviceId;
36use aura_core::{
37 execute_with_timeout_budget, OwnedShutdownToken, OwnedTaskSpawner, TimeoutBudget,
38 TimeoutRunError,
39};
40#[cfg(not(target_arch = "wasm32"))]
41use aura_guards::GuardContextProvider;
42#[cfg(not(target_arch = "wasm32"))]
43use aura_journal::fact::{FactContent, RelationalFact};
44#[cfg(not(target_arch = "wasm32"))]
45use aura_journal::DomainFact;
46#[cfg(not(target_arch = "wasm32"))]
47use aura_protocol::amp::get_channel_state;
48use aura_rendezvous::{RendezvousDescriptor, TransportHint};
49#[cfg(not(target_arch = "wasm32"))]
50use futures::{SinkExt, StreamExt};
51use std::sync::atomic::{AtomicU8, Ordering};
52use std::sync::Arc;
53use std::time::Duration;
54#[cfg(not(target_arch = "wasm32"))]
55use tokio::io::AsyncReadExt;
56#[cfg(not(target_arch = "wasm32"))]
57use tokio_tungstenite::accept_async;
58
/// Smallest interval allowed between sync peer reconciliation passes.
const MIN_SYNC_PEER_RECONCILE_INTERVAL: Duration = Duration::from_millis(1_000);
/// Largest interval allowed between sync peer reconciliation passes.
const MAX_SYNC_PEER_RECONCILE_INTERVAL: Duration = Duration::from_millis(30_000);

/// `content-type` metadata value for envelopes whose payload is a chat relational fact.
#[cfg(not(target_arch = "wasm32"))]
const CHAT_FACT_CONTENT_TYPE: &str = "application/aura-chat-fact";
/// `content-type` metadata value for inbound fact-sync requests.
#[cfg(not(target_arch = "wasm32"))]
const FACT_SYNC_REQUEST_CONTENT_TYPE: &str = "application/aura-fact-sync-request";
/// `content-type` metadata value stamped on fact-sync replies.
#[cfg(not(target_arch = "wasm32"))]
const FACT_SYNC_RESPONSE_CONTENT_TYPE: &str = "application/aura-fact-sync-response";
67
68mod lifecycle;
69
70pub(crate) fn sync_peer_reconcile_interval(sync_manager: &SyncServiceManager) -> Duration {
71 sync_manager.config().auto_sync_interval.clamp(
72 MIN_SYNC_PEER_RECONCILE_INTERVAL,
73 MAX_SYNC_PEER_RECONCILE_INTERVAL,
74 )
75}
76
77#[derive(Debug, thiserror::Error)]
78pub enum RuntimeShutdownError {
79 #[error("runtime task tree shutdown failed: {0}")]
80 TaskTree(#[from] TaskSupervisionError),
81 #[error("runtime service teardown failed: {0}")]
82 Service(#[from] ServiceError),
83 #[error("lifecycle shutdown failed: {0}")]
84 Lifecycle(crate::AgentError),
85}
86
/// Coarse lifecycle phase of a runtime, stored as a `u8` tag inside `RuntimeActivityGate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeActivityState {
    /// The only state in which new public operations are accepted.
    Running,
    /// Shutdown has begun.
    Stopping,
    /// Terminal state.
    Stopped,
}

impl RuntimeActivityState {
    const RUNNING_TAG: u8 = 0;
    const STOPPING_TAG: u8 = 1;
    const STOPPED_TAG: u8 = 2;

    /// Encodes the state as its atomic tag.
    fn as_u8(self) -> u8 {
        match self {
            Self::Running => Self::RUNNING_TAG,
            Self::Stopping => Self::STOPPING_TAG,
            Self::Stopped => Self::STOPPED_TAG,
        }
    }

    /// Decodes an atomic tag.
    ///
    /// `STOPPED_TAG` and every unrecognised value decode to `Stopped`, so a corrupt tag never
    /// reads as a state that accepts work.
    fn from_u8(value: u8) -> Self {
        match value {
            Self::RUNNING_TAG => Self::Running,
            Self::STOPPING_TAG => Self::Stopping,
            _ => Self::Stopped,
        }
    }
}
112
113#[derive(Debug, thiserror::Error)]
114pub enum RuntimePublicOperationError {
115 #[error("runtime is {state:?} and no longer accepts new public operations")]
116 NotAccepting { state: RuntimeActivityState },
117}
118
119#[derive(Debug, Default)]
120pub struct RuntimeActivityGate {
121 state: AtomicU8,
122}
123
124impl RuntimeActivityGate {
125 pub fn new() -> Self {
126 Self {
127 state: AtomicU8::new(RuntimeActivityState::Running.as_u8()),
128 }
129 }
130
131 pub fn state(&self) -> RuntimeActivityState {
132 RuntimeActivityState::from_u8(self.state.load(Ordering::SeqCst))
133 }
134
135 pub fn begin_shutdown(&self) -> RuntimeActivityState {
136 match self.state.compare_exchange(
137 RuntimeActivityState::Running.as_u8(),
138 RuntimeActivityState::Stopping.as_u8(),
139 Ordering::SeqCst,
140 Ordering::SeqCst,
141 ) {
142 Ok(_) => RuntimeActivityState::Running,
143 Err(previous) => RuntimeActivityState::from_u8(previous),
144 }
145 }
146
147 pub fn mark_stopped(&self) {
148 self.state
149 .store(RuntimeActivityState::Stopped.as_u8(), Ordering::SeqCst);
150 }
151
152 pub fn ensure_accepting_public_operations(&self) -> Result<(), RuntimePublicOperationError> {
153 match self.state() {
154 RuntimeActivityState::Running => Ok(()),
155 state => Err(RuntimePublicOperationError::NotAccepting { state }),
156 }
157 }
158}
159
/// Runtime container owning the agent's effect system, core managers and optional services.
///
/// Fields are dropped in declaration order, so reordering them changes teardown order.
pub struct RuntimeSystem {
    /// Effect executor supplied at construction.
    // NOTE(review): not read anywhere visible in this file, hence the `dead_code` allow.
    #[allow(dead_code)]
    effect_executor: EffectExecutor,

    /// Shared effect system; cloned into most services at construction.
    effect_system: Arc<AuraEffectSystem>,

    /// Context manager exposed via `contexts()`.
    context_manager: ContextManager,

    /// Authority manager exposed via `authorities()`.
    authority_manager: AuthorityManager,

    /// Flow budget manager exposed via `flow_budgets()`.
    flow_budget_manager: FlowBudgetManager,

    /// Receipt manager exposed via `receipts()`.
    receipt_manager: ReceiptManager,

    /// Lifecycle manager; shut down last in `shutdown_typed`.
    lifecycle_manager: LifecycleManager,

    /// Optional sync service (also handed to the maintenance service at construction).
    sync_manager: Option<SyncServiceManager>,

    /// Optional rendezvous service (also handed to the maintenance service at construction).
    rendezvous_manager: Option<RendezvousManager>,

    /// Optional move manager.
    move_manager: Option<MoveManager>,

    /// Optional local health observer.
    local_health_observer: Option<LocalHealthObserver>,

    /// Optional selection manager.
    selection_manager: Option<SelectionManager>,

    /// Optional anonymous path manager.
    anonymous_path_manager: Option<AnonymousPathManager>,

    /// Optional hold manager exposed via `hold()`.
    hold_manager: Option<HoldManager>,

    /// Optional cover traffic generator.
    cover_traffic_generator: Option<CoverTrafficGenerator>,

    /// Optional social manager exposed via `social()`.
    social_manager: Option<SocialManager>,

    /// Ceremony tracker; the same tracker is shared with the runner and maintenance service.
    ceremony_tracker: CeremonyTracker,

    /// Ceremony runner built on `ceremony_tracker`.
    ceremony_runner: CeremonyRunner,

    /// Threshold signing service; callers receive clones via `threshold_signing()`.
    threshold_signing: ThresholdSigningService,

    /// Reactive pipeline service; stopped first during shutdown.
    reactive_pipeline_service: ReactivePipelineService,

    /// LAN listener service; `Some` only when a LAN transport was supplied at construction.
    lan_listener_service: Option<LanTransportListenerService>,

    /// Maintenance service wired with the ceremony, signing and optional network services.
    maintenance_service: RuntimeMaintenanceService,

    /// Reconfiguration manager exposed via `reconfiguration()`.
    reconfiguration_manager: ReconfigurationManager,

    /// Supervisor for runtime-owned tasks; shut down with a timeout during shutdown.
    runtime_tasks: Arc<TaskSupervisor>,

    /// Running/Stopping/Stopped gate; shared via `activity_gate()`.
    activity_gate: Arc<RuntimeActivityGate>,

    /// Diagnostic sink shared with the reactive pipeline and task supervisor.
    diagnostics: Arc<RuntimeDiagnosticSink>,

    /// Agent configuration; `device_id()` reads from it.
    // NOTE(review): the field is read by `device_id()`, so this allow looks stale — confirm.
    #[allow(dead_code)]
    config: AgentConfig,

    /// Authority this runtime was built for.
    authority_id: AuthorityId,
}
248
249impl RuntimeSystem {
250 #[allow(clippy::too_many_arguments)]
252 #[allow(dead_code)] pub(crate) fn new(
254 effect_executor: EffectExecutor,
255 effect_system: Arc<AuraEffectSystem>,
256 context_manager: ContextManager,
257 authority_manager: AuthorityManager,
258 flow_budget_manager: FlowBudgetManager,
259 receipt_manager: ReceiptManager,
260 lifecycle_manager: LifecycleManager,
261 config: AgentConfig,
262 authority_id: AuthorityId,
263 ) -> Self {
264 let device_id = config.device_id;
265 let threshold_signing = ThresholdSigningService::new(effect_system.clone());
266 let time_effects: Arc<dyn PhysicalTimeEffects> =
267 Arc::new(effect_system.time_effects().clone());
268 let ceremony_tracker = CeremonyTracker::new(time_effects);
269 let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
270 let reconfiguration_manager = ReconfigurationManager::new();
271 let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
272 let reactive_pipeline_service =
273 ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
274 let maintenance_service = RuntimeMaintenanceService::new(
275 effect_system.clone(),
276 authority_id,
277 device_id,
278 ceremony_tracker.clone(),
279 ceremony_runner.clone(),
280 threshold_signing.clone(),
281 reconfiguration_manager.clone(),
282 None,
283 None,
284 None,
285 None,
286 );
287 let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
288 Self {
289 effect_executor,
290 effect_system,
291 context_manager,
292 authority_manager,
293 flow_budget_manager,
294 receipt_manager,
295 lifecycle_manager,
296 sync_manager: None,
297 rendezvous_manager: None,
298 move_manager: None,
299 local_health_observer: None,
300 selection_manager: None,
301 anonymous_path_manager: None,
302 hold_manager: None,
303 cover_traffic_generator: None,
304 social_manager: None,
305 ceremony_tracker,
306 ceremony_runner,
307 threshold_signing,
308 reactive_pipeline_service,
309 lan_listener_service: None,
310 maintenance_service,
311 reconfiguration_manager,
312 runtime_tasks,
313 activity_gate: Arc::new(RuntimeActivityGate::new()),
314 diagnostics,
315 config,
316 authority_id,
317 }
318 }
319
320 #[allow(clippy::too_many_arguments)]
322 #[allow(dead_code)] pub(crate) fn new_with_sync(
324 effect_executor: EffectExecutor,
325 effect_system: Arc<AuraEffectSystem>,
326 context_manager: ContextManager,
327 authority_manager: AuthorityManager,
328 flow_budget_manager: FlowBudgetManager,
329 receipt_manager: ReceiptManager,
330 lifecycle_manager: LifecycleManager,
331 sync_manager: SyncServiceManager,
332 config: AgentConfig,
333 authority_id: AuthorityId,
334 ) -> Self {
335 let device_id = config.device_id;
336 let threshold_signing = ThresholdSigningService::new(effect_system.clone());
337 let time_effects: Arc<dyn PhysicalTimeEffects> =
338 Arc::new(effect_system.time_effects().clone());
339 let ceremony_tracker = CeremonyTracker::new(time_effects);
340 let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
341 let reconfiguration_manager = ReconfigurationManager::new();
342 let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
343 let reactive_pipeline_service =
344 ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
345 let maintenance_service = RuntimeMaintenanceService::new(
346 effect_system.clone(),
347 authority_id,
348 device_id,
349 ceremony_tracker.clone(),
350 ceremony_runner.clone(),
351 threshold_signing.clone(),
352 reconfiguration_manager.clone(),
353 Some(sync_manager.clone()),
354 None,
355 None,
356 None,
357 );
358 let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
359 Self {
360 effect_executor,
361 effect_system,
362 context_manager,
363 authority_manager,
364 flow_budget_manager,
365 receipt_manager,
366 lifecycle_manager,
367 sync_manager: Some(sync_manager),
368 rendezvous_manager: None,
369 move_manager: None,
370 local_health_observer: None,
371 selection_manager: None,
372 anonymous_path_manager: None,
373 hold_manager: None,
374 cover_traffic_generator: None,
375 social_manager: None,
376 ceremony_tracker,
377 ceremony_runner,
378 threshold_signing,
379 reactive_pipeline_service,
380 lan_listener_service: None,
381 maintenance_service,
382 reconfiguration_manager,
383 runtime_tasks,
384 activity_gate: Arc::new(RuntimeActivityGate::new()),
385 diagnostics,
386 config,
387 authority_id,
388 }
389 }
390
391 #[allow(clippy::too_many_arguments)]
393 #[allow(dead_code)] pub(crate) fn new_with_rendezvous(
395 effect_executor: EffectExecutor,
396 effect_system: Arc<AuraEffectSystem>,
397 context_manager: ContextManager,
398 authority_manager: AuthorityManager,
399 flow_budget_manager: FlowBudgetManager,
400 receipt_manager: ReceiptManager,
401 lifecycle_manager: LifecycleManager,
402 rendezvous_manager: RendezvousManager,
403 config: AgentConfig,
404 authority_id: AuthorityId,
405 ) -> Self {
406 let device_id = config.device_id;
407 let threshold_signing = ThresholdSigningService::new(effect_system.clone());
408 let time_effects: Arc<dyn PhysicalTimeEffects> =
409 Arc::new(effect_system.time_effects().clone());
410 let ceremony_tracker = CeremonyTracker::new(time_effects);
411 let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
412 let reconfiguration_manager = ReconfigurationManager::new();
413 let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
414 let reactive_pipeline_service =
415 ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
416 let maintenance_service = RuntimeMaintenanceService::new(
417 effect_system.clone(),
418 authority_id,
419 device_id,
420 ceremony_tracker.clone(),
421 ceremony_runner.clone(),
422 threshold_signing.clone(),
423 reconfiguration_manager.clone(),
424 None,
425 Some(rendezvous_manager.clone()),
426 None,
427 None,
428 );
429 let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
430 Self {
431 effect_executor,
432 effect_system,
433 context_manager,
434 authority_manager,
435 flow_budget_manager,
436 receipt_manager,
437 lifecycle_manager,
438 sync_manager: None,
439 rendezvous_manager: Some(rendezvous_manager),
440 move_manager: None,
441 local_health_observer: None,
442 selection_manager: None,
443 anonymous_path_manager: None,
444 hold_manager: None,
445 cover_traffic_generator: None,
446 social_manager: None,
447 ceremony_tracker,
448 ceremony_runner,
449 threshold_signing,
450 reactive_pipeline_service,
451 lan_listener_service: None,
452 maintenance_service,
453 reconfiguration_manager,
454 runtime_tasks,
455 activity_gate: Arc::new(RuntimeActivityGate::new()),
456 diagnostics,
457 config,
458 authority_id,
459 }
460 }
461
462 #[allow(clippy::too_many_arguments)]
464 pub(crate) fn new_with_services(
465 effect_executor: EffectExecutor,
466 effect_system: Arc<AuraEffectSystem>,
467 context_manager: ContextManager,
468 authority_manager: AuthorityManager,
469 flow_budget_manager: FlowBudgetManager,
470 receipt_manager: ReceiptManager,
471 lifecycle_manager: LifecycleManager,
472 sync_manager: Option<SyncServiceManager>,
473 rendezvous_manager: Option<RendezvousManager>,
474 move_manager: Option<MoveManager>,
475 local_health_observer: Option<LocalHealthObserver>,
476 selection_manager: Option<SelectionManager>,
477 anonymous_path_manager: Option<AnonymousPathManager>,
478 hold_manager: Option<HoldManager>,
479 cover_traffic_generator: Option<CoverTrafficGenerator>,
480 rendezvous_handler: Option<RendezvousHandler>,
481 lan_transport: Option<Arc<LanTransportService>>,
482 social_manager: Option<SocialManager>,
483 config: AgentConfig,
484 authority_id: AuthorityId,
485 ) -> Self {
486 let device_id = config.device_id;
487 let threshold_signing = ThresholdSigningService::new(effect_system.clone());
488 let time_effects: Arc<dyn PhysicalTimeEffects> =
489 Arc::new(effect_system.time_effects().clone());
490 let ceremony_tracker = CeremonyTracker::new(time_effects);
491 let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
492 let reconfiguration_manager = ReconfigurationManager::new();
493 let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
494 let reactive_pipeline_service =
495 ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
496 let lan_listener_service = lan_transport.clone().map(|lan_transport| {
497 LanTransportListenerService::new(effect_system.clone(), lan_transport)
498 });
499 let maintenance_service = RuntimeMaintenanceService::new(
500 effect_system.clone(),
501 authority_id,
502 device_id,
503 ceremony_tracker.clone(),
504 ceremony_runner.clone(),
505 threshold_signing.clone(),
506 reconfiguration_manager.clone(),
507 sync_manager.clone(),
508 rendezvous_manager.clone(),
509 rendezvous_handler.clone(),
510 lan_transport.clone(),
511 );
512 let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
513 Self {
514 effect_executor,
515 effect_system,
516 context_manager,
517 authority_manager,
518 flow_budget_manager,
519 receipt_manager,
520 lifecycle_manager,
521 sync_manager,
522 rendezvous_manager,
523 move_manager,
524 local_health_observer,
525 selection_manager,
526 anonymous_path_manager,
527 hold_manager,
528 cover_traffic_generator,
529 social_manager,
530 ceremony_tracker,
531 ceremony_runner,
532 threshold_signing,
533 reactive_pipeline_service,
534 lan_listener_service,
535 maintenance_service,
536 reconfiguration_manager,
537 runtime_tasks,
538 activity_gate: Arc::new(RuntimeActivityGate::new()),
539 diagnostics,
540 config,
541 authority_id,
542 }
543 }
544
545 pub fn ceremony_tracker(&self) -> &CeremonyTracker {
547 &self.ceremony_tracker
548 }
549
550 pub fn ceremony_runner(&self) -> &CeremonyRunner {
552 &self.ceremony_runner
553 }
554
555 pub fn threshold_signing(&self) -> ThresholdSigningService {
557 self.threshold_signing.clone()
558 }
559
560 pub fn reconfiguration(&self) -> &ReconfigurationManager {
562 &self.reconfiguration_manager
563 }
564
565 pub fn tasks(&self) -> Arc<TaskSupervisor> {
567 self.runtime_tasks.clone()
568 }
569
570 pub fn activity_gate(&self) -> Arc<RuntimeActivityGate> {
571 self.activity_gate.clone()
572 }
573
574 pub fn runtime_activity_state(&self) -> RuntimeActivityState {
575 self.activity_gate.state()
576 }
577
578 pub fn diagnostics(&self) -> Arc<RuntimeDiagnosticSink> {
579 self.diagnostics.clone()
580 }
581
582 pub fn task_spawner(&self) -> OwnedTaskSpawner {
584 OwnedTaskSpawner::new(
585 self.runtime_tasks.clone(),
586 OwnedShutdownToken::attached(self.runtime_tasks.cancellation_token()),
587 )
588 }
589
590 pub fn authority_id(&self) -> AuthorityId {
592 self.authority_id
593 }
594
595 pub fn device_id(&self) -> DeviceId {
597 self.config.device_id
598 }
599
600 pub fn effects(&self) -> Arc<AuraEffectSystem> {
606 self.effect_system.clone()
607 }
608
609 pub async fn reactive_pipeline_running(&self) -> bool {
611 self.reactive_pipeline_service.is_running().await
612 }
613
614 pub fn contexts(&self) -> &ContextManager {
616 &self.context_manager
617 }
618
619 pub fn authorities(&self) -> &AuthorityManager {
621 &self.authority_manager
622 }
623
624 pub fn flow_budgets(&self) -> &FlowBudgetManager {
626 &self.flow_budget_manager
627 }
628
629 pub fn receipts(&self) -> &ReceiptManager {
631 &self.receipt_manager
632 }
633
634 pub fn lifecycle(&self) -> &LifecycleManager {
636 &self.lifecycle_manager
637 }
638
639 pub fn sync(&self) -> Option<&SyncServiceManager> {
641 self.sync_manager.as_ref()
642 }
643
644 pub fn has_sync(&self) -> bool {
646 self.sync_manager.is_some()
647 }
648
649 pub fn rendezvous(&self) -> Option<&RendezvousManager> {
651 self.rendezvous_manager.as_ref()
652 }
653
654 pub fn has_rendezvous(&self) -> bool {
656 self.rendezvous_manager.is_some()
657 }
658
659 pub fn social(&self) -> Option<&SocialManager> {
661 self.social_manager.as_ref()
662 }
663
664 pub fn hold(&self) -> Option<&HoldManager> {
666 self.hold_manager.as_ref()
667 }
668
669 pub fn has_hold(&self) -> bool {
671 self.hold_manager.is_some()
672 }
673
674 pub fn has_social(&self) -> bool {
676 self.social_manager.is_some()
677 }
678
679 pub async fn shutdown_typed(self, ctx: &EffectContext) -> Result<(), RuntimeShutdownError> {
680 let prior_state = self.activity_gate.begin_shutdown();
681 if prior_state != RuntimeActivityState::Running {
682 tracing::info!(
683 event = RuntimeShutdownEvent::AlreadyInProgress.as_event_name(),
684 previous_state = ?prior_state,
685 "Runtime shutdown requested after shutdown had already started"
686 );
687 self.activity_gate.mark_stopped();
688 return Ok(());
689 }
690
691 let runtime_tasks = self.runtime_tasks.clone();
692 let mut shutdown_error: Option<RuntimeShutdownError> = None;
693
694 tracing::info!(
696 event = RuntimeShutdownEvent::Stage.as_event_name(),
697 stage = "reactive_pipeline",
698 "Starting runtime shutdown"
699 );
700 if let Err(error) = self.reactive_pipeline_service.stop().await {
701 tracing::warn!(
702 event = RuntimeShutdownEvent::ReactivePipelineSignalFailed.as_event_name(),
703 error = %error,
704 "Reactive pipeline shutdown failed during runtime shutdown"
705 );
706 }
707
708 tracing::info!(
709 event = RuntimeShutdownEvent::Stage.as_event_name(),
710 stage = "task_tree",
711 "Cancelling runtime task tree"
712 );
713 if let Err(error) = runtime_tasks
714 .shutdown_with_timeout(Duration::from_secs(5))
715 .await
716 {
717 tracing::warn!(
718 event = RuntimeShutdownEvent::TaskTreeEscalated.as_event_name(),
719 error = %error,
720 "Runtime task tree required forced shutdown"
721 );
722 shutdown_error.get_or_insert(RuntimeShutdownError::TaskTree(error));
723 }
724
725 tracing::info!(
727 event = RuntimeShutdownEvent::Stage.as_event_name(),
728 stage = "services",
729 "Stopping runtime services"
730 );
731 if let Err(e) = self.stop_services().await {
732 tracing::warn!(
733 event = RuntimeShutdownEvent::ServicesFailed.as_event_name(),
734 error = %e,
735 "Failed to stop runtime services during shutdown"
736 );
737 shutdown_error.get_or_insert(RuntimeShutdownError::Service(e));
738 }
739
740 let RuntimeSystem {
741 lifecycle_manager,
742 sync_manager: _sync_manager,
743 rendezvous_manager: _rendezvous_manager,
744 ..
745 } = self;
746
747 tracing::info!(
748 event = RuntimeShutdownEvent::Stage.as_event_name(),
749 stage = "lifecycle_manager",
750 "Shutting down lifecycle manager"
751 );
752 if let Err(error) = lifecycle_manager.shutdown(ctx).await {
753 tracing::warn!(
754 event = RuntimeShutdownEvent::LifecycleFailed.as_event_name(),
755 error = %error,
756 "Lifecycle manager shutdown failed"
757 );
758 shutdown_error.get_or_insert(RuntimeShutdownError::Lifecycle(error));
759 }
760
761 self.activity_gate.mark_stopped();
762
763 match shutdown_error {
764 Some(error) => Err(error),
765 None => Ok(()),
766 }
767 }
768}
769
770#[cfg(not(target_arch = "wasm32"))]
771pub(crate) fn spawn_lan_transport_listener_tasks(
772 parent_tasks: TaskGroup,
773 effects: Arc<AuraEffectSystem>,
774 lan_transport: Arc<LanTransportService>,
775) {
776 let listener = lan_transport.listener();
777 let websocket_listener = lan_transport.websocket_listener();
778 let metrics = lan_transport.metrics_handle();
779 let time_effects: Arc<dyn PhysicalTimeEffects + Send + Sync> =
780 Arc::new(effects.time_effects().clone());
781 let websocket_effects = effects.clone();
782 let tcp_accept_group = parent_tasks.clone();
783 let tcp_connection_group = tcp_accept_group.clone();
784 let _tcp_accept_task_handle =
785 tcp_accept_group.spawn_cancellable_named("tcp_accept_loop", async move {
786 loop {
787 let (mut stream, addr) = match listener.accept().await {
788 Ok((stream, addr)) => (stream, addr),
789 Err(err) => {
790 tracing::warn!(error = %err, "LAN transport accept failed");
791 let now_ms = time_effects
792 .physical_time()
793 .await
794 .ok()
795 .map(|t| t.ts_ms)
796 .unwrap_or(0);
797 let mut metrics = metrics.write().await;
798 metrics.accept_errors = metrics.accept_errors.saturating_add(1);
799 if now_ms > 0 {
800 metrics.last_error_ms = now_ms;
801 }
802 continue;
803 }
804 };
805
806 let effects = effects.clone();
807 let metrics = metrics.clone();
808 let time_effects = time_effects.clone();
809 let connection_group = tcp_connection_group.clone();
810 let now_ms = time_effects
811 .physical_time()
812 .await
813 .ok()
814 .map(|t| t.ts_ms)
815 .unwrap_or(0);
816 {
817 let mut metrics = metrics.write().await;
818 metrics.connections_accepted = metrics.connections_accepted.saturating_add(1);
819 if now_ms > 0 {
820 metrics.last_accept_ms = now_ms;
821 }
822 }
823 let _connection_task =
824 connection_group.spawn_named(format!("tcp_connection.{addr}"), async move {
825 let mut len_buf = [0u8; 4];
826 if let Err(err) = stream.read_exact(&mut len_buf).await {
827 tracing::debug!(error = %err, addr = %addr, "LAN transport read len failed");
828 let now_ms = time_effects
829 .physical_time()
830 .await
831 .ok()
832 .map(|t| t.ts_ms)
833 .unwrap_or(0);
834 let mut metrics = metrics.write().await;
835 metrics.read_errors = metrics.read_errors.saturating_add(1);
836 if now_ms > 0 {
837 metrics.last_error_ms = now_ms;
838 }
839 return;
840 }
841 let len = u32::from_be_bytes(len_buf) as usize;
842 if len == 0 || len > 1024 * 1024 {
843 tracing::debug!(addr = %addr, len = len, "LAN transport invalid frame size");
844 let now_ms = time_effects
845 .physical_time()
846 .await
847 .ok()
848 .map(|t| t.ts_ms)
849 .unwrap_or(0);
850 let mut metrics = metrics.write().await;
851 metrics.decode_errors = metrics.decode_errors.saturating_add(1);
852 if now_ms > 0 {
853 metrics.last_error_ms = now_ms;
854 }
855 return;
856 }
857 let mut payload = vec![0u8; len];
858 if let Err(err) = stream.read_exact(&mut payload).await {
859 tracing::debug!(
860 error = %err,
861 addr = %addr,
862 "LAN transport read payload failed"
863 );
864 let now_ms = time_effects
865 .physical_time()
866 .await
867 .ok()
868 .map(|t| t.ts_ms)
869 .unwrap_or(0);
870 let mut metrics = metrics.write().await;
871 metrics.read_errors = metrics.read_errors.saturating_add(1);
872 if now_ms > 0 {
873 metrics.last_error_ms = now_ms;
874 }
875 return;
876 }
877
878 let envelope = match aura_core::util::serialization::from_slice(&payload) {
879 Ok(envelope) => envelope,
880 Err(err) => {
881 tracing::debug!(error = %err, addr = %addr, "LAN transport decode failed");
882 let now_ms = time_effects
883 .physical_time()
884 .await
885 .ok()
886 .map(|t| t.ts_ms)
887 .unwrap_or(0);
888 let mut metrics = metrics.write().await;
889 metrics.decode_errors = metrics.decode_errors.saturating_add(1);
890 if now_ms > 0 {
891 metrics.last_error_ms = now_ms;
892 }
893 return;
894 }
895 };
896 let now_ms = time_effects
897 .physical_time()
898 .await
899 .ok()
900 .map(|t| t.ts_ms)
901 .unwrap_or(0);
902 {
903 let mut metrics = metrics.write().await;
904 metrics.frames_received = metrics.frames_received.saturating_add(1);
905 metrics.bytes_received = metrics.bytes_received.saturating_add(len as u64);
906 if now_ms > 0 {
907 metrics.last_frame_ms = now_ms;
908 }
909 }
910
911 let _ = handle_inbound_transport_envelope(effects, envelope).await;
912 });
913 }
914 });
915
916 let metrics = lan_transport.metrics_handle();
917 let time_effects: Arc<dyn PhysicalTimeEffects + Send + Sync> =
918 Arc::new(websocket_effects.time_effects().clone());
919 let websocket_accept_group = parent_tasks.clone();
920 let websocket_connection_group = websocket_accept_group.clone();
921 let _websocket_accept_task_handle =
922 websocket_accept_group.spawn_cancellable_named("websocket_accept_loop", async move {
923 loop {
924 let (stream, addr) = match websocket_listener.accept().await {
925 Ok((stream, addr)) => (stream, addr),
926 Err(err) => {
927 tracing::warn!(error = %err, "LAN websocket accept failed");
928 continue;
929 }
930 };
931
932 let effects = websocket_effects.clone();
933 let metrics = metrics.clone();
934 let time_effects = time_effects.clone();
935 let connection_group = websocket_connection_group.clone();
936 let _connection_task = connection_group.spawn_named(
937 format!("websocket_connection.{addr}"),
938 async move {
939 let websocket = match accept_async(stream).await {
940 Ok(websocket) => websocket,
941 Err(err) => {
942 tracing::debug!(
943 error = %err,
944 addr = %addr,
945 "LAN websocket handshake failed"
946 );
947 return;
948 }
949 };
950 let (mut sink, mut stream) = websocket.split();
951 while let Some(message) = stream.next().await {
952 let message = match message {
953 Ok(message) => message,
954 Err(err) => {
955 tracing::debug!(
956 error = %err,
957 addr = %addr,
958 "LAN websocket read failed"
959 );
960 return;
961 }
962 };
963
964 if !message.is_binary() {
965 continue;
966 }
967
968 let payload = message.into_data();
969 let envelope = match aura_core::util::serialization::from_slice::<
970 TransportEnvelope,
971 >(&payload)
972 {
973 Ok(envelope) => envelope,
974 Err(err) => {
975 tracing::debug!(
976 error = %err,
977 addr = %addr,
978 "LAN websocket decode failed"
979 );
980 let mut metrics = metrics.write().await;
981 metrics.decode_errors = metrics.decode_errors.saturating_add(1);
982 continue;
983 }
984 };
985
986 let now_ms = time_effects
987 .physical_time()
988 .await
989 .ok()
990 .map(|t| t.ts_ms)
991 .unwrap_or(0);
992 {
993 let mut metrics = metrics.write().await;
994 metrics.frames_received = metrics.frames_received.saturating_add(1);
995 metrics.bytes_received =
996 metrics.bytes_received.saturating_add(payload.len() as u64);
997 if now_ms > 0 {
998 metrics.last_frame_ms = now_ms;
999 }
1000 }
1001
1002 if let Some(response) =
1003 handle_inbound_transport_envelope(effects.clone(), envelope).await
1004 {
1005 match aura_core::util::serialization::to_vec(&response) {
1006 Ok(bytes) => {
1007 if let Err(err) = sink
1008 .send(tokio_tungstenite::tungstenite::Message::Binary(
1009 bytes,
1010 ))
1011 .await
1012 {
1013 tracing::debug!(
1014 error = %err,
1015 addr = %addr,
1016 "LAN websocket response send failed"
1017 );
1018 return;
1019 }
1020 }
1021 Err(err) => {
1022 tracing::debug!(
1023 error = %err,
1024 addr = %addr,
1025 "LAN websocket response encode failed"
1026 );
1027 }
1028 }
1029 }
1030 }
1031 },
1032 );
1033 }
1034 });
1035}
1036
1037#[cfg(not(target_arch = "wasm32"))]
1038async fn handle_inbound_transport_envelope(
1039 effects: Arc<AuraEffectSystem>,
1040 envelope: TransportEnvelope,
1041) -> Option<TransportEnvelope> {
1042 if envelope
1043 .metadata
1044 .get("content-type")
1045 .is_some_and(|content_type| content_type == FACT_SYNC_REQUEST_CONTENT_TYPE)
1046 {
1047 let local_authority = GuardContextProvider::authority_id(effects.as_ref());
1048 let facts = match effects.load_committed_facts(local_authority).await {
1049 Ok(facts) => facts
1050 .into_iter()
1051 .filter_map(|fact| match fact.content {
1052 FactContent::Relational(rel) => Some(rel),
1053 _ => None,
1054 })
1055 .collect::<Vec<_>>(),
1056 Err(err) => {
1057 tracing::debug!(
1058 error = %err,
1059 "Failed to load committed facts for fact sync response"
1060 );
1061 Vec::new()
1062 }
1063 };
1064
1065 let payload = match aura_core::util::serialization::to_vec(&facts) {
1066 Ok(payload) => payload,
1067 Err(err) => {
1068 tracing::debug!(
1069 error = %err,
1070 "Failed to encode fact sync response payload"
1071 );
1072 return None;
1073 }
1074 };
1075
1076 let mut metadata = std::collections::HashMap::new();
1077 metadata.insert(
1078 "content-type".to_string(),
1079 FACT_SYNC_RESPONSE_CONTENT_TYPE.to_string(),
1080 );
1081
1082 return Some(TransportEnvelope {
1083 destination: envelope.source,
1084 source: envelope.destination,
1085 context: envelope.context,
1086 payload,
1087 metadata,
1088 receipt: None,
1089 });
1090 }
1091
1092 if envelope
1093 .metadata
1094 .get("content-type")
1095 .is_some_and(|content_type| content_type == CHAT_FACT_CONTENT_TYPE)
1096 {
1097 tracing::debug!(
1098 source = %envelope.source,
1099 destination = %envelope.destination,
1100 context = %envelope.context,
1101 "recv-chat-fact"
1102 );
1103 match from_slice::<RelationalFact>(&envelope.payload) {
1104 Ok(fact) => {
1105 if let RelationalFact::Generic {
1106 envelope: chat_envelope,
1107 ..
1108 } = &fact
1109 {
1110 if chat_envelope.type_id.as_str() == CHAT_FACT_TYPE_ID {
1111 if let Some(ChatFact::ChannelCreated {
1112 context_id,
1113 channel_id,
1114 creator_id,
1115 ..
1116 }) = ChatFact::from_envelope(chat_envelope)
1117 {
1118 let local_authority = envelope.destination;
1119 if get_channel_state(effects.as_ref(), context_id, channel_id)
1120 .await
1121 .is_err()
1122 {
1123 if let Err(err) = effects
1124 .create_channel(ChannelCreateParams {
1125 context: context_id,
1126 channel: Some(channel_id),
1127 skip_window: None,
1128 topic: None,
1129 })
1130 .await
1131 {
1132 if get_channel_state(effects.as_ref(), context_id, channel_id)
1133 .await
1134 .is_err()
1135 {
1136 tracing::warn!(
1137 context_id = %context_id,
1138 channel_id = %channel_id,
1139 error = %err,
1140 "Failed to provision AMP channel checkpoint from inbound chat fact"
1141 );
1142 }
1143 }
1144
1145 let mut participants = vec![local_authority];
1146 if creator_id != local_authority {
1147 participants.push(creator_id);
1148 }
1149
1150 for participant in participants {
1151 if let Err(err) = effects
1152 .join_channel(ChannelJoinParams {
1153 context: context_id,
1154 channel: channel_id,
1155 participant,
1156 })
1157 .await
1158 {
1159 tracing::debug!(
1160 context_id = %context_id,
1161 channel_id = %channel_id,
1162 participant = %participant,
1163 error = %err,
1164 "AMP join provisioning from inbound chat fact failed"
1165 );
1166 }
1167 }
1168 }
1169 }
1170 }
1171 }
1172
1173 if let Err(err) = effects.commit_relational_facts(vec![fact]).await {
1174 tracing::debug!(
1175 error = %err,
1176 "LAN transport failed to commit incoming chat fact envelope"
1177 );
1178 }
1179 return None;
1180 }
1181 Err(err) => {
1182 tracing::warn!(
1183 error = %err,
1184 "LAN transport received invalid chat fact envelope payload"
1185 );
1186 return None;
1187 }
1188 }
1189 }
1190
1191 effects.requeue_envelope(envelope);
1192 None
1193}
1194
1195pub(crate) async fn publish_lan_descriptor_with(
1196 effects: Arc<AuraEffectSystem>,
1197 authority_id: AuthorityId,
1198 device_id: DeviceId,
1199 rendezvous_manager: &RendezvousManager,
1200 lan_transport: &LanTransportService,
1201) -> Result<(), ServiceError> {
1202 async fn install_lan_descriptor(
1203 rendezvous_manager: &RendezvousManager,
1204 descriptor: RendezvousDescriptor,
1205 ) -> Result<(), ServiceError> {
1206 rendezvous_manager
1207 .cache_descriptor(descriptor.clone())
1208 .await
1209 .map_err(|error| ServiceError::startup_failed("rendezvous_cache", error.to_string()))?;
1210 rendezvous_manager.set_lan_descriptor(descriptor).await;
1211 Ok(())
1212 }
1213
1214 let authority_context = AuthorityContext::new_with_device(authority_id, device_id);
1215 let handler = RendezvousHandler::new(authority_context.clone())
1216 .map_err(|e| ServiceError::startup_failed("rendezvous_handler", e.to_string()))?;
1217 let context_id = authority_context.default_context_id();
1218
1219 let mut hints = Vec::new();
1220 let tcp_addrs = lan_transport.advertised_addrs();
1221 let websocket_addrs = lan_transport.websocket_addrs();
1222 tracing::info!(
1223 authority = %authority_id,
1224 tcp_addrs = ?tcp_addrs,
1225 websocket_addrs = ?websocket_addrs,
1226 "publish_lan_descriptor_with transport addresses"
1227 );
1228 let mut invalid_tcp_hints = 0usize;
1229 for addr in tcp_addrs {
1230 match TransportHint::tcp_direct(addr) {
1231 Ok(hint) => hints.push(hint),
1232 Err(err) => {
1233 invalid_tcp_hints += 1;
1234 tracing::warn!(addr = %addr, error = %err, "Skipping invalid LAN transport hint");
1235 }
1236 }
1237 }
1238 let mut invalid_websocket_hints = 0usize;
1239 for addr in websocket_addrs {
1240 match TransportHint::websocket_direct(addr) {
1241 Ok(hint) => hints.push(hint),
1242 Err(err) => {
1243 invalid_websocket_hints += 1;
1244 tracing::warn!(
1245 addr = %addr,
1246 error = %err,
1247 "Skipping invalid LAN websocket transport hint"
1248 );
1249 }
1250 }
1251 }
1252
1253 if hints.is_empty() {
1254 tracing::warn!(
1255 authority = %authority_id,
1256 tcp_addrs = ?tcp_addrs,
1257 websocket_addrs = ?websocket_addrs,
1258 invalid_tcp_hints,
1259 invalid_websocket_hints,
1260 "LAN listeners are bound, but no rendezvous descriptor was published because every advertised address was rejected as an invalid direct transport hint; direct LAN discovery will be unavailable until at least one valid address is advertisable"
1261 );
1262 return Ok(());
1263 }
1264
1265 let result = handler
1266 .publish_descriptor(&effects, context_id, hints.clone(), [0u8; 32], 0)
1267 .await
1268 .map_err(|error| ServiceError::startup_failed("rendezvous_publish", error.to_string()))?;
1269 let descriptor = require_published_lan_descriptor(result, device_id)?;
1270 install_lan_descriptor(rendezvous_manager, descriptor).await?;
1271
1272 if let Err(error) = register_bootstrap_candidate_with(rendezvous_manager, lan_transport).await {
1273 tracing::debug!(
1274 error = %error,
1275 "Failed to register bootstrap candidate after publishing LAN descriptor"
1276 );
1277 }
1278
1279 Ok(())
1280}
1281
1282pub(crate) async fn register_bootstrap_candidate_with(
1283 rendezvous_manager: &RendezvousManager,
1284 lan_transport: &LanTransportService,
1285) -> Result<(), RendezvousManagerError> {
1286 let tcp_addrs = lan_transport.advertised_addrs();
1287 let websocket_addrs = lan_transport.websocket_addrs();
1288 let Some(address) = websocket_addrs
1289 .first()
1290 .cloned()
1291 .or_else(|| tcp_addrs.first().cloned())
1292 else {
1293 return Ok(());
1294 };
1295
1296 rendezvous_manager
1297 .register_bootstrap_candidate(address, None)
1298 .await
1299}
1300
1301fn require_published_lan_descriptor(
1302 result: crate::handlers::rendezvous::RendezvousResult,
1303 device_id: DeviceId,
1304) -> Result<RendezvousDescriptor, ServiceError> {
1305 if !result.success {
1306 return Err(ServiceError::startup_failed(
1307 "rendezvous_publish",
1308 result
1309 .error
1310 .unwrap_or_else(|| "LAN descriptor publish failed".to_string()),
1311 ));
1312 }
1313
1314 let descriptor = result.descriptor.ok_or_else(|| {
1315 ServiceError::startup_failed(
1316 "rendezvous_publish",
1317 "LAN descriptor publish succeeded without descriptor payload".to_string(),
1318 )
1319 })?;
1320
1321 Ok(RendezvousDescriptor {
1322 device_id: Some(device_id),
1323 ..descriptor
1324 })
1325}
1326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::builder::EffectSystemBuilder;
    use crate::runtime::services::SyncManagerConfig;
    use aura_core::ContextId;

    /// The gate starts running and stops accepting public work once shutdown
    /// begins. `begin_shutdown` returns the state observed before the call.
    #[test]
    fn runtime_activity_gate_transitions_and_rejects_new_public_work() {
        let gate = RuntimeActivityGate::new();
        assert_eq!(gate.state(), RuntimeActivityState::Running);
        assert!(gate.ensure_accepting_public_operations().is_ok());

        assert_eq!(gate.begin_shutdown(), RuntimeActivityState::Running);
        assert_eq!(gate.state(), RuntimeActivityState::Stopping);
        assert!(matches!(
            gate.ensure_accepting_public_operations(),
            Err(RuntimePublicOperationError::NotAccepting {
                state: RuntimeActivityState::Stopping
            })
        ));

        assert_eq!(gate.begin_shutdown(), RuntimeActivityState::Stopping);
        gate.mark_stopped();
        assert_eq!(gate.state(), RuntimeActivityState::Stopped);
    }

    /// An in-range auto-sync interval is used unchanged.
    #[test]
    fn sync_peer_reconcile_interval_follows_fast_sync_config() {
        let manager = SyncServiceManager::new(SyncManagerConfig {
            auto_sync_interval: Duration::from_secs(2),
            ..SyncManagerConfig::default()
        });

        assert_eq!(
            sync_peer_reconcile_interval(&manager),
            Duration::from_secs(2)
        );
    }

    /// Intervals above the upper bound are clamped to 30 seconds.
    #[test]
    fn sync_peer_reconcile_interval_clamps_large_values() {
        let manager = SyncServiceManager::new(SyncManagerConfig {
            auto_sync_interval: Duration::from_secs(120),
            ..SyncManagerConfig::default()
        });

        assert_eq!(
            sync_peer_reconcile_interval(&manager),
            Duration::from_secs(30)
        );
    }

    /// Intervals below the lower bound are clamped to 1 second.
    #[test]
    fn sync_peer_reconcile_interval_clamps_small_values() {
        let manager = SyncServiceManager::new(SyncManagerConfig {
            auto_sync_interval: Duration::from_millis(100),
            ..SyncManagerConfig::default()
        });

        assert_eq!(
            sync_peer_reconcile_interval(&manager),
            Duration::from_secs(1)
        );
    }

    /// The runtime service set includes the maintenance service.
    #[test]
    fn runtime_services_include_runtime_maintenance() {
        let authority_id = AuthorityId::new_from_entropy([11u8; 32]);
        let runtime = EffectSystemBuilder::testing()
            .with_authority(authority_id)
            .build_sync()
            .expect("build_sync should succeed in testing mode");

        let service_names = runtime
            .runtime_services_in_start_order()
            .expect("runtime services should sort cleanly")
            .into_iter()
            .map(RuntimeService::name)
            .collect::<Vec<_>>();

        assert!(service_names.contains(&"runtime_maintenance"));
    }

    /// A successful result without a descriptor payload is rejected.
    #[test]
    fn lan_descriptor_publish_requires_descriptor_payload() {
        let context_id = ContextId::new_from_entropy([12u8; 32]);
        let device_id = DeviceId::new_from_entropy([14u8; 32]);
        let result = crate::handlers::rendezvous::RendezvousResult {
            success: true,
            context_id,
            peer: None,
            descriptor: None,
            error: None,
        };

        let error = require_published_lan_descriptor(result, device_id)
            .expect_err("missing descriptor payload must fail closed");

        assert!(error.to_string().contains("descriptor payload"));
        assert!(error.to_string().contains("rendezvous_publish"));
    }

    /// A failed result surfaces the handler's error text.
    #[test]
    fn lan_descriptor_publish_requires_success_result() {
        let context_id = ContextId::new_from_entropy([15u8; 32]);
        let device_id = DeviceId::new_from_entropy([16u8; 32]);
        let result = crate::handlers::rendezvous::RendezvousResult {
            success: false,
            context_id,
            peer: None,
            descriptor: None,
            error: Some("guard denied".to_string()),
        };

        let error = require_published_lan_descriptor(result, device_id)
            .expect_err("failed publication must stay terminal");

        assert!(error.to_string().contains("guard denied"));
        assert!(error.to_string().contains("rendezvous_publish"));
    }

    /// A failed result with no error text falls back to the generic message.
    #[test]
    fn lan_descriptor_publish_failure_without_reason_uses_default_message() {
        let context_id = ContextId::new_from_entropy([20u8; 32]);
        let device_id = DeviceId::new_from_entropy([21u8; 32]);
        let result = crate::handlers::rendezvous::RendezvousResult {
            success: false,
            context_id,
            peer: None,
            descriptor: None,
            error: None,
        };

        let error = require_published_lan_descriptor(result, device_id)
            .expect_err("failed publication without a reason must still fail");

        assert!(error.to_string().contains("LAN descriptor publish failed"));
        assert!(error.to_string().contains("rendezvous_publish"));
    }

    /// A failed result is terminal even when it happens to carry a descriptor.
    #[test]
    fn lan_descriptor_publish_failure_ignores_descriptor_payload() {
        let authority_id = AuthorityId::new_from_entropy([22u8; 32]);
        let context_id = ContextId::new_from_entropy([23u8; 32]);
        let device_id = DeviceId::new_from_entropy([24u8; 32]);
        let result = crate::handlers::rendezvous::RendezvousResult {
            success: false,
            context_id,
            peer: None,
            descriptor: Some(RendezvousDescriptor {
                authority_id,
                device_id: None,
                context_id,
                transport_hints: vec![TransportHint::tcp_direct("127.0.0.1:7000")
                    .expect("loopback address is a valid direct TCP hint")],
                handshake_psk_commitment: [0u8; 32],
                public_key: [0u8; 32],
                valid_from: 1,
                valid_until: 2,
                nonce: [0u8; 32],
                nickname_suggestion: None,
            }),
            error: Some("guard denied".to_string()),
        };

        let error = require_published_lan_descriptor(result, device_id)
            .expect_err("failed publication must not be rescued by a payload");

        assert!(error.to_string().contains("guard denied"));
        assert!(error.to_string().contains("rendezvous_publish"));
    }

    /// A successful publish keeps the payload and binds it to the local device.
    #[test]
    fn lan_descriptor_publish_preserves_device_binding() {
        let authority_id = AuthorityId::new_from_entropy([17u8; 32]);
        let context_id = ContextId::new_from_entropy([18u8; 32]);
        let device_id = DeviceId::new_from_entropy([19u8; 32]);
        let result = crate::handlers::rendezvous::RendezvousResult {
            success: true,
            context_id,
            peer: None,
            descriptor: Some(RendezvousDescriptor {
                authority_id,
                device_id: None,
                context_id,
                transport_hints: vec![TransportHint::tcp_direct("127.0.0.1:7000")
                    .expect("loopback address is a valid direct TCP hint")],
                handshake_psk_commitment: [0u8; 32],
                public_key: [0u8; 32],
                valid_from: 1,
                valid_until: 2,
                nonce: [0u8; 32],
                nickname_suggestion: None,
            }),
            error: None,
        };

        let descriptor = require_published_lan_descriptor(result, device_id)
            .expect("successful publish with payload should keep device binding");

        assert_eq!(descriptor.device_id, Some(device_id));
        assert_eq!(descriptor.context_id, context_id);
        assert_eq!(descriptor.authority_id, authority_id);
    }
}