Skip to main content

aura_agent/runtime/
system.rs

1//! Runtime System
2//!
3//! Main runtime system that orchestrates all agent operations.
4
5use super::services::ceremony_runner::CeremonyRunner;
6use super::services::rendezvous_manager::RendezvousManagerError;
7use super::services::AnonymousPathManager;
8use super::services::{
9    AuthorityManager, AuthorityStatus, CeremonyTracker, ContextManager, CoverTrafficGenerator,
10    FlowBudgetManager, HoldManager, LanTransportListenerService, LanTransportService,
11    LocalHealthObserver, MoveManager, ReactivePipelineService, ReceiptManager,
12    ReconfigurationManager, RendezvousManager, RuntimeMaintenanceService, RuntimeService,
13    RuntimeServiceContext, SelectionManager, ServiceError, ServiceErrorKind, ServiceHealth,
14    SocialManager, SyncServiceManager, ThresholdSigningService,
15};
16use super::{
17    AuraEffectSystem, EffectContext, EffectExecutor, LifecycleManager, RuntimeDiagnosticSink,
18    RuntimeShutdownEvent, TaskSupervisor,
19};
20use crate::core::{AgentConfig, AuthorityContext};
21use crate::handlers::RendezvousHandler;
22#[cfg(not(target_arch = "wasm32"))]
23use crate::task_registry::TaskGroup;
24use crate::task_registry::TaskSupervisionError;
25#[cfg(not(target_arch = "wasm32"))]
26use aura_chat::{ChatFact, CHAT_FACT_TYPE_ID};
27use aura_core::effects::time::PhysicalTimeEffects;
28#[cfg(not(target_arch = "wasm32"))]
29use aura_core::effects::transport::TransportEnvelope;
30#[cfg(not(target_arch = "wasm32"))]
31use aura_core::effects::{AmpChannelEffects, ChannelCreateParams, ChannelJoinParams};
32use aura_core::types::identifiers::AuthorityId;
33#[cfg(not(target_arch = "wasm32"))]
34use aura_core::util::serialization::from_slice;
35use aura_core::DeviceId;
36use aura_core::{
37    execute_with_timeout_budget, OwnedShutdownToken, OwnedTaskSpawner, TimeoutBudget,
38    TimeoutRunError,
39};
40#[cfg(not(target_arch = "wasm32"))]
41use aura_guards::GuardContextProvider;
42#[cfg(not(target_arch = "wasm32"))]
43use aura_journal::fact::{FactContent, RelationalFact};
44#[cfg(not(target_arch = "wasm32"))]
45use aura_journal::DomainFact;
46#[cfg(not(target_arch = "wasm32"))]
47use aura_protocol::amp::get_channel_state;
48use aura_rendezvous::{RendezvousDescriptor, TransportHint};
49#[cfg(not(target_arch = "wasm32"))]
50use futures::{SinkExt, StreamExt};
51use std::sync::atomic::{AtomicU8, Ordering};
52use std::sync::Arc;
53use std::time::Duration;
54#[cfg(not(target_arch = "wasm32"))]
55use tokio::io::AsyncReadExt;
56#[cfg(not(target_arch = "wasm32"))]
57use tokio_tungstenite::accept_async;
58
59const MIN_SYNC_PEER_RECONCILE_INTERVAL: Duration = Duration::from_secs(1);
60const MAX_SYNC_PEER_RECONCILE_INTERVAL: Duration = Duration::from_secs(30);
61#[cfg(not(target_arch = "wasm32"))]
62const CHAT_FACT_CONTENT_TYPE: &str = "application/aura-chat-fact";
63#[cfg(not(target_arch = "wasm32"))]
64const FACT_SYNC_REQUEST_CONTENT_TYPE: &str = "application/aura-fact-sync-request";
65#[cfg(not(target_arch = "wasm32"))]
66const FACT_SYNC_RESPONSE_CONTENT_TYPE: &str = "application/aura-fact-sync-response";
67
68mod lifecycle;
69
70pub(crate) fn sync_peer_reconcile_interval(sync_manager: &SyncServiceManager) -> Duration {
71    sync_manager.config().auto_sync_interval.clamp(
72        MIN_SYNC_PEER_RECONCILE_INTERVAL,
73        MAX_SYNC_PEER_RECONCILE_INTERVAL,
74    )
75}
76
77#[derive(Debug, thiserror::Error)]
78pub enum RuntimeShutdownError {
79    #[error("runtime task tree shutdown failed: {0}")]
80    TaskTree(#[from] TaskSupervisionError),
81    #[error("runtime service teardown failed: {0}")]
82    Service(#[from] ServiceError),
83    #[error("lifecycle shutdown failed: {0}")]
84    Lifecycle(crate::AgentError),
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88pub enum RuntimeActivityState {
89    Running,
90    Stopping,
91    Stopped,
92}
93
94impl RuntimeActivityState {
95    fn as_u8(self) -> u8 {
96        match self {
97            Self::Running => 0,
98            Self::Stopping => 1,
99            Self::Stopped => 2,
100        }
101    }
102
103    fn from_u8(value: u8) -> Self {
104        match value {
105            0 => Self::Running,
106            1 => Self::Stopping,
107            2 => Self::Stopped,
108            _ => Self::Stopped,
109        }
110    }
111}
112
113#[derive(Debug, thiserror::Error)]
114pub enum RuntimePublicOperationError {
115    #[error("runtime is {state:?} and no longer accepts new public operations")]
116    NotAccepting { state: RuntimeActivityState },
117}
118
119#[derive(Debug, Default)]
120pub struct RuntimeActivityGate {
121    state: AtomicU8,
122}
123
124impl RuntimeActivityGate {
125    pub fn new() -> Self {
126        Self {
127            state: AtomicU8::new(RuntimeActivityState::Running.as_u8()),
128        }
129    }
130
131    pub fn state(&self) -> RuntimeActivityState {
132        RuntimeActivityState::from_u8(self.state.load(Ordering::SeqCst))
133    }
134
135    pub fn begin_shutdown(&self) -> RuntimeActivityState {
136        match self.state.compare_exchange(
137            RuntimeActivityState::Running.as_u8(),
138            RuntimeActivityState::Stopping.as_u8(),
139            Ordering::SeqCst,
140            Ordering::SeqCst,
141        ) {
142            Ok(_) => RuntimeActivityState::Running,
143            Err(previous) => RuntimeActivityState::from_u8(previous),
144        }
145    }
146
147    pub fn mark_stopped(&self) {
148        self.state
149            .store(RuntimeActivityState::Stopped.as_u8(), Ordering::SeqCst);
150    }
151
152    pub fn ensure_accepting_public_operations(&self) -> Result<(), RuntimePublicOperationError> {
153        match self.state() {
154            RuntimeActivityState::Running => Ok(()),
155            state => Err(RuntimePublicOperationError::NotAccepting { state }),
156        }
157    }
158}
159
160/// Main runtime system for the agent
161pub struct RuntimeSystem {
162    /// Effect executor
163    #[allow(dead_code)] // Will be used for effect dispatch
164    effect_executor: EffectExecutor,
165
166    /// Effect system (immutable after construction, handlers have internal mutability)
167    effect_system: Arc<AuraEffectSystem>,
168
169    /// Context manager
170    context_manager: ContextManager,
171
172    /// Authority manager
173    authority_manager: AuthorityManager,
174
175    /// Flow budget manager
176    flow_budget_manager: FlowBudgetManager,
177
178    /// Receipt manager
179    receipt_manager: ReceiptManager,
180
181    /// Lifecycle manager
182    lifecycle_manager: LifecycleManager,
183
184    /// Sync service manager (optional, for background journal synchronization)
185    sync_manager: Option<SyncServiceManager>,
186
187    /// Rendezvous manager (optional, for peer discovery and channel establishment)
188    rendezvous_manager: Option<RendezvousManager>,
189
190    /// Move manager for bounded movement planning and delivery.
191    move_manager: Option<MoveManager>,
192
193    /// Local health observer for adaptive privacy policy.
194    local_health_observer: Option<LocalHealthObserver>,
195
196    /// Runtime-owned adaptive selection manager.
197    selection_manager: Option<SelectionManager>,
198
199    /// Anonymous path manager for reusable established anonymous paths.
200    anonymous_path_manager: Option<AnonymousPathManager>,
201
202    /// Hold manager for shared custody and selector-based retrieval.
203    hold_manager: Option<HoldManager>,
204
205    /// Cover traffic generator for shared move-substrate cover planning.
206    cover_traffic_generator: Option<CoverTrafficGenerator>,
207
208    /// Social manager (optional, for social topology and relay selection)
209    social_manager: Option<SocialManager>,
210
211    /// Ceremony tracker (for guardian ceremony coordination)
212    ceremony_tracker: CeremonyTracker,
213
214    /// Ceremony runner (shared Category C orchestration API)
215    ceremony_runner: CeremonyRunner,
216
217    /// Threshold signing service (shared state across runtime operations)
218    threshold_signing: ThresholdSigningService,
219
220    /// Service-owned reactive pipeline.
221    reactive_pipeline_service: ReactivePipelineService,
222
223    /// Service-owned LAN transport listeners.
224    lan_listener_service: Option<LanTransportListenerService>,
225
226    /// Service-owned runtime maintenance loops.
227    maintenance_service: RuntimeMaintenanceService,
228
229    /// Reconfiguration manager for link/delegate operations.
230    reconfiguration_manager: ReconfigurationManager,
231
232    /// Runtime task registry for background work
233    runtime_tasks: Arc<TaskSupervisor>,
234
235    /// Shared runtime activity gate used to reject new public work during shutdown.
236    activity_gate: Arc<RuntimeActivityGate>,
237
238    /// Shared diagnostics sink for surfaced async/runtime failures.
239    diagnostics: Arc<RuntimeDiagnosticSink>,
240
241    /// Configuration
242    #[allow(dead_code)] // Will be used for runtime configuration
243    config: AgentConfig,
244
245    /// Authority ID
246    authority_id: AuthorityId,
247}
248
249impl RuntimeSystem {
250    /// Create a new runtime system
251    #[allow(clippy::too_many_arguments)]
252    #[allow(dead_code)] // Factory retained for future runtime wiring
253    pub(crate) fn new(
254        effect_executor: EffectExecutor,
255        effect_system: Arc<AuraEffectSystem>,
256        context_manager: ContextManager,
257        authority_manager: AuthorityManager,
258        flow_budget_manager: FlowBudgetManager,
259        receipt_manager: ReceiptManager,
260        lifecycle_manager: LifecycleManager,
261        config: AgentConfig,
262        authority_id: AuthorityId,
263    ) -> Self {
264        let device_id = config.device_id;
265        let threshold_signing = ThresholdSigningService::new(effect_system.clone());
266        let time_effects: Arc<dyn PhysicalTimeEffects> =
267            Arc::new(effect_system.time_effects().clone());
268        let ceremony_tracker = CeremonyTracker::new(time_effects);
269        let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
270        let reconfiguration_manager = ReconfigurationManager::new();
271        let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
272        let reactive_pipeline_service =
273            ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
274        let maintenance_service = RuntimeMaintenanceService::new(
275            effect_system.clone(),
276            authority_id,
277            device_id,
278            ceremony_tracker.clone(),
279            ceremony_runner.clone(),
280            threshold_signing.clone(),
281            reconfiguration_manager.clone(),
282            None,
283            None,
284            None,
285            None,
286        );
287        let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
288        Self {
289            effect_executor,
290            effect_system,
291            context_manager,
292            authority_manager,
293            flow_budget_manager,
294            receipt_manager,
295            lifecycle_manager,
296            sync_manager: None,
297            rendezvous_manager: None,
298            move_manager: None,
299            local_health_observer: None,
300            selection_manager: None,
301            anonymous_path_manager: None,
302            hold_manager: None,
303            cover_traffic_generator: None,
304            social_manager: None,
305            ceremony_tracker,
306            ceremony_runner,
307            threshold_signing,
308            reactive_pipeline_service,
309            lan_listener_service: None,
310            maintenance_service,
311            reconfiguration_manager,
312            runtime_tasks,
313            activity_gate: Arc::new(RuntimeActivityGate::new()),
314            diagnostics,
315            config,
316            authority_id,
317        }
318    }
319
320    /// Create a new runtime system with sync service
321    #[allow(clippy::too_many_arguments)]
322    #[allow(dead_code)] // Factory retained for future sync-enabled runtime
323    pub(crate) fn new_with_sync(
324        effect_executor: EffectExecutor,
325        effect_system: Arc<AuraEffectSystem>,
326        context_manager: ContextManager,
327        authority_manager: AuthorityManager,
328        flow_budget_manager: FlowBudgetManager,
329        receipt_manager: ReceiptManager,
330        lifecycle_manager: LifecycleManager,
331        sync_manager: SyncServiceManager,
332        config: AgentConfig,
333        authority_id: AuthorityId,
334    ) -> Self {
335        let device_id = config.device_id;
336        let threshold_signing = ThresholdSigningService::new(effect_system.clone());
337        let time_effects: Arc<dyn PhysicalTimeEffects> =
338            Arc::new(effect_system.time_effects().clone());
339        let ceremony_tracker = CeremonyTracker::new(time_effects);
340        let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
341        let reconfiguration_manager = ReconfigurationManager::new();
342        let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
343        let reactive_pipeline_service =
344            ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
345        let maintenance_service = RuntimeMaintenanceService::new(
346            effect_system.clone(),
347            authority_id,
348            device_id,
349            ceremony_tracker.clone(),
350            ceremony_runner.clone(),
351            threshold_signing.clone(),
352            reconfiguration_manager.clone(),
353            Some(sync_manager.clone()),
354            None,
355            None,
356            None,
357        );
358        let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
359        Self {
360            effect_executor,
361            effect_system,
362            context_manager,
363            authority_manager,
364            flow_budget_manager,
365            receipt_manager,
366            lifecycle_manager,
367            sync_manager: Some(sync_manager),
368            rendezvous_manager: None,
369            move_manager: None,
370            local_health_observer: None,
371            selection_manager: None,
372            anonymous_path_manager: None,
373            hold_manager: None,
374            cover_traffic_generator: None,
375            social_manager: None,
376            ceremony_tracker,
377            ceremony_runner,
378            threshold_signing,
379            reactive_pipeline_service,
380            lan_listener_service: None,
381            maintenance_service,
382            reconfiguration_manager,
383            runtime_tasks,
384            activity_gate: Arc::new(RuntimeActivityGate::new()),
385            diagnostics,
386            config,
387            authority_id,
388        }
389    }
390
391    /// Create a new runtime system with rendezvous service
392    #[allow(clippy::too_many_arguments)]
393    #[allow(dead_code)] // Factory retained for future rendezvous-enabled runtime
394    pub(crate) fn new_with_rendezvous(
395        effect_executor: EffectExecutor,
396        effect_system: Arc<AuraEffectSystem>,
397        context_manager: ContextManager,
398        authority_manager: AuthorityManager,
399        flow_budget_manager: FlowBudgetManager,
400        receipt_manager: ReceiptManager,
401        lifecycle_manager: LifecycleManager,
402        rendezvous_manager: RendezvousManager,
403        config: AgentConfig,
404        authority_id: AuthorityId,
405    ) -> Self {
406        let device_id = config.device_id;
407        let threshold_signing = ThresholdSigningService::new(effect_system.clone());
408        let time_effects: Arc<dyn PhysicalTimeEffects> =
409            Arc::new(effect_system.time_effects().clone());
410        let ceremony_tracker = CeremonyTracker::new(time_effects);
411        let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
412        let reconfiguration_manager = ReconfigurationManager::new();
413        let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
414        let reactive_pipeline_service =
415            ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
416        let maintenance_service = RuntimeMaintenanceService::new(
417            effect_system.clone(),
418            authority_id,
419            device_id,
420            ceremony_tracker.clone(),
421            ceremony_runner.clone(),
422            threshold_signing.clone(),
423            reconfiguration_manager.clone(),
424            None,
425            Some(rendezvous_manager.clone()),
426            None,
427            None,
428        );
429        let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
430        Self {
431            effect_executor,
432            effect_system,
433            context_manager,
434            authority_manager,
435            flow_budget_manager,
436            receipt_manager,
437            lifecycle_manager,
438            sync_manager: None,
439            rendezvous_manager: Some(rendezvous_manager),
440            move_manager: None,
441            local_health_observer: None,
442            selection_manager: None,
443            anonymous_path_manager: None,
444            hold_manager: None,
445            cover_traffic_generator: None,
446            social_manager: None,
447            ceremony_tracker,
448            ceremony_runner,
449            threshold_signing,
450            reactive_pipeline_service,
451            lan_listener_service: None,
452            maintenance_service,
453            reconfiguration_manager,
454            runtime_tasks,
455            activity_gate: Arc::new(RuntimeActivityGate::new()),
456            diagnostics,
457            config,
458            authority_id,
459        }
460    }
461
462    /// Create a new runtime system with all services
463    #[allow(clippy::too_many_arguments)]
464    pub(crate) fn new_with_services(
465        effect_executor: EffectExecutor,
466        effect_system: Arc<AuraEffectSystem>,
467        context_manager: ContextManager,
468        authority_manager: AuthorityManager,
469        flow_budget_manager: FlowBudgetManager,
470        receipt_manager: ReceiptManager,
471        lifecycle_manager: LifecycleManager,
472        sync_manager: Option<SyncServiceManager>,
473        rendezvous_manager: Option<RendezvousManager>,
474        move_manager: Option<MoveManager>,
475        local_health_observer: Option<LocalHealthObserver>,
476        selection_manager: Option<SelectionManager>,
477        anonymous_path_manager: Option<AnonymousPathManager>,
478        hold_manager: Option<HoldManager>,
479        cover_traffic_generator: Option<CoverTrafficGenerator>,
480        rendezvous_handler: Option<RendezvousHandler>,
481        lan_transport: Option<Arc<LanTransportService>>,
482        social_manager: Option<SocialManager>,
483        config: AgentConfig,
484        authority_id: AuthorityId,
485    ) -> Self {
486        let device_id = config.device_id;
487        let threshold_signing = ThresholdSigningService::new(effect_system.clone());
488        let time_effects: Arc<dyn PhysicalTimeEffects> =
489            Arc::new(effect_system.time_effects().clone());
490        let ceremony_tracker = CeremonyTracker::new(time_effects);
491        let ceremony_runner = CeremonyRunner::new(ceremony_tracker.clone());
492        let reconfiguration_manager = ReconfigurationManager::new();
493        let diagnostics = Arc::new(RuntimeDiagnosticSink::new());
494        let reactive_pipeline_service =
495            ReactivePipelineService::new(effect_system.clone(), authority_id, diagnostics.clone());
496        let lan_listener_service = lan_transport.clone().map(|lan_transport| {
497            LanTransportListenerService::new(effect_system.clone(), lan_transport)
498        });
499        let maintenance_service = RuntimeMaintenanceService::new(
500            effect_system.clone(),
501            authority_id,
502            device_id,
503            ceremony_tracker.clone(),
504            ceremony_runner.clone(),
505            threshold_signing.clone(),
506            reconfiguration_manager.clone(),
507            sync_manager.clone(),
508            rendezvous_manager.clone(),
509            rendezvous_handler.clone(),
510            lan_transport.clone(),
511        );
512        let runtime_tasks = Arc::new(TaskSupervisor::with_diagnostics(diagnostics.clone()));
513        Self {
514            effect_executor,
515            effect_system,
516            context_manager,
517            authority_manager,
518            flow_budget_manager,
519            receipt_manager,
520            lifecycle_manager,
521            sync_manager,
522            rendezvous_manager,
523            move_manager,
524            local_health_observer,
525            selection_manager,
526            anonymous_path_manager,
527            hold_manager,
528            cover_traffic_generator,
529            social_manager,
530            ceremony_tracker,
531            ceremony_runner,
532            threshold_signing,
533            reactive_pipeline_service,
534            lan_listener_service,
535            maintenance_service,
536            reconfiguration_manager,
537            runtime_tasks,
538            activity_gate: Arc::new(RuntimeActivityGate::new()),
539            diagnostics,
540            config,
541            authority_id,
542        }
543    }
544
545    /// Get the ceremony tracker
546    pub fn ceremony_tracker(&self) -> &CeremonyTracker {
547        &self.ceremony_tracker
548    }
549
550    /// Get the ceremony runner
551    pub fn ceremony_runner(&self) -> &CeremonyRunner {
552        &self.ceremony_runner
553    }
554
555    /// Get the shared threshold signing service.
556    pub fn threshold_signing(&self) -> ThresholdSigningService {
557        self.threshold_signing.clone()
558    }
559
560    /// Get runtime reconfiguration manager.
561    pub fn reconfiguration(&self) -> &ReconfigurationManager {
562        &self.reconfiguration_manager
563    }
564
565    /// Get the runtime task registry.
566    pub fn tasks(&self) -> Arc<TaskSupervisor> {
567        self.runtime_tasks.clone()
568    }
569
570    pub fn activity_gate(&self) -> Arc<RuntimeActivityGate> {
571        self.activity_gate.clone()
572    }
573
574    pub fn runtime_activity_state(&self) -> RuntimeActivityState {
575        self.activity_gate.state()
576    }
577
578    pub fn diagnostics(&self) -> Arc<RuntimeDiagnosticSink> {
579        self.diagnostics.clone()
580    }
581
582    /// Get the runtime task spawner through the sanctioned owned wrapper.
583    pub fn task_spawner(&self) -> OwnedTaskSpawner {
584        OwnedTaskSpawner::new(
585            self.runtime_tasks.clone(),
586            OwnedShutdownToken::attached(self.runtime_tasks.cancellation_token()),
587        )
588    }
589
590    /// Get the authority ID
591    pub fn authority_id(&self) -> AuthorityId {
592        self.authority_id
593    }
594
595    /// Device id for this runtime instance.
596    pub fn device_id(&self) -> DeviceId {
597        self.config.device_id
598    }
599
600    /// Get the effect system
601    ///
602    /// Returns a shared reference to the effect system. The effect system is
603    /// immutable after construction; individual handlers manage their own
604    /// internal state as needed.
605    pub fn effects(&self) -> Arc<AuraEffectSystem> {
606        self.effect_system.clone()
607    }
608
609    /// Check whether the service-owned reactive pipeline is running.
610    pub async fn reactive_pipeline_running(&self) -> bool {
611        self.reactive_pipeline_service.is_running().await
612    }
613
614    /// Get the context manager
615    pub fn contexts(&self) -> &ContextManager {
616        &self.context_manager
617    }
618
619    /// Get the authority manager
620    pub fn authorities(&self) -> &AuthorityManager {
621        &self.authority_manager
622    }
623
624    /// Get the flow budget manager
625    pub fn flow_budgets(&self) -> &FlowBudgetManager {
626        &self.flow_budget_manager
627    }
628
629    /// Get the receipt manager
630    pub fn receipts(&self) -> &ReceiptManager {
631        &self.receipt_manager
632    }
633
634    /// Get the lifecycle manager
635    pub fn lifecycle(&self) -> &LifecycleManager {
636        &self.lifecycle_manager
637    }
638
639    /// Get the sync service manager (if enabled)
640    pub fn sync(&self) -> Option<&SyncServiceManager> {
641        self.sync_manager.as_ref()
642    }
643
644    /// Check if sync service is enabled
645    pub fn has_sync(&self) -> bool {
646        self.sync_manager.is_some()
647    }
648
649    /// Get the rendezvous manager (if enabled)
650    pub fn rendezvous(&self) -> Option<&RendezvousManager> {
651        self.rendezvous_manager.as_ref()
652    }
653
654    /// Check if rendezvous service is enabled
655    pub fn has_rendezvous(&self) -> bool {
656        self.rendezvous_manager.is_some()
657    }
658
659    /// Get the social manager (if enabled)
660    pub fn social(&self) -> Option<&SocialManager> {
661        self.social_manager.as_ref()
662    }
663
664    /// Get the hold service manager (if enabled).
665    pub fn hold(&self) -> Option<&HoldManager> {
666        self.hold_manager.as_ref()
667    }
668
669    /// Check if hold service is enabled.
670    pub fn has_hold(&self) -> bool {
671        self.hold_manager.is_some()
672    }
673
674    /// Check if social service is enabled
675    pub fn has_social(&self) -> bool {
676        self.social_manager.is_some()
677    }
678
679    pub async fn shutdown_typed(self, ctx: &EffectContext) -> Result<(), RuntimeShutdownError> {
680        let prior_state = self.activity_gate.begin_shutdown();
681        if prior_state != RuntimeActivityState::Running {
682            tracing::info!(
683                event = RuntimeShutdownEvent::AlreadyInProgress.as_event_name(),
684                previous_state = ?prior_state,
685                "Runtime shutdown requested after shutdown had already started"
686            );
687            self.activity_gate.mark_stopped();
688            return Ok(());
689        }
690
691        let runtime_tasks = self.runtime_tasks.clone();
692        let mut shutdown_error: Option<RuntimeShutdownError> = None;
693
694        // Drain the reactive scheduler before cancelling the broader runtime task tree.
695        tracing::info!(
696            event = RuntimeShutdownEvent::Stage.as_event_name(),
697            stage = "reactive_pipeline",
698            "Starting runtime shutdown"
699        );
700        if let Err(error) = self.reactive_pipeline_service.stop().await {
701            tracing::warn!(
702                event = RuntimeShutdownEvent::ReactivePipelineSignalFailed.as_event_name(),
703                error = %error,
704                "Reactive pipeline shutdown failed during runtime shutdown"
705            );
706        }
707
708        tracing::info!(
709            event = RuntimeShutdownEvent::Stage.as_event_name(),
710            stage = "task_tree",
711            "Cancelling runtime task tree"
712        );
713        if let Err(error) = runtime_tasks
714            .shutdown_with_timeout(Duration::from_secs(5))
715            .await
716        {
717            tracing::warn!(
718                event = RuntimeShutdownEvent::TaskTreeEscalated.as_event_name(),
719                error = %error,
720                "Runtime task tree required forced shutdown"
721            );
722            shutdown_error.get_or_insert(RuntimeShutdownError::TaskTree(error));
723        }
724
725        // Stop services after background runtime work has been cancelled.
726        tracing::info!(
727            event = RuntimeShutdownEvent::Stage.as_event_name(),
728            stage = "services",
729            "Stopping runtime services"
730        );
731        if let Err(e) = self.stop_services().await {
732            tracing::warn!(
733                event = RuntimeShutdownEvent::ServicesFailed.as_event_name(),
734                error = %e,
735                "Failed to stop runtime services during shutdown"
736            );
737            shutdown_error.get_or_insert(RuntimeShutdownError::Service(e));
738        }
739
740        let RuntimeSystem {
741            lifecycle_manager,
742            sync_manager: _sync_manager,
743            rendezvous_manager: _rendezvous_manager,
744            ..
745        } = self;
746
747        tracing::info!(
748            event = RuntimeShutdownEvent::Stage.as_event_name(),
749            stage = "lifecycle_manager",
750            "Shutting down lifecycle manager"
751        );
752        if let Err(error) = lifecycle_manager.shutdown(ctx).await {
753            tracing::warn!(
754                event = RuntimeShutdownEvent::LifecycleFailed.as_event_name(),
755                error = %error,
756                "Lifecycle manager shutdown failed"
757            );
758            shutdown_error.get_or_insert(RuntimeShutdownError::Lifecycle(error));
759        }
760
761        self.activity_gate.mark_stopped();
762
763        match shutdown_error {
764            Some(error) => Err(error),
765            None => Ok(()),
766        }
767    }
768}
769
770#[cfg(not(target_arch = "wasm32"))]
771pub(crate) fn spawn_lan_transport_listener_tasks(
772    parent_tasks: TaskGroup,
773    effects: Arc<AuraEffectSystem>,
774    lan_transport: Arc<LanTransportService>,
775) {
776    let listener = lan_transport.listener();
777    let websocket_listener = lan_transport.websocket_listener();
778    let metrics = lan_transport.metrics_handle();
779    let time_effects: Arc<dyn PhysicalTimeEffects + Send + Sync> =
780        Arc::new(effects.time_effects().clone());
781    let websocket_effects = effects.clone();
782    let tcp_accept_group = parent_tasks.clone();
783    let tcp_connection_group = tcp_accept_group.clone();
784    let _tcp_accept_task_handle =
785        tcp_accept_group.spawn_cancellable_named("tcp_accept_loop", async move {
786            loop {
787                let (mut stream, addr) = match listener.accept().await {
788                    Ok((stream, addr)) => (stream, addr),
789                    Err(err) => {
790                        tracing::warn!(error = %err, "LAN transport accept failed");
791                        let now_ms = time_effects
792                            .physical_time()
793                            .await
794                            .ok()
795                            .map(|t| t.ts_ms)
796                            .unwrap_or(0);
797                        let mut metrics = metrics.write().await;
798                        metrics.accept_errors = metrics.accept_errors.saturating_add(1);
799                        if now_ms > 0 {
800                            metrics.last_error_ms = now_ms;
801                        }
802                        continue;
803                    }
804                };
805
806                let effects = effects.clone();
807                let metrics = metrics.clone();
808                let time_effects = time_effects.clone();
809                let connection_group = tcp_connection_group.clone();
810                let now_ms = time_effects
811                    .physical_time()
812                    .await
813                    .ok()
814                    .map(|t| t.ts_ms)
815                    .unwrap_or(0);
816                {
817                    let mut metrics = metrics.write().await;
818                    metrics.connections_accepted = metrics.connections_accepted.saturating_add(1);
819                    if now_ms > 0 {
820                        metrics.last_accept_ms = now_ms;
821                    }
822                }
823                let _connection_task =
824                connection_group.spawn_named(format!("tcp_connection.{addr}"), async move {
825                let mut len_buf = [0u8; 4];
826                if let Err(err) = stream.read_exact(&mut len_buf).await {
827                    tracing::debug!(error = %err, addr = %addr, "LAN transport read len failed");
828                    let now_ms = time_effects
829                        .physical_time()
830                        .await
831                        .ok()
832                        .map(|t| t.ts_ms)
833                        .unwrap_or(0);
834                    let mut metrics = metrics.write().await;
835                    metrics.read_errors = metrics.read_errors.saturating_add(1);
836                    if now_ms > 0 {
837                        metrics.last_error_ms = now_ms;
838                    }
839                    return;
840                }
841                let len = u32::from_be_bytes(len_buf) as usize;
842                if len == 0 || len > 1024 * 1024 {
843                    tracing::debug!(addr = %addr, len = len, "LAN transport invalid frame size");
844                    let now_ms = time_effects
845                        .physical_time()
846                        .await
847                        .ok()
848                        .map(|t| t.ts_ms)
849                        .unwrap_or(0);
850                    let mut metrics = metrics.write().await;
851                    metrics.decode_errors = metrics.decode_errors.saturating_add(1);
852                    if now_ms > 0 {
853                        metrics.last_error_ms = now_ms;
854                    }
855                    return;
856                }
857                let mut payload = vec![0u8; len];
858                if let Err(err) = stream.read_exact(&mut payload).await {
859                    tracing::debug!(
860                        error = %err,
861                        addr = %addr,
862                        "LAN transport read payload failed"
863                    );
864                    let now_ms = time_effects
865                        .physical_time()
866                        .await
867                        .ok()
868                        .map(|t| t.ts_ms)
869                        .unwrap_or(0);
870                    let mut metrics = metrics.write().await;
871                    metrics.read_errors = metrics.read_errors.saturating_add(1);
872                    if now_ms > 0 {
873                        metrics.last_error_ms = now_ms;
874                    }
875                    return;
876                }
877
878                let envelope = match aura_core::util::serialization::from_slice(&payload) {
879                    Ok(envelope) => envelope,
880                    Err(err) => {
881                        tracing::debug!(error = %err, addr = %addr, "LAN transport decode failed");
882                        let now_ms = time_effects
883                            .physical_time()
884                            .await
885                            .ok()
886                            .map(|t| t.ts_ms)
887                            .unwrap_or(0);
888                        let mut metrics = metrics.write().await;
889                        metrics.decode_errors = metrics.decode_errors.saturating_add(1);
890                        if now_ms > 0 {
891                            metrics.last_error_ms = now_ms;
892                        }
893                        return;
894                    }
895                };
896                let now_ms = time_effects
897                    .physical_time()
898                    .await
899                    .ok()
900                    .map(|t| t.ts_ms)
901                    .unwrap_or(0);
902                {
903                    let mut metrics = metrics.write().await;
904                    metrics.frames_received = metrics.frames_received.saturating_add(1);
905                    metrics.bytes_received = metrics.bytes_received.saturating_add(len as u64);
906                    if now_ms > 0 {
907                        metrics.last_frame_ms = now_ms;
908                    }
909                }
910
911                let _ = handle_inbound_transport_envelope(effects, envelope).await;
912                });
913            }
914        });
915
916    let metrics = lan_transport.metrics_handle();
917    let time_effects: Arc<dyn PhysicalTimeEffects + Send + Sync> =
918        Arc::new(websocket_effects.time_effects().clone());
919    let websocket_accept_group = parent_tasks.clone();
920    let websocket_connection_group = websocket_accept_group.clone();
921    let _websocket_accept_task_handle =
922        websocket_accept_group.spawn_cancellable_named("websocket_accept_loop", async move {
923            loop {
924                let (stream, addr) = match websocket_listener.accept().await {
925                    Ok((stream, addr)) => (stream, addr),
926                    Err(err) => {
927                        tracing::warn!(error = %err, "LAN websocket accept failed");
928                        continue;
929                    }
930                };
931
932                let effects = websocket_effects.clone();
933                let metrics = metrics.clone();
934                let time_effects = time_effects.clone();
935                let connection_group = websocket_connection_group.clone();
936                let _connection_task = connection_group.spawn_named(
937                    format!("websocket_connection.{addr}"),
938                    async move {
939                        let websocket = match accept_async(stream).await {
940                            Ok(websocket) => websocket,
941                            Err(err) => {
942                                tracing::debug!(
943                                    error = %err,
944                                    addr = %addr,
945                                    "LAN websocket handshake failed"
946                                );
947                                return;
948                            }
949                        };
950                        let (mut sink, mut stream) = websocket.split();
951                        while let Some(message) = stream.next().await {
952                            let message = match message {
953                                Ok(message) => message,
954                                Err(err) => {
955                                    tracing::debug!(
956                                        error = %err,
957                                        addr = %addr,
958                                        "LAN websocket read failed"
959                                    );
960                                    return;
961                                }
962                            };
963
964                            if !message.is_binary() {
965                                continue;
966                            }
967
968                            let payload = message.into_data();
969                            let envelope = match aura_core::util::serialization::from_slice::<
970                                TransportEnvelope,
971                            >(&payload)
972                            {
973                                Ok(envelope) => envelope,
974                                Err(err) => {
975                                    tracing::debug!(
976                                        error = %err,
977                                        addr = %addr,
978                                        "LAN websocket decode failed"
979                                    );
980                                    let mut metrics = metrics.write().await;
981                                    metrics.decode_errors = metrics.decode_errors.saturating_add(1);
982                                    continue;
983                                }
984                            };
985
986                            let now_ms = time_effects
987                                .physical_time()
988                                .await
989                                .ok()
990                                .map(|t| t.ts_ms)
991                                .unwrap_or(0);
992                            {
993                                let mut metrics = metrics.write().await;
994                                metrics.frames_received = metrics.frames_received.saturating_add(1);
995                                metrics.bytes_received =
996                                    metrics.bytes_received.saturating_add(payload.len() as u64);
997                                if now_ms > 0 {
998                                    metrics.last_frame_ms = now_ms;
999                                }
1000                            }
1001
1002                            if let Some(response) =
1003                                handle_inbound_transport_envelope(effects.clone(), envelope).await
1004                            {
1005                                match aura_core::util::serialization::to_vec(&response) {
1006                                    Ok(bytes) => {
1007                                        if let Err(err) = sink
1008                                            .send(tokio_tungstenite::tungstenite::Message::Binary(
1009                                                bytes,
1010                                            ))
1011                                            .await
1012                                        {
1013                                            tracing::debug!(
1014                                                error = %err,
1015                                                addr = %addr,
1016                                                "LAN websocket response send failed"
1017                                            );
1018                                            return;
1019                                        }
1020                                    }
1021                                    Err(err) => {
1022                                        tracing::debug!(
1023                                            error = %err,
1024                                            addr = %addr,
1025                                            "LAN websocket response encode failed"
1026                                        );
1027                                    }
1028                                }
1029                            }
1030                        }
1031                    },
1032                );
1033            }
1034        });
1035}
1036
1037#[cfg(not(target_arch = "wasm32"))]
1038async fn handle_inbound_transport_envelope(
1039    effects: Arc<AuraEffectSystem>,
1040    envelope: TransportEnvelope,
1041) -> Option<TransportEnvelope> {
1042    if envelope
1043        .metadata
1044        .get("content-type")
1045        .is_some_and(|content_type| content_type == FACT_SYNC_REQUEST_CONTENT_TYPE)
1046    {
1047        let local_authority = GuardContextProvider::authority_id(effects.as_ref());
1048        let facts = match effects.load_committed_facts(local_authority).await {
1049            Ok(facts) => facts
1050                .into_iter()
1051                .filter_map(|fact| match fact.content {
1052                    FactContent::Relational(rel) => Some(rel),
1053                    _ => None,
1054                })
1055                .collect::<Vec<_>>(),
1056            Err(err) => {
1057                tracing::debug!(
1058                    error = %err,
1059                    "Failed to load committed facts for fact sync response"
1060                );
1061                Vec::new()
1062            }
1063        };
1064
1065        let payload = match aura_core::util::serialization::to_vec(&facts) {
1066            Ok(payload) => payload,
1067            Err(err) => {
1068                tracing::debug!(
1069                    error = %err,
1070                    "Failed to encode fact sync response payload"
1071                );
1072                return None;
1073            }
1074        };
1075
1076        let mut metadata = std::collections::HashMap::new();
1077        metadata.insert(
1078            "content-type".to_string(),
1079            FACT_SYNC_RESPONSE_CONTENT_TYPE.to_string(),
1080        );
1081
1082        return Some(TransportEnvelope {
1083            destination: envelope.source,
1084            source: envelope.destination,
1085            context: envelope.context,
1086            payload,
1087            metadata,
1088            receipt: None,
1089        });
1090    }
1091
1092    if envelope
1093        .metadata
1094        .get("content-type")
1095        .is_some_and(|content_type| content_type == CHAT_FACT_CONTENT_TYPE)
1096    {
1097        tracing::debug!(
1098            source = %envelope.source,
1099            destination = %envelope.destination,
1100            context = %envelope.context,
1101            "recv-chat-fact"
1102        );
1103        match from_slice::<RelationalFact>(&envelope.payload) {
1104            Ok(fact) => {
1105                if let RelationalFact::Generic {
1106                    envelope: chat_envelope,
1107                    ..
1108                } = &fact
1109                {
1110                    if chat_envelope.type_id.as_str() == CHAT_FACT_TYPE_ID {
1111                        if let Some(ChatFact::ChannelCreated {
1112                            context_id,
1113                            channel_id,
1114                            creator_id,
1115                            ..
1116                        }) = ChatFact::from_envelope(chat_envelope)
1117                        {
1118                            let local_authority = envelope.destination;
1119                            if get_channel_state(effects.as_ref(), context_id, channel_id)
1120                                .await
1121                                .is_err()
1122                            {
1123                                if let Err(err) = effects
1124                                    .create_channel(ChannelCreateParams {
1125                                        context: context_id,
1126                                        channel: Some(channel_id),
1127                                        skip_window: None,
1128                                        topic: None,
1129                                    })
1130                                    .await
1131                                {
1132                                    if get_channel_state(effects.as_ref(), context_id, channel_id)
1133                                        .await
1134                                        .is_err()
1135                                    {
1136                                        tracing::warn!(
1137                                            context_id = %context_id,
1138                                            channel_id = %channel_id,
1139                                            error = %err,
1140                                            "Failed to provision AMP channel checkpoint from inbound chat fact"
1141                                        );
1142                                    }
1143                                }
1144
1145                                let mut participants = vec![local_authority];
1146                                if creator_id != local_authority {
1147                                    participants.push(creator_id);
1148                                }
1149
1150                                for participant in participants {
1151                                    if let Err(err) = effects
1152                                        .join_channel(ChannelJoinParams {
1153                                            context: context_id,
1154                                            channel: channel_id,
1155                                            participant,
1156                                        })
1157                                        .await
1158                                    {
1159                                        tracing::debug!(
1160                                            context_id = %context_id,
1161                                            channel_id = %channel_id,
1162                                            participant = %participant,
1163                                            error = %err,
1164                                            "AMP join provisioning from inbound chat fact failed"
1165                                        );
1166                                    }
1167                                }
1168                            }
1169                        }
1170                    }
1171                }
1172
1173                if let Err(err) = effects.commit_relational_facts(vec![fact]).await {
1174                    tracing::debug!(
1175                        error = %err,
1176                        "LAN transport failed to commit incoming chat fact envelope"
1177                    );
1178                }
1179                return None;
1180            }
1181            Err(err) => {
1182                tracing::warn!(
1183                    error = %err,
1184                    "LAN transport received invalid chat fact envelope payload"
1185                );
1186                return None;
1187            }
1188        }
1189    }
1190
1191    effects.requeue_envelope(envelope);
1192    None
1193}
1194
1195pub(crate) async fn publish_lan_descriptor_with(
1196    effects: Arc<AuraEffectSystem>,
1197    authority_id: AuthorityId,
1198    device_id: DeviceId,
1199    rendezvous_manager: &RendezvousManager,
1200    lan_transport: &LanTransportService,
1201) -> Result<(), ServiceError> {
1202    async fn install_lan_descriptor(
1203        rendezvous_manager: &RendezvousManager,
1204        descriptor: RendezvousDescriptor,
1205    ) -> Result<(), ServiceError> {
1206        rendezvous_manager
1207            .cache_descriptor(descriptor.clone())
1208            .await
1209            .map_err(|error| ServiceError::startup_failed("rendezvous_cache", error.to_string()))?;
1210        rendezvous_manager.set_lan_descriptor(descriptor).await;
1211        Ok(())
1212    }
1213
1214    let authority_context = AuthorityContext::new_with_device(authority_id, device_id);
1215    let handler = RendezvousHandler::new(authority_context.clone())
1216        .map_err(|e| ServiceError::startup_failed("rendezvous_handler", e.to_string()))?;
1217    let context_id = authority_context.default_context_id();
1218
1219    let mut hints = Vec::new();
1220    let tcp_addrs = lan_transport.advertised_addrs();
1221    let websocket_addrs = lan_transport.websocket_addrs();
1222    tracing::info!(
1223        authority = %authority_id,
1224        tcp_addrs = ?tcp_addrs,
1225        websocket_addrs = ?websocket_addrs,
1226        "publish_lan_descriptor_with transport addresses"
1227    );
1228    let mut invalid_tcp_hints = 0usize;
1229    for addr in tcp_addrs {
1230        match TransportHint::tcp_direct(addr) {
1231            Ok(hint) => hints.push(hint),
1232            Err(err) => {
1233                invalid_tcp_hints += 1;
1234                tracing::warn!(addr = %addr, error = %err, "Skipping invalid LAN transport hint");
1235            }
1236        }
1237    }
1238    let mut invalid_websocket_hints = 0usize;
1239    for addr in websocket_addrs {
1240        match TransportHint::websocket_direct(addr) {
1241            Ok(hint) => hints.push(hint),
1242            Err(err) => {
1243                invalid_websocket_hints += 1;
1244                tracing::warn!(
1245                    addr = %addr,
1246                    error = %err,
1247                    "Skipping invalid LAN websocket transport hint"
1248                );
1249            }
1250        }
1251    }
1252
1253    if hints.is_empty() {
1254        tracing::warn!(
1255            authority = %authority_id,
1256            tcp_addrs = ?tcp_addrs,
1257            websocket_addrs = ?websocket_addrs,
1258            invalid_tcp_hints,
1259            invalid_websocket_hints,
1260            "LAN listeners are bound, but no rendezvous descriptor was published because every advertised address was rejected as an invalid direct transport hint; direct LAN discovery will be unavailable until at least one valid address is advertisable"
1261        );
1262        return Ok(());
1263    }
1264
1265    let result = handler
1266        .publish_descriptor(&effects, context_id, hints.clone(), [0u8; 32], 0)
1267        .await
1268        .map_err(|error| ServiceError::startup_failed("rendezvous_publish", error.to_string()))?;
1269    let descriptor = require_published_lan_descriptor(result, device_id)?;
1270    install_lan_descriptor(rendezvous_manager, descriptor).await?;
1271
1272    if let Err(error) = register_bootstrap_candidate_with(rendezvous_manager, lan_transport).await {
1273        tracing::debug!(
1274            error = %error,
1275            "Failed to register bootstrap candidate after publishing LAN descriptor"
1276        );
1277    }
1278
1279    Ok(())
1280}
1281
1282pub(crate) async fn register_bootstrap_candidate_with(
1283    rendezvous_manager: &RendezvousManager,
1284    lan_transport: &LanTransportService,
1285) -> Result<(), RendezvousManagerError> {
1286    let tcp_addrs = lan_transport.advertised_addrs();
1287    let websocket_addrs = lan_transport.websocket_addrs();
1288    let Some(address) = websocket_addrs
1289        .first()
1290        .cloned()
1291        .or_else(|| tcp_addrs.first().cloned())
1292    else {
1293        return Ok(());
1294    };
1295
1296    rendezvous_manager
1297        .register_bootstrap_candidate(address, None)
1298        .await
1299}
1300
1301fn require_published_lan_descriptor(
1302    result: crate::handlers::rendezvous::RendezvousResult,
1303    device_id: DeviceId,
1304) -> Result<RendezvousDescriptor, ServiceError> {
1305    if !result.success {
1306        return Err(ServiceError::startup_failed(
1307            "rendezvous_publish",
1308            result
1309                .error
1310                .unwrap_or_else(|| "LAN descriptor publish failed".to_string()),
1311        ));
1312    }
1313
1314    let descriptor = result.descriptor.ok_or_else(|| {
1315        ServiceError::startup_failed(
1316            "rendezvous_publish",
1317            "LAN descriptor publish succeeded without descriptor payload".to_string(),
1318        )
1319    })?;
1320
1321    Ok(RendezvousDescriptor {
1322        device_id: Some(device_id),
1323        ..descriptor
1324    })
1325}
1326
1327#[cfg(test)]
1328mod tests {
1329    use super::*;
1330    use crate::runtime::builder::EffectSystemBuilder;
1331    use crate::runtime::services::SyncManagerConfig;
1332    use aura_core::ContextId;
1333
1334    #[test]
1335    fn runtime_activity_gate_transitions_and_rejects_new_public_work() {
1336        let gate = RuntimeActivityGate::new();
1337        assert_eq!(gate.state(), RuntimeActivityState::Running);
1338        assert!(gate.ensure_accepting_public_operations().is_ok());
1339
1340        assert_eq!(gate.begin_shutdown(), RuntimeActivityState::Running);
1341        assert_eq!(gate.state(), RuntimeActivityState::Stopping);
1342        assert!(matches!(
1343            gate.ensure_accepting_public_operations(),
1344            Err(RuntimePublicOperationError::NotAccepting {
1345                state: RuntimeActivityState::Stopping
1346            })
1347        ));
1348
1349        assert_eq!(gate.begin_shutdown(), RuntimeActivityState::Stopping);
1350        gate.mark_stopped();
1351        assert_eq!(gate.state(), RuntimeActivityState::Stopped);
1352    }
1353
1354    #[test]
1355    fn sync_peer_reconcile_interval_follows_fast_sync_config() {
1356        let manager = SyncServiceManager::new(SyncManagerConfig {
1357            auto_sync_interval: Duration::from_secs(2),
1358            ..SyncManagerConfig::default()
1359        });
1360
1361        assert_eq!(
1362            sync_peer_reconcile_interval(&manager),
1363            Duration::from_secs(2)
1364        );
1365    }
1366
1367    #[test]
1368    fn sync_peer_reconcile_interval_clamps_large_values() {
1369        let manager = SyncServiceManager::new(SyncManagerConfig {
1370            auto_sync_interval: Duration::from_secs(120),
1371            ..SyncManagerConfig::default()
1372        });
1373
1374        assert_eq!(
1375            sync_peer_reconcile_interval(&manager),
1376            Duration::from_secs(30)
1377        );
1378    }
1379
1380    #[test]
1381    fn sync_peer_reconcile_interval_clamps_small_values() {
1382        let manager = SyncServiceManager::new(SyncManagerConfig {
1383            auto_sync_interval: Duration::from_millis(100),
1384            ..SyncManagerConfig::default()
1385        });
1386
1387        assert_eq!(
1388            sync_peer_reconcile_interval(&manager),
1389            Duration::from_secs(1)
1390        );
1391    }
1392
1393    #[test]
1394    fn runtime_services_include_runtime_maintenance() {
1395        let authority_id = AuthorityId::new_from_entropy([11u8; 32]);
1396        let runtime = EffectSystemBuilder::testing()
1397            .with_authority(authority_id)
1398            .build_sync()
1399            .expect("build_sync should succeed in testing mode");
1400
1401        let service_names = runtime
1402            .runtime_services_in_start_order()
1403            .expect("runtime services should sort cleanly")
1404            .into_iter()
1405            .map(RuntimeService::name)
1406            .collect::<Vec<_>>();
1407
1408        assert!(service_names.contains(&"runtime_maintenance"));
1409    }
1410
1411    #[test]
1412    fn lan_descriptor_publish_requires_descriptor_payload() {
1413        let context_id = ContextId::new_from_entropy([12u8; 32]);
1414        let device_id = DeviceId::new_from_entropy([14u8; 32]);
1415        let result = crate::handlers::rendezvous::RendezvousResult {
1416            success: true,
1417            context_id,
1418            peer: None,
1419            descriptor: None,
1420            error: None,
1421        };
1422
1423        let error = require_published_lan_descriptor(result, device_id)
1424            .expect_err("missing descriptor payload must fail closed");
1425
1426        assert!(error.to_string().contains("descriptor payload"));
1427        assert!(error.to_string().contains("rendezvous_publish"));
1428    }
1429
1430    #[test]
1431    fn lan_descriptor_publish_requires_success_result() {
1432        let context_id = ContextId::new_from_entropy([15u8; 32]);
1433        let device_id = DeviceId::new_from_entropy([16u8; 32]);
1434        let result = crate::handlers::rendezvous::RendezvousResult {
1435            success: false,
1436            context_id,
1437            peer: None,
1438            descriptor: None,
1439            error: Some("guard denied".to_string()),
1440        };
1441
1442        let error = require_published_lan_descriptor(result, device_id)
1443            .expect_err("failed publication must stay terminal");
1444
1445        assert!(error.to_string().contains("guard denied"));
1446        assert!(error.to_string().contains("rendezvous_publish"));
1447    }
1448
1449    #[test]
1450    fn lan_descriptor_publish_preserves_device_binding() {
1451        let authority_id = AuthorityId::new_from_entropy([17u8; 32]);
1452        let context_id = ContextId::new_from_entropy([18u8; 32]);
1453        let device_id = DeviceId::new_from_entropy([19u8; 32]);
1454        let result = crate::handlers::rendezvous::RendezvousResult {
1455            success: true,
1456            context_id,
1457            peer: None,
1458            descriptor: Some(RendezvousDescriptor {
1459                authority_id,
1460                device_id: None,
1461                context_id,
1462                transport_hints: vec![TransportHint::tcp_direct("127.0.0.1:7000").unwrap()],
1463                handshake_psk_commitment: [0u8; 32],
1464                public_key: [0u8; 32],
1465                valid_from: 1,
1466                valid_until: 2,
1467                nonce: [0u8; 32],
1468                nickname_suggestion: None,
1469            }),
1470            error: None,
1471        };
1472
1473        let descriptor = require_published_lan_descriptor(result, device_id)
1474            .expect("successful publish with payload should keep device binding");
1475
1476        assert_eq!(descriptor.device_id, Some(device_id));
1477        assert_eq!(descriptor.context_id, context_id);
1478        assert_eq!(descriptor.authority_id, authority_id);
1479    }
1480}