Skip to main content

aura_agent/runtime/services/
sync_manager.rs

1//! Sync Service Manager
2//!
3//! Wraps `aura_sync::SyncService` for integration with the agent runtime.
4//! Provides lifecycle management and configuration for automatic background sync.
5
6use super::config_profiles::impl_service_config_profiles;
7use super::service_actor::{
8    validate_actor_transition, ActorLifecyclePhase, ActorOwnedServiceRoot, ServiceActorHandle,
9};
10use super::traits::{RuntimeService, RuntimeServiceContext, ServiceError, ServiceHealth};
11use super::{ReconfigurationManager, ReconfigurationManagerError, SessionDelegationTransfer};
12use crate::core::default_context_id_for_authority;
13use crate::runtime::vm_host_bridge::{AuraVmHostWaitStatus, AuraVmRoundDisposition};
14use crate::runtime::{
15    open_owned_manifest_vm_session_admitted, AuraEffectSystem, RuntimeChoreographySessionId,
16    TaskGroup,
17};
18use async_trait::async_trait;
19use aura_core::effects::indexed::{IndexedFact, IndexedJournalEffects};
20use aura_core::effects::PhysicalTimeEffects;
21use aura_core::hash::hash;
22use aura_core::util::serialization::to_vec;
23use aura_core::{AuthorityId, DelegationReceipt, DeviceId, OwnershipCategory};
24use aura_protocol::effects::{ChoreographicRole, RoleIndex};
25use aura_sync::protocols::epoch_runners::EpochRotationProtocolRole;
26use aura_sync::protocols::{EpochCommit, EpochConfirmation, EpochRotationProposal};
27use aura_sync::services::{Service, SyncService, SyncServiceConfig};
28use aura_sync::verification::{MerkleVerifier, VerificationResult};
29use std::collections::BTreeMap;
30use std::sync::Arc;
31use std::time::Duration;
32use telltale_machine::StepResult;
33use thiserror::Error;
34use tokio::sync::{oneshot, Mutex};
35use uuid::Uuid;
36
37/// Configuration for the sync service manager
38#[derive(Debug, Clone)]
39pub struct SyncManagerConfig {
40    /// Enable automatic periodic sync
41    pub auto_sync_enabled: bool,
42
43    /// Interval between automatic sync rounds (default: 60s)
44    pub auto_sync_interval: Duration,
45
46    /// Maximum concurrent sync sessions
47    pub max_concurrent_syncs: usize,
48
49    /// Initial peers to sync with (can be empty if using discovery)
50    pub initial_peers: Vec<DeviceId>,
51
52    /// Enable periodic maintenance cleanup
53    pub maintenance_enabled: bool,
54
55    /// Interval between maintenance runs
56    pub maintenance_interval: Duration,
57
58    /// TTL for stale peer states
59    pub peer_state_ttl: Duration,
60
61    /// Maximum tracked peer states before pruning
62    pub max_peer_states: usize,
63}
64
65impl Default for SyncManagerConfig {
66    fn default() -> Self {
67        Self {
68            auto_sync_enabled: true,
69            auto_sync_interval: Duration::from_secs(60),
70            max_concurrent_syncs: 5,
71            initial_peers: Vec::new(),
72            maintenance_enabled: true,
73            maintenance_interval: Duration::from_secs(60),
74            peer_state_ttl: Duration::from_secs(6 * 60 * 60),
75            max_peer_states: 1024,
76        }
77    }
78}
79
80impl_service_config_profiles!(SyncManagerConfig {
81    /// Create config for testing (shorter intervals)
82    pub fn for_testing() -> Self {
83        Self {
84            auto_sync_enabled: true,
85            auto_sync_interval: Duration::from_secs(5),
86            max_concurrent_syncs: 3,
87            initial_peers: Vec::new(),
88            maintenance_enabled: true,
89            maintenance_interval: Duration::from_secs(5),
90            peer_state_ttl: Duration::from_secs(60),
91            max_peer_states: 128,
92        }
93    }
94
95    /// Create config with auto-sync disabled
96    pub fn manual_only() -> Self {
97        Self {
98            auto_sync_enabled: false,
99            auto_sync_interval: Duration::from_secs(60),
100            max_concurrent_syncs: 5,
101            initial_peers: Vec::new(),
102            maintenance_enabled: true,
103            maintenance_interval: Duration::from_secs(60),
104            peer_state_ttl: Duration::from_secs(6 * 60 * 60),
105            max_peer_states: 1024,
106        }
107    }
108});
109
/// Lifecycle state of the sync service manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncManagerState {
    /// Not running: either never started, or fully stopped after a successful stop.
    Stopped,
    /// Start-up in progress.
    Starting,
    /// Started: the underlying sync service and command actor are active.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// A lifecycle transition (start or stop) failed; observable via health checks.
    Failed,
}
124
125#[derive(Debug, Error)]
126pub enum SyncManagerError {
127    #[error(transparent)]
128    Service(#[from] ServiceError),
129    #[error("sync service not started")]
130    NotStarted,
131    #[error("sync operation failed: {0}")]
132    Sync(String),
133    #[error("epoch rotation delegation requires bundle evidence")]
134    DelegationRequiresBundleEvidence,
135    #[error("epoch rotation delegation failed")]
136    Delegation(#[source] ReconfigurationManagerError),
137    #[error("epoch rotation participant role required")]
138    ParticipantRoleRequired,
139    #[error("epoch rotation VM session open failed: {0}")]
140    VmSessionOpen(String),
141    #[error("epoch rotation VM advance failed: {0}")]
142    VmAdvance(String),
143    #[error("epoch rotation VM round handling failed: {0}")]
144    VmRoundHandling(String),
145    #[error("epoch proposal encode failed: {0}")]
146    ProposalEncode(String),
147    #[error("epoch confirmation encode failed: {0}")]
148    ConfirmationEncode(String),
149    #[error("epoch confirmation decode failed: {0}")]
150    ConfirmationDecode(String),
151    #[error("epoch commit encode failed: {0}")]
152    CommitEncode(String),
153    #[error("epoch rotation {role} VM timed out while waiting for receive")]
154    VmTimedOut { role: &'static str },
155    #[error("epoch rotation {role} VM cancelled while waiting for receive")]
156    VmCancelled { role: &'static str },
157    #[error("epoch rotation {role} VM became stuck without a pending receive")]
158    VmStuck { role: &'static str },
159}
160
161struct SyncState {
162    service: Arc<SyncService>,
163    peers: Vec<DeviceId>,
164}
165
166struct SyncManagerShared {
167    owner: ActorOwnedServiceRoot<SyncServiceManager, SyncCommand, SyncManagerState>,
168    configured_peers: Mutex<Vec<DeviceId>>,
169}
170
171#[derive(Clone)]
172struct SyncStateSnapshot {
173    service: Option<Arc<SyncService>>,
174    status: SyncManagerState,
175    peers: Vec<DeviceId>,
176}
177
178enum SyncCommand {
179    SnapshotState {
180        reply: oneshot::Sender<SyncStateSnapshot>,
181    },
182    AddPeer {
183        peer: DeviceId,
184        reply: oneshot::Sender<()>,
185    },
186    RemovePeer {
187        peer: DeviceId,
188        reply: oneshot::Sender<()>,
189    },
190}
191
192impl SyncState {
193    fn new_running(service: Arc<SyncService>, initial_peers: Vec<DeviceId>) -> Self {
194        Self {
195            service,
196            peers: initial_peers,
197        }
198    }
199}
200
201impl SyncManagerState {
202    fn phase(self) -> ActorLifecyclePhase {
203        match self {
204            Self::Stopped => ActorLifecyclePhase::Stopped,
205            Self::Starting => ActorLifecyclePhase::Starting,
206            Self::Running => ActorLifecyclePhase::Running,
207            Self::Stopping => ActorLifecyclePhase::Stopping,
208            Self::Failed => ActorLifecyclePhase::Failed,
209        }
210    }
211}
212
213/// Manager for background journal synchronization
214///
215/// Integrates `aura_sync::SyncService` into the agent runtime lifecycle.
216/// Handles startup, shutdown, and coordination with other agent services.
217#[aura_macros::actor_owned(
218    owner = "sync_service_manager",
219    domain = "sync",
220    gate = "sync_command_ingress",
221    command = SyncCommand,
222    capacity = 64,
223    category = "actor_owned"
224)]
225#[derive(Clone)]
226pub struct SyncServiceManager {
227    /// Configuration
228    config: SyncManagerConfig,
229    /// Optional Merkle verifier for fact sync (requires indexed journal)
230    merkle_verifier: Option<Arc<MerkleVerifier>>,
231    /// Shared lifecycle boundary for service-local mutable state.
232    shared: Arc<SyncManagerShared>,
233    /// Reconfiguration/session-footprint state for sync choreography delegation.
234    reconfiguration: ReconfigurationManager,
235}
236
237impl SyncServiceManager {
238    pub const OWNERSHIP_CATEGORY: OwnershipCategory = OwnershipCategory::ActorOwned;
239
240    async fn command_handle(
241        &self,
242    ) -> Result<ServiceActorHandle<SyncServiceManager, SyncCommand>, ServiceError> {
243        self.shared
244            .owner
245            .command_handle(
246                self.name(),
247                "sync command actor unavailable; service is not fully started",
248            )
249            .await
250    }
251
252    fn spawn_command_actor(
253        &self,
254        tasks: &TaskGroup,
255        mut state: SyncState,
256    ) -> ServiceActorHandle<SyncServiceManager, SyncCommand> {
257        let (commands, mut mailbox) =
258            ServiceActorHandle::<SyncServiceManager, SyncCommand>::bounded(self.name(), 64);
259
260        let _command_actor_handle = tasks.spawn_named("command_actor", async move {
261            while let Some(command) = mailbox.recv().await {
262                match command {
263                    SyncCommand::SnapshotState { reply } => {
264                        let _ = reply.send(SyncStateSnapshot {
265                            service: Some(state.service.clone()),
266                            status: SyncManagerState::Running,
267                            peers: state.peers.clone(),
268                        });
269                    }
270                    SyncCommand::AddPeer { peer, reply } => {
271                        if !state.peers.contains(&peer) {
272                            state.peers.push(peer);
273                            tracing::debug!("Added peer {} to sync manager", peer);
274                        }
275                        let _ = reply.send(());
276                    }
277                    SyncCommand::RemovePeer { peer, reply } => {
278                        state.peers.retain(|p| p != &peer);
279                        tracing::debug!("Removed peer {} from sync manager", peer);
280                        let _ = reply.send(());
281                    }
282                }
283            }
284        });
285
286        commands
287    }
288
289    async fn state_snapshot(&self) -> Result<SyncStateSnapshot, ServiceError> {
290        match self.command_handle().await {
291            Ok(commands) => {
292                commands
293                    .request(|reply| SyncCommand::SnapshotState { reply })
294                    .await
295            }
296            Err(_) => Ok(SyncStateSnapshot {
297                service: None,
298                status: self.shared.owner.state().await,
299                peers: self.shared.configured_peers.lock().await.clone(),
300            }),
301        }
302    }
303
304    fn shared_for_config(config: &SyncManagerConfig) -> Arc<SyncManagerShared> {
305        Arc::new(SyncManagerShared {
306            owner: ActorOwnedServiceRoot::new(SyncManagerState::Stopped),
307            configured_peers: Mutex::new(config.initial_peers.clone()),
308        })
309    }
310
311    /// Create a new sync service manager
312    pub fn new(config: SyncManagerConfig) -> Self {
313        Self {
314            config: config.clone(),
315            merkle_verifier: None,
316            shared: Self::shared_for_config(&config),
317            reconfiguration: ReconfigurationManager::new(),
318        }
319    }
320
321    /// Create a new sync service manager with indexed journal for Merkle verification
322    ///
323    /// This enables fact sync with cryptographic verification of facts using
324    /// Merkle trees and Bloom filters from the indexed journal.
325    pub fn with_indexed_journal(
326        config: SyncManagerConfig,
327        indexed_journal: Arc<dyn IndexedJournalEffects + Send + Sync>,
328        time: Arc<dyn PhysicalTimeEffects>,
329    ) -> Self {
330        Self {
331            config: config.clone(),
332            merkle_verifier: Some(Arc::new(MerkleVerifier::new(indexed_journal, time))),
333            shared: Self::shared_for_config(&config),
334            reconfiguration: ReconfigurationManager::new(),
335        }
336    }
337
338    /// Create with default configuration
339    pub fn with_defaults() -> Self {
340        Self::new(SyncManagerConfig::default())
341    }
342
343    /// Get the current state
344    pub async fn state(&self) -> SyncManagerState {
345        self.state_snapshot()
346            .await
347            .map(|snapshot| snapshot.status)
348            .unwrap_or(self.shared.owner.state().await)
349    }
350
351    /// Check if the service is running
352    pub async fn is_running(&self) -> bool {
353        self.state().await == SyncManagerState::Running
354    }
355
356    async fn start_managed(&self, context: &RuntimeServiceContext) -> Result<(), ServiceError> {
357        let _lifecycle_guard = self.shared.owner.lifecycle().lock().await;
358        let current_state = self.shared.owner.state().await;
359        if current_state == SyncManagerState::Running {
360            return Ok(());
361        }
362        validate_actor_transition(
363            self.name(),
364            current_state.phase(),
365            ActorLifecyclePhase::Starting,
366        )?;
367
368        self.shared
369            .owner
370            .set_state(SyncManagerState::Starting)
371            .await;
372
373        // Build aura-sync service config from our config
374        let sync_config = SyncServiceConfig {
375            auto_sync_enabled: self.config.auto_sync_enabled,
376            auto_sync_interval: self.config.auto_sync_interval,
377            max_concurrent_syncs: self.config.max_concurrent_syncs as u32,
378            ..Default::default()
379        };
380
381        // Create the underlying sync service
382        let now_instant = SyncService::monotonic_now();
383        let service = match SyncService::new(sync_config, context.time_effects(), now_instant).await
384        {
385            Ok(service) => service,
386            Err(error) => {
387                self.shared.owner.set_state(SyncManagerState::Failed).await;
388                return Err(ServiceError::startup_failed(
389                    "sync_service",
390                    error.to_string(),
391                ));
392            }
393        };
394
395        // Start the service
396        if let Err(error) = service.start(now_instant).await {
397            self.shared.owner.set_state(SyncManagerState::Failed).await;
398            return Err(ServiceError::startup_failed(
399                "sync_service",
400                error.to_string(),
401            ));
402        }
403
404        let initial_peers = self.shared.configured_peers.lock().await.clone();
405        let maintenance_group = context.tasks().group(self.name());
406        let command_handle = self.spawn_command_actor(
407            &maintenance_group,
408            SyncState::new_running(Arc::new(service), initial_peers),
409        );
410        self.shared.owner.install_commands(command_handle).await;
411        self.spawn_maintenance_task(maintenance_group.clone(), context.time_effects());
412        self.shared.owner.set_state(SyncManagerState::Running).await;
413        self.shared.owner.install_tasks(maintenance_group).await;
414
415        tracing::info!(
416            event = "runtime.service.sync.started",
417            service = self.name(),
418            "Sync service manager started"
419        );
420        Ok(())
421    }
422
423    async fn stop_managed(&self) -> Result<(), ServiceError> {
424        let _lifecycle_guard = self.shared.owner.lifecycle().lock().await;
425        let current_state = self.shared.owner.state().await;
426        if current_state == SyncManagerState::Stopped {
427            return Ok(());
428        }
429        validate_actor_transition(
430            self.name(),
431            current_state.phase(),
432            ActorLifecyclePhase::Stopping,
433        )?;
434
435        self.shared
436            .owner
437            .set_state(SyncManagerState::Stopping)
438            .await;
439
440        let snapshot = self.state_snapshot().await.ok();
441        if let Some(snapshot) = snapshot.as_ref() {
442            *self.shared.configured_peers.lock().await = snapshot.peers.clone();
443        }
444        let service = snapshot.and_then(|snapshot| snapshot.service);
445        self.shared.owner.take_commands().await;
446
447        let maintenance_shutdown_error =
448            if let Some(task_group) = self.shared.owner.take_tasks().await {
449                match task_group
450                    .shutdown_with_timeout(Duration::from_secs(2))
451                    .await
452                {
453                    Ok(()) => None,
454                    Err(crate::task_registry::TaskSupervisionError::ForcedAbort {
455                        aborted_tasks,
456                        ..
457                    }) => {
458                        tracing::warn!(
459                            service = self.name(),
460                            aborted_tasks = ?aborted_tasks,
461                            "Sync service stop force-aborted owned background tasks"
462                        );
463                        None
464                    }
465                    Err(error) => Some(ServiceError::shutdown_failed(
466                        self.name(),
467                        format!("failed to stop maintenance task group: {error}"),
468                    )),
469                }
470            } else {
471                None
472            };
473
474        // Stop the underlying service
475        if let Some(service) = service.as_ref() {
476            let now_instant = SyncService::monotonic_now();
477            if let Err(error) = service.stop(now_instant).await {
478                self.shared.owner.set_state(SyncManagerState::Failed).await;
479                return Err(ServiceError::shutdown_failed(
480                    self.name(),
481                    error.to_string(),
482                ));
483            }
484        }
485
486        self.shared.owner.set_state(SyncManagerState::Stopped).await;
487
488        tracing::info!(
489            event = "runtime.service.sync.stopped",
490            service = self.name(),
491            "Sync service manager stopped"
492        );
493        match maintenance_shutdown_error {
494            Some(error) => {
495                self.shared.owner.set_state(SyncManagerState::Failed).await;
496                Err(error)
497            }
498            None => Ok(()),
499        }
500    }
501
502    /// Start background maintenance task for pruning long-lived state.
503    fn spawn_maintenance_task(
504        &self,
505        tasks: TaskGroup,
506        time_effects: Arc<dyn PhysicalTimeEffects + Send + Sync>,
507    ) {
508        if !self.config.maintenance_enabled {
509            tracing::debug!("Sync maintenance task disabled by configuration");
510            return;
511        }
512
513        let interval = self.config.maintenance_interval;
514        let peer_state_ttl = self.config.peer_state_ttl;
515        let max_peer_states = self.config.max_peer_states;
516        let manager = self.clone();
517
518        let _maintenance_task_handle = tasks.spawn_interval_until_named(
519            "sync.maintenance",
520            time_effects.clone(),
521            interval,
522            move || {
523                let manager = manager.clone();
524                let time_effects = time_effects.clone();
525                async move {
526                    let snapshot = match manager.state_snapshot().await {
527                        Ok(snapshot) => snapshot,
528                        Err(error) => {
529                            tracing::debug!(
530                                error = %error,
531                                "Sync maintenance skipped because command actor is unavailable"
532                            );
533                            return false;
534                        }
535                    };
536                    let status = snapshot.status;
537                    if matches!(
538                        status,
539                        SyncManagerState::Stopped
540                            | SyncManagerState::Stopping
541                            | SyncManagerState::Failed
542                    ) {
543                        return false;
544                    }
545
546                    let now_ms = match time_effects.physical_time().await {
547                        Ok(t) => t.ts_ms,
548                        Err(e) => {
549                            tracing::warn!("Sync maintenance: failed to get time: {}", e);
550                            return true;
551                        }
552                    };
553
554                    if let Some(service) = snapshot.service {
555                        if let Err(e) = service
556                            .maintenance_cleanup(
557                                now_ms,
558                                peer_state_ttl.as_millis() as u64,
559                                max_peer_states,
560                            )
561                            .await
562                        {
563                            tracing::warn!("Sync maintenance failed: {}", e);
564                        }
565                    }
566
567                    true
568                }
569            },
570        );
571    }
572
573    /// Perform a manual sync with specific peers
574    ///
575    /// # Arguments
576    /// - `effects`: Effect system providing journal, network, and time capabilities
577    /// - `peers`: List of peers to sync with
578    pub async fn sync_with_peers<E>(
579        &self,
580        effects: &E,
581        peers: Vec<DeviceId>,
582    ) -> Result<(), SyncManagerError>
583    where
584        E: aura_core::effects::JournalEffects
585            + aura_core::effects::NetworkEffects
586            + aura_core::effects::PhysicalTimeEffects
587            + aura_guards::GuardContextProvider
588            + Send
589            + Sync,
590    {
591        let service = self
592            .state_snapshot()
593            .await
594            .map_err(SyncManagerError::from)?
595            .service
596            .clone()
597            .ok_or(SyncManagerError::NotStarted)?;
598
599        let now_instant = SyncService::monotonic_now();
600        service
601            .sync_with_peers(effects, peers, now_instant)
602            .await
603            .map_err(|error| SyncManagerError::Sync(error.to_string()))
604    }
605
606    /// Add a peer to the known peers list
607    pub async fn add_peer(&self, peer: DeviceId) {
608        if let Ok(commands) = self.command_handle().await {
609            let _ = commands
610                .request(|reply| SyncCommand::AddPeer { peer, reply })
611                .await;
612        } else {
613            let mut peers = self.shared.configured_peers.lock().await;
614            if !peers.contains(&peer) {
615                peers.push(peer);
616                tracing::debug!("Added peer {} to sync manager", peer);
617            }
618        }
619    }
620
621    /// Remove a peer from the known peers list
622    pub async fn remove_peer(&self, peer: &DeviceId) {
623        if let Ok(commands) = self.command_handle().await {
624            let _ = commands
625                .request(|reply| SyncCommand::RemovePeer { peer: *peer, reply })
626                .await;
627        } else {
628            let mut peers = self.shared.configured_peers.lock().await;
629            peers.retain(|p| p != peer);
630            tracing::debug!("Removed peer {} from sync manager", peer);
631        }
632    }
633
634    /// Get the list of known peers
635    pub async fn peers(&self) -> Vec<DeviceId> {
636        self.state_snapshot()
637            .await
638            .map(|snapshot| snapshot.peers)
639            .unwrap_or_default()
640    }
641
642    /// Get service health information
643    pub async fn sync_service_health(&self) -> Option<aura_sync::services::SyncServiceHealth> {
644        self.state_snapshot()
645            .await
646            .ok()
647            .and_then(|snapshot| snapshot.service.map(|s| s.get_health()))
648    }
649
650    /// Get service metrics
651    pub async fn metrics(&self) -> Option<aura_sync::services::ServiceMetrics> {
652        self.state_snapshot()
653            .await
654            .ok()
655            .and_then(|snapshot| snapshot.service.map(|s| s.get_metrics()))
656    }
657
658    /// Get the configuration
659    pub fn config(&self) -> &SyncManagerConfig {
660        &self.config
661    }
662
663    // =========================================================================
664    // Merkle Verification Methods
665    // =========================================================================
666
667    /// Check if Merkle verification is available
668    ///
669    /// Returns `true` if the manager was created with an indexed journal,
670    /// enabling cryptographic fact verification.
671    pub fn has_merkle_verification(&self) -> bool {
672        self.merkle_verifier.is_some()
673    }
674
675    /// Get the local Merkle root for exchange with peers
676    ///
677    /// Returns `None` if Merkle verification is not enabled (no indexed journal).
678    /// The root represents the current state of the local fact journal and can
679    /// be compared with remote roots to determine if synchronization is needed.
680    pub async fn local_merkle_root(&self) -> Option<[u8; 32]> {
681        if let Some(ref verifier) = self.merkle_verifier {
682            verifier.local_merkle_root().await.ok()
683        } else {
684            None
685        }
686    }
687
688    /// Verify incoming facts against the local Merkle tree
689    ///
690    /// Returns `None` if Merkle verification is not enabled.
691    /// Otherwise returns the verification result containing:
692    /// - `verified`: Facts that passed verification
693    /// - `rejected`: Facts that failed verification with reasons
694    /// - `merkle_root`: Current local Merkle root after verification
695    pub async fn verify_facts(
696        &self,
697        facts: Vec<IndexedFact>,
698        claimed_root: [u8; 32],
699    ) -> Option<VerificationResult> {
700        if let Some(ref verifier) = self.merkle_verifier {
701            verifier
702                .verify_incoming_facts(facts, claimed_root)
703                .await
704                .ok()
705        } else {
706            None
707        }
708    }
709
710    /// Get the internal Merkle verifier reference
711    ///
712    /// Returns `None` if Merkle verification is not enabled.
713    /// Use this for direct access to verification operations like
714    /// `compare_roots()` or `local_bloom_filter()`.
715    pub fn merkle_verifier(&self) -> Option<&Arc<MerkleVerifier>> {
716        self.merkle_verifier.as_ref()
717    }
718}
719
720impl Default for SyncServiceManager {
721    fn default() -> Self {
722        Self::with_defaults()
723    }
724}
725
726// =============================================================================
727// RuntimeService Implementation
728// =============================================================================
729
730#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
731#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
732impl RuntimeService for SyncServiceManager {
733    fn name(&self) -> &'static str {
734        "sync_service"
735    }
736
737    fn dependencies(&self) -> &[&'static str] {
738        &["indexed_journal", "transport"]
739    }
740
741    async fn start(&self, context: &RuntimeServiceContext) -> Result<(), ServiceError> {
742        self.start_managed(context).await
743    }
744
745    async fn stop(&self) -> Result<(), ServiceError> {
746        self.stop_managed().await
747    }
748
749    async fn health(&self) -> ServiceHealth {
750        self.health_async().await
751    }
752}
753
754/// Async health check implementation
755impl SyncServiceManager {
756    /// Get the service health status asynchronously
757    ///
758    /// This provides full health information including underlying service health.
759    /// For the synchronous `RuntimeService::health()`, a simplified status is returned.
760    pub async fn health_async(&self) -> ServiceHealth {
761        let state = self.state().await;
762        match state {
763            SyncManagerState::Stopped => ServiceHealth::Stopped,
764            SyncManagerState::Starting => ServiceHealth::Starting,
765            SyncManagerState::Stopping => ServiceHealth::Stopping,
766            SyncManagerState::Failed => ServiceHealth::Unhealthy {
767                reason: "service entered failed lifecycle state".to_string(),
768            },
769            SyncManagerState::Running => {
770                // Check underlying service health
771                if let Some(svc_health) = self.sync_service_health().await {
772                    use aura_sync::services::HealthStatus;
773                    match svc_health.status {
774                        HealthStatus::Healthy => ServiceHealth::Healthy,
775                        HealthStatus::Degraded | HealthStatus::Starting => {
776                            ServiceHealth::Degraded {
777                                reason: format!(
778                                    "status={:?}, active_sessions={}",
779                                    svc_health.status, svc_health.active_sessions
780                                ),
781                            }
782                        }
783                        HealthStatus::Unhealthy | HealthStatus::Stopping => {
784                            ServiceHealth::Unhealthy {
785                                reason: format!("status={:?}", svc_health.status),
786                            }
787                        }
788                    }
789                } else {
790                    ServiceHealth::Unhealthy {
791                        reason: "underlying service not available".to_string(),
792                    }
793                }
794            }
795        }
796    }
797}
798
799// =============================================================================
800// Choreography Wiring (execute_as)
801// =============================================================================
802
803impl SyncServiceManager {
804    fn epoch_role(authority_id: AuthorityId, role_index: u16) -> ChoreographicRole {
805        ChoreographicRole::for_authority(
806            authority_id,
807            RoleIndex::new(role_index.into()).expect("role index"),
808        )
809    }
810
811    /// Execute epoch rotation protocol as coordinator.
812    pub async fn execute_epoch_rotation_coordinator(
813        &self,
814        effects: Arc<AuraEffectSystem>,
815        coordinator_id: AuthorityId,
816        participant1_id: AuthorityId,
817        participant2_id: AuthorityId,
818        proposal: EpochRotationProposal,
819        commit: EpochCommit,
820    ) -> Result<(), SyncManagerError> {
821        let session_id = epoch_rotation_session_id(&proposal.rotation_id);
822        self.record_native_epoch_session(coordinator_id, session_id)
823            .await;
824        let roles = vec![
825            Self::epoch_role(coordinator_id, 0),
826            Self::epoch_role(participant1_id, 0),
827            Self::epoch_role(participant2_id, 0),
828        ];
829        let peer_roles = BTreeMap::from([
830            (
831                "Participant1".to_string(),
832                Self::epoch_role(participant1_id, 0),
833            ),
834            (
835                "Participant2".to_string(),
836                Self::epoch_role(participant2_id, 0),
837            ),
838        ]);
839        let manifest =
840            aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::composition_manifest();
841        let global_type =
842            aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::global_type();
843        let local_types =
844            aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::local_types();
845
846        let result = async {
847            let mut session = open_owned_manifest_vm_session_admitted(
848                effects.clone(),
849                session_id,
850                roles,
851                &manifest,
852                "Coordinator",
853                &global_type,
854                &local_types,
855                crate::runtime::AuraVmSchedulerSignals::default(),
856            )
857            .await
858            .map_err(|error| SyncManagerError::VmSessionOpen(error.to_string()))?;
859            session.queue_send_bytes(
860                to_vec(&proposal)
861                    .map_err(|error| SyncManagerError::ProposalEncode(error.to_string()))?,
862            );
863            let mut confirmations = Vec::new();
864            let mut commit_queued = false;
865
866            let loop_result = loop {
867                let round = session
868                    .advance_round("Coordinator", &peer_roles)
869                    .await
870                    .map_err(|error| SyncManagerError::VmAdvance(error.to_string()))?;
871
872                if let Some(blocked) = round.blocked_receive {
873                    let confirmation: EpochConfirmation =
874                        aura_core::util::serialization::from_slice(&blocked.payload).map_err(
875                            |error| SyncManagerError::ConfirmationDecode(error.to_string()),
876                        )?;
877                    confirmations.push(confirmation);
878                    if !commit_queued && confirmations.len() == 2 {
879                        let payload = to_vec(&commit)
880                            .map_err(|error| SyncManagerError::CommitEncode(error.to_string()))?;
881                        session.queue_send_bytes(payload.clone());
882                        session.queue_send_bytes(payload);
883                        commit_queued = true;
884                    }
885                    session
886                        .inject_blocked_receive(&blocked)
887                        .map_err(|error| SyncManagerError::VmRoundHandling(error.to_string()))?;
888                    continue;
889                }
890
891                match round.host_wait_status {
892                    AuraVmHostWaitStatus::Idle => {}
893                    AuraVmHostWaitStatus::TimedOut => {
894                        break Err(SyncManagerError::VmTimedOut {
895                            role: "coordinator",
896                        });
897                    }
898                    AuraVmHostWaitStatus::Cancelled => {
899                        break Err(SyncManagerError::VmCancelled {
900                            role: "coordinator",
901                        });
902                    }
903                    AuraVmHostWaitStatus::Deferred | AuraVmHostWaitStatus::Delivered => {}
904                }
905
906                match round.step {
907                    StepResult::AllDone => break Ok(()),
908                    StepResult::Continue => {}
909                    StepResult::Stuck => {
910                        break Err(SyncManagerError::VmStuck {
911                            role: "coordinator",
912                        });
913                    }
914                }
915            };
916            let _ = session.close().await;
917            loop_result
918        }
919        .await;
920        result
921    }
922
923    /// Execute epoch rotation protocol as participant.
924    pub async fn execute_epoch_rotation_participant(
925        &self,
926        effects: Arc<AuraEffectSystem>,
927        role: EpochRotationProtocolRole,
928        coordinator_id: AuthorityId,
929        participant1_id: AuthorityId,
930        participant2_id: AuthorityId,
931        confirmation: EpochConfirmation,
932    ) -> Result<(), SyncManagerError> {
933        let participant_id = match role {
934            EpochRotationProtocolRole::Participant1 => participant1_id,
935            EpochRotationProtocolRole::Participant2 => participant2_id,
936            EpochRotationProtocolRole::Coordinator => {
937                return Err(SyncManagerError::ParticipantRoleRequired)
938            }
939        };
940        let session_id = epoch_rotation_session_id(&confirmation.rotation_id);
941        self.record_native_epoch_session(participant_id, session_id)
942            .await;
943        let active_role_name = match role {
944            EpochRotationProtocolRole::Participant1 => "Participant1",
945            EpochRotationProtocolRole::Participant2 => "Participant2",
946            EpochRotationProtocolRole::Coordinator => unreachable!(),
947        };
948        let roles = vec![
949            Self::epoch_role(coordinator_id, 0),
950            Self::epoch_role(participant_id, 0),
951        ];
952        let peer_roles = BTreeMap::from([(
953            "Coordinator".to_string(),
954            Self::epoch_role(coordinator_id, 0),
955        )]);
956        let manifest =
957            aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::composition_manifest();
958        let global_type =
959            aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::global_type();
960        let local_types =
961            aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::local_types();
962
963        let result = async {
964            let mut session = open_owned_manifest_vm_session_admitted(
965                effects.clone(),
966                session_id,
967                roles,
968                &manifest,
969                active_role_name,
970                &global_type,
971                &local_types,
972                crate::runtime::AuraVmSchedulerSignals::default(),
973            )
974            .await
975            .map_err(|error| SyncManagerError::VmSessionOpen(error.to_string()))?;
976            session.queue_send_bytes(
977                to_vec(&confirmation)
978                    .map_err(|error| SyncManagerError::ConfirmationEncode(error.to_string()))?,
979            );
980
981            let loop_result = loop {
982                let round = session
983                    .advance_round(active_role_name, &peer_roles)
984                    .await
985                    .map_err(|error| SyncManagerError::VmAdvance(error.to_string()))?;
986
987                match crate::runtime::handle_owned_vm_round(
988                    &mut session,
989                    round,
990                    "epoch rotation participant VM",
991                )
992                .map_err(|error| SyncManagerError::VmRoundHandling(error.to_string()))?
993                {
994                    AuraVmRoundDisposition::Continue => {}
995                    AuraVmRoundDisposition::Complete => break Ok(()),
996                }
997            };
998            let _ = session.close().await;
999            loop_result
1000        }
1001        .await;
1002        result
1003    }
1004
1005    /// Delegate ownership of an epoch-rotation session to another authority.
1006    ///
1007    /// This updates runtime session footprints and records a relational protocol fact.
1008    pub async fn delegate_epoch_rotation_session(
1009        &self,
1010        effects: Arc<AuraEffectSystem>,
1011        rotation_id: &str,
1012        from_authority: AuthorityId,
1013        to_authority: AuthorityId,
1014        bundle_id: Option<String>,
1015    ) -> Result<DelegationReceipt, SyncManagerError> {
1016        let session_id =
1017            RuntimeChoreographySessionId::from_uuid(epoch_rotation_session_id(rotation_id))
1018                .into_aura_session_id();
1019        self.reconfiguration
1020            .record_native_session(from_authority, session_id)
1021            .await;
1022        self.reconfiguration
1023            .delegate_session(
1024                &effects,
1025                SessionDelegationTransfer::new(
1026                    session_id,
1027                    from_authority,
1028                    to_authority,
1029                    bundle_id.ok_or(SyncManagerError::DelegationRequiresBundleEvidence)?,
1030                )
1031                .with_context(default_context_id_for_authority(from_authority)),
1032            )
1033            .await
1034            .map(|outcome| outcome.receipt)
1035            .map_err(SyncManagerError::Delegation)
1036    }
1037
1038    async fn record_native_epoch_session(&self, authority_id: AuthorityId, session_uuid: Uuid) {
1039        let session_id =
1040            RuntimeChoreographySessionId::from_uuid(session_uuid).into_aura_session_id();
1041        self.reconfiguration
1042            .record_native_session(authority_id, session_id)
1043            .await;
1044    }
1045}
1046
1047fn epoch_rotation_session_id(rotation_id: &str) -> Uuid {
1048    let digest = hash(rotation_id.as_bytes());
1049    let mut bytes = [0u8; 16];
1050    bytes.copy_from_slice(&digest[..16]);
1051    Uuid::from_bytes(bytes)
1052}
1053
#[cfg(test)]
// The mock journal below uses `std::sync::Mutex`; presumably the crate's clippy
// configuration disallows that type, which is acceptable in test-only code.
#[allow(clippy::disallowed_types)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use aura_core::domain::journal::FactValue;
    use aura_core::effects::indexed::{FactId, FactStreamReceiver, IndexStats};
    use aura_core::effects::{BloomConfig, BloomFilter};
    use aura_core::AuthorityId;
    use aura_effects::time::PhysicalTimeHandler;
    use std::sync::Mutex;

    /// Builds a runtime service context backed by a fresh task supervisor and
    /// the physical time handler.
    fn test_service_context() -> RuntimeServiceContext {
        RuntimeServiceContext::new(
            Arc::new(crate::runtime::TaskSupervisor::new()),
            Arc::new(PhysicalTimeHandler::new()),
        )
    }

    /// Mock indexed journal for testing.
    ///
    /// Reports a fixed Merkle root and serves facts from an in-memory list.
    /// Query-by-predicate/authority/range always return empty results.
    struct MockIndexedJournal {
        root: Mutex<[u8; 32]>,
        facts: Mutex<Vec<IndexedFact>>,
    }

    impl MockIndexedJournal {
        fn new(root: [u8; 32]) -> Self {
            Self {
                root: Mutex::new(root),
                facts: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl IndexedJournalEffects for MockIndexedJournal {
        fn watch_facts(&self) -> Box<dyn FactStreamReceiver> {
            panic!("Not implemented for mock")
        }

        async fn facts_by_predicate(
            &self,
            _predicate: &str,
        ) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
            Ok(Vec::new())
        }

        async fn facts_by_authority(
            &self,
            _authority: &AuthorityId,
        ) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
            Ok(Vec::new())
        }

        async fn facts_in_range(
            &self,
            _start: aura_core::time::TimeStamp,
            _end: aura_core::time::TimeStamp,
        ) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
            Ok(Vec::new())
        }

        async fn all_facts(&self) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
            Ok(self.facts.lock().unwrap().clone())
        }

        fn might_contain(&self, _predicate: &str, _value: &FactValue) -> bool {
            false
        }

        async fn merkle_root(&self) -> Result<[u8; 32], aura_core::AuraError> {
            Ok(*self.root.lock().unwrap())
        }

        async fn verify_fact_inclusion(
            &self,
            fact: &IndexedFact,
        ) -> Result<bool, aura_core::AuraError> {
            let facts = self.facts.lock().unwrap();
            Ok(facts.iter().any(|f| f.id == fact.id))
        }

        async fn get_bloom_filter(&self) -> Result<BloomFilter, aura_core::AuraError> {
            BloomFilter::new(BloomConfig::for_sync(100))
        }

        async fn index_stats(&self) -> Result<IndexStats, aura_core::AuraError> {
            let facts = self.facts.lock().unwrap();
            Ok(IndexStats {
                fact_count: facts.len() as u64,
                predicate_count: 1,
                authority_count: 1,
                bloom_fp_rate: 0.01,
                merkle_depth: 10,
            })
        }
    }

    #[tokio::test]
    async fn test_sync_manager_creation() {
        let config = SyncManagerConfig::for_testing();
        let manager = SyncServiceManager::new(config);

        assert_eq!(manager.state().await, SyncManagerState::Stopped);
        assert!(!manager.is_running().await);
    }

    #[tokio::test]
    async fn test_sync_manager_lifecycle() {
        let config = SyncManagerConfig::for_testing();
        let manager = SyncServiceManager::new(config);
        let context = test_service_context();

        // Start
        RuntimeService::start(&manager, &context).await.unwrap();
        assert!(manager.is_running().await);

        // Stop
        RuntimeService::stop(&manager).await.unwrap();
        assert!(!manager.is_running().await);
    }

    #[tokio::test]
    async fn test_sync_manager_concurrent_lifecycle_transitions_are_idempotent() {
        let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());
        let context = test_service_context();

        let start_a = RuntimeService::start(&manager, &context);
        let start_b = RuntimeService::start(&manager, &context);
        let (start_a, start_b) = tokio::join!(start_a, start_b);
        start_a.expect("first concurrent start should succeed");
        start_b.expect("second concurrent start should be idempotent");
        assert!(manager.is_running().await);

        let stop_a = RuntimeService::stop(&manager);
        let stop_b = RuntimeService::stop(&manager);
        let (stop_a, stop_b) = tokio::join!(stop_a, stop_b);
        stop_a.expect("first concurrent stop should succeed");
        stop_b.expect("second concurrent stop should be idempotent");
        assert!(!manager.is_running().await);
    }

    #[tokio::test]
    async fn test_sync_manager_stop_drains_owned_tasks() {
        let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());
        let context = test_service_context();

        RuntimeService::start(&manager, &context).await.unwrap();
        let task_group = manager
            .shared
            .owner
            .task_group()
            .await
            .expect("running sync service should own a maintenance task group");

        RuntimeService::stop(&manager).await.unwrap();
        task_group
            .wait_for_idle(Duration::from_secs(1))
            .await
            .expect("sync task group should drain on stop");
        assert!(
            task_group.active_tasks().is_empty(),
            "sync task group should not leak owned tasks after stop"
        );
    }

    #[tokio::test]
    async fn test_sync_manager_peer_management() {
        let manager = SyncServiceManager::with_defaults();

        let peer1 = DeviceId::new_from_entropy([1u8; 32]);
        let peer2 = DeviceId::new_from_entropy([2u8; 32]);

        // Add peers
        manager.add_peer(peer1).await;
        manager.add_peer(peer2).await;

        let peers = manager.peers().await;
        assert_eq!(peers.len(), 2);
        assert!(peers.contains(&peer1));
        assert!(peers.contains(&peer2));

        // Remove peer
        manager.remove_peer(&peer1).await;
        let peers = manager.peers().await;
        assert_eq!(peers.len(), 1);
        assert!(!peers.contains(&peer1));
        assert!(peers.contains(&peer2));
    }

    #[tokio::test]
    async fn test_sync_manager_health_when_not_running() {
        let manager = SyncServiceManager::with_defaults();

        // Health should be None when not running
        assert!(manager.sync_service_health().await.is_none());
    }

    #[tokio::test]
    async fn test_sync_manager_health_when_running() {
        let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());
        let context = test_service_context();

        RuntimeService::start(&manager, &context).await.unwrap();

        // Health should be available when running
        let health = manager.sync_service_health().await;
        assert!(health.is_some());

        RuntimeService::stop(&manager).await.unwrap();
    }

    #[tokio::test]
    async fn test_sync_manager_without_merkle_verification() {
        let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());

        // Manager without indexed journal should not have Merkle verification
        assert!(!manager.has_merkle_verification());
        assert!(manager.local_merkle_root().await.is_none());
        assert!(manager.verify_facts(vec![], [0u8; 32]).await.is_none());
        assert!(manager.merkle_verifier().is_none());
    }

    #[tokio::test]
    async fn test_sync_manager_with_merkle_verification() {
        let root = [42u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = Arc::new(PhysicalTimeHandler::new());
        let manager = SyncServiceManager::with_indexed_journal(
            SyncManagerConfig::for_testing(),
            journal,
            time,
        );

        // Manager with indexed journal should have Merkle verification
        assert!(manager.has_merkle_verification());
        assert!(manager.merkle_verifier().is_some());

        // Should return local Merkle root
        let local_root = manager.local_merkle_root().await;
        assert_eq!(local_root, Some(root));
    }

    #[tokio::test]
    async fn test_sync_manager_verify_facts() {
        let root = [42u8; 32];
        let journal = Arc::new(MockIndexedJournal::new(root));
        let time = Arc::new(PhysicalTimeHandler::new());
        let manager = SyncServiceManager::with_indexed_journal(
            SyncManagerConfig::for_testing(),
            journal,
            time,
        );

        // Create test fact with required authority
        let fact = IndexedFact {
            id: FactId(1),
            predicate: "test".to_string(),
            value: FactValue::String("test_value".to_string()),
            authority: Some(AuthorityId::new_from_entropy([1u8; 32])),
            timestamp: None,
        };

        // A manager with an indexed journal always produces a verification result.
        let result = manager
            .verify_facts(vec![fact], root)
            .await
            .expect("manager with indexed journal should verify facts");

        // New fact should be accepted for merge
        assert_eq!(result.verified.len(), 1);
        assert!(result.rejected.is_empty());
    }

    /// The coordinator and participants derive the session id independently
    /// from the rotation id, so the derivation must be stable and distinguish
    /// different rotations.
    #[test]
    fn test_epoch_rotation_session_id_is_stable_per_rotation() {
        let first = epoch_rotation_session_id("rotation-a");
        assert_eq!(first, epoch_rotation_session_id("rotation-a"));
        assert_ne!(first, epoch_rotation_session_id("rotation-b"));
    }
}