1use super::config_profiles::impl_service_config_profiles;
7use super::service_actor::{
8 validate_actor_transition, ActorLifecyclePhase, ActorOwnedServiceRoot, ServiceActorHandle,
9};
10use super::traits::{RuntimeService, RuntimeServiceContext, ServiceError, ServiceHealth};
11use super::{ReconfigurationManager, ReconfigurationManagerError, SessionDelegationTransfer};
12use crate::core::default_context_id_for_authority;
13use crate::runtime::vm_host_bridge::{AuraVmHostWaitStatus, AuraVmRoundDisposition};
14use crate::runtime::{
15 open_owned_manifest_vm_session_admitted, AuraEffectSystem, RuntimeChoreographySessionId,
16 TaskGroup,
17};
18use async_trait::async_trait;
19use aura_core::effects::indexed::{IndexedFact, IndexedJournalEffects};
20use aura_core::effects::PhysicalTimeEffects;
21use aura_core::hash::hash;
22use aura_core::util::serialization::to_vec;
23use aura_core::{AuthorityId, DelegationReceipt, DeviceId, OwnershipCategory};
24use aura_protocol::effects::{ChoreographicRole, RoleIndex};
25use aura_sync::protocols::epoch_runners::EpochRotationProtocolRole;
26use aura_sync::protocols::{EpochCommit, EpochConfirmation, EpochRotationProposal};
27use aura_sync::services::{Service, SyncService, SyncServiceConfig};
28use aura_sync::verification::{MerkleVerifier, VerificationResult};
29use std::collections::BTreeMap;
30use std::sync::Arc;
31use std::time::Duration;
32use telltale_machine::StepResult;
33use thiserror::Error;
34use tokio::sync::{oneshot, Mutex};
35use uuid::Uuid;
36
/// Configuration for the sync service manager.
#[derive(Debug, Clone)]
pub struct SyncManagerConfig {
    /// Auto-sync flag forwarded to the underlying `SyncServiceConfig` on start.
    pub auto_sync_enabled: bool,

    /// Auto-sync interval forwarded to the underlying `SyncServiceConfig` on start.
    pub auto_sync_interval: Duration,

    /// Concurrent-sync limit forwarded to the underlying `SyncServiceConfig` on start.
    pub max_concurrent_syncs: usize,

    /// Peers used to seed the manager's configured peer list at construction.
    pub initial_peers: Vec<DeviceId>,

    /// When `false`, the periodic maintenance task is not spawned on start.
    pub maintenance_enabled: bool,

    /// Interval handed to the periodic maintenance task.
    pub maintenance_interval: Duration,

    /// TTL passed (converted to milliseconds) to the service's `maintenance_cleanup`.
    pub peer_state_ttl: Duration,

    /// Peer-state limit passed to the service's `maintenance_cleanup`.
    pub max_peer_states: usize,
}
64
65impl Default for SyncManagerConfig {
66 fn default() -> Self {
67 Self {
68 auto_sync_enabled: true,
69 auto_sync_interval: Duration::from_secs(60),
70 max_concurrent_syncs: 5,
71 initial_peers: Vec::new(),
72 maintenance_enabled: true,
73 maintenance_interval: Duration::from_secs(60),
74 peer_state_ttl: Duration::from_secs(6 * 60 * 60),
75 max_peer_states: 1024,
76 }
77 }
78}
79
// Named configuration profiles for `SyncManagerConfig`, declared through the
// shared `impl_service_config_profiles!` macro.
impl_service_config_profiles!(SyncManagerConfig {
    pub fn for_testing() -> Self {
        // Testing profile: 5-second sync/maintenance intervals, three
        // concurrent syncs, a 60-second peer-state TTL and 128 peer states.
        Self {
            auto_sync_enabled: true,
            auto_sync_interval: Duration::from_secs(5),
            max_concurrent_syncs: 3,
            initial_peers: Vec::new(),
            maintenance_enabled: true,
            maintenance_interval: Duration::from_secs(5),
            peer_state_ttl: Duration::from_secs(60),
            max_peer_states: 128,
        }
    }

    pub fn manual_only() -> Self {
        // Manual profile: same values as `Default`, except auto-sync is
        // disabled. Maintenance stays enabled.
        Self {
            auto_sync_enabled: false,
            auto_sync_interval: Duration::from_secs(60),
            max_concurrent_syncs: 5,
            initial_peers: Vec::new(),
            maintenance_enabled: true,
            maintenance_interval: Duration::from_secs(60),
            peer_state_ttl: Duration::from_secs(6 * 60 * 60),
            max_peer_states: 1024,
        }
    }
});
109
/// Lifecycle state of the sync service manager.
///
/// Each variant maps one-to-one onto an `ActorLifecyclePhase` via `phase()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncManagerState {
    /// Initial state, and the state after a successful stop.
    Stopped,
    /// Set while `start_managed` is constructing and starting the service.
    Starting,
    /// Set once the command actor and owned tasks have been installed.
    Running,
    /// Set while `stop_managed` is tearing the service down.
    Stopping,
    /// Set when starting or stopping the service reported an error.
    Failed,
}
124
/// Errors returned by the sync manager's sync and epoch-rotation operations.
#[derive(Debug, Error)]
pub enum SyncManagerError {
    /// A runtime service error, passed through unchanged.
    #[error(transparent)]
    Service(#[from] ServiceError),
    /// No underlying sync service is available (the manager is not running).
    #[error("sync service not started")]
    NotStarted,
    /// The underlying sync service reported a failure; carries its message.
    #[error("sync operation failed: {0}")]
    Sync(String),
    /// Delegation was requested without a bundle id.
    #[error("epoch rotation delegation requires bundle evidence")]
    DelegationRequiresBundleEvidence,
    /// The reconfiguration manager rejected or failed the delegation.
    #[error("epoch rotation delegation failed")]
    Delegation(#[source] ReconfigurationManagerError),
    /// The participant entry point was invoked with the coordinator role.
    #[error("epoch rotation participant role required")]
    ParticipantRoleRequired,
    /// Opening the epoch-rotation VM session failed.
    #[error("epoch rotation VM session open failed: {0}")]
    VmSessionOpen(String),
    /// Advancing the VM by one round failed.
    #[error("epoch rotation VM advance failed: {0}")]
    VmAdvance(String),
    /// Handling the outcome of a VM round failed.
    #[error("epoch rotation VM round handling failed: {0}")]
    VmRoundHandling(String),
    /// Serializing the epoch rotation proposal failed.
    #[error("epoch proposal encode failed: {0}")]
    ProposalEncode(String),
    /// Serializing the epoch confirmation failed.
    #[error("epoch confirmation encode failed: {0}")]
    ConfirmationEncode(String),
    /// Deserializing a received epoch confirmation failed.
    #[error("epoch confirmation decode failed: {0}")]
    ConfirmationDecode(String),
    /// Serializing the epoch commit failed.
    #[error("epoch commit encode failed: {0}")]
    CommitEncode(String),
    /// The VM host wait reported a timeout for the named role.
    #[error("epoch rotation {role} VM timed out while waiting for receive")]
    VmTimedOut { role: &'static str },
    /// The VM host wait reported cancellation for the named role.
    #[error("epoch rotation {role} VM cancelled while waiting for receive")]
    VmCancelled { role: &'static str },
    /// The VM step result was `Stuck` for the named role.
    #[error("epoch rotation {role} VM became stuck without a pending receive")]
    VmStuck { role: &'static str },
}
160
/// State owned exclusively by the command actor task while the service runs.
struct SyncState {
    /// The started underlying sync service.
    service: Arc<SyncService>,
    /// Live peer list, mutated by `AddPeer` / `RemovePeer` commands.
    peers: Vec<DeviceId>,
}
165
/// State shared between all clones of `SyncServiceManager`.
struct SyncManagerShared {
    /// Lifecycle root holding the state, command handle and owned task group.
    owner: ActorOwnedServiceRoot<SyncServiceManager, SyncCommand, SyncManagerState>,
    /// Peer list used while no command actor is available; seeds the actor on
    /// start and is overwritten from the actor's snapshot on stop.
    configured_peers: Mutex<Vec<DeviceId>>,
}
170
/// Point-in-time view of the manager's state.
#[derive(Clone)]
struct SyncStateSnapshot {
    /// The underlying service; `None` when the snapshot was not served by the
    /// command actor.
    service: Option<Arc<SyncService>>,
    /// Lifecycle state at snapshot time.
    status: SyncManagerState,
    /// Peer list at snapshot time.
    peers: Vec<DeviceId>,
}
177
/// Commands processed by the sync command actor; each carries a one-shot
/// reply channel.
enum SyncCommand {
    /// Request a snapshot of the actor-owned state.
    SnapshotState {
        reply: oneshot::Sender<SyncStateSnapshot>,
    },
    /// Add `peer` to the live peer list if it is not already present.
    AddPeer {
        peer: DeviceId,
        reply: oneshot::Sender<()>,
    },
    /// Remove `peer` from the live peer list.
    RemovePeer {
        peer: DeviceId,
        reply: oneshot::Sender<()>,
    },
}
191
192impl SyncState {
193 fn new_running(service: Arc<SyncService>, initial_peers: Vec<DeviceId>) -> Self {
194 Self {
195 service,
196 peers: initial_peers,
197 }
198 }
199}
200
201impl SyncManagerState {
202 fn phase(self) -> ActorLifecyclePhase {
203 match self {
204 Self::Stopped => ActorLifecyclePhase::Stopped,
205 Self::Starting => ActorLifecyclePhase::Starting,
206 Self::Running => ActorLifecyclePhase::Running,
207 Self::Stopping => ActorLifecyclePhase::Stopping,
208 Self::Failed => ActorLifecyclePhase::Failed,
209 }
210 }
211}
212
/// Runtime-facing manager that owns the sync service lifecycle and routes
/// peer/state commands through a single command actor.
#[aura_macros::actor_owned(
    owner = "sync_service_manager",
    domain = "sync",
    gate = "sync_command_ingress",
    command = SyncCommand,
    capacity = 64,
    category = "actor_owned"
)]
#[derive(Clone)]
pub struct SyncServiceManager {
    /// Manager configuration fixed at construction.
    config: SyncManagerConfig,
    /// Optional Merkle verifier; present only when built with an indexed journal.
    merkle_verifier: Option<Arc<MerkleVerifier>>,
    /// Lifecycle root and configured peers, shared across clones.
    shared: Arc<SyncManagerShared>,
    /// Records native epoch-rotation sessions and performs session delegation.
    reconfiguration: ReconfigurationManager,
}
236
237impl SyncServiceManager {
    /// Ownership category declared for this service: actor-owned.
    pub const OWNERSHIP_CATEGORY: OwnershipCategory = OwnershipCategory::ActorOwned;
239
240 async fn command_handle(
241 &self,
242 ) -> Result<ServiceActorHandle<SyncServiceManager, SyncCommand>, ServiceError> {
243 self.shared
244 .owner
245 .command_handle(
246 self.name(),
247 "sync command actor unavailable; service is not fully started",
248 )
249 .await
250 }
251
252 fn spawn_command_actor(
253 &self,
254 tasks: &TaskGroup,
255 mut state: SyncState,
256 ) -> ServiceActorHandle<SyncServiceManager, SyncCommand> {
257 let (commands, mut mailbox) =
258 ServiceActorHandle::<SyncServiceManager, SyncCommand>::bounded(self.name(), 64);
259
260 let _command_actor_handle = tasks.spawn_named("command_actor", async move {
261 while let Some(command) = mailbox.recv().await {
262 match command {
263 SyncCommand::SnapshotState { reply } => {
264 let _ = reply.send(SyncStateSnapshot {
265 service: Some(state.service.clone()),
266 status: SyncManagerState::Running,
267 peers: state.peers.clone(),
268 });
269 }
270 SyncCommand::AddPeer { peer, reply } => {
271 if !state.peers.contains(&peer) {
272 state.peers.push(peer);
273 tracing::debug!("Added peer {} to sync manager", peer);
274 }
275 let _ = reply.send(());
276 }
277 SyncCommand::RemovePeer { peer, reply } => {
278 state.peers.retain(|p| p != &peer);
279 tracing::debug!("Removed peer {} from sync manager", peer);
280 let _ = reply.send(());
281 }
282 }
283 }
284 });
285
286 commands
287 }
288
289 async fn state_snapshot(&self) -> Result<SyncStateSnapshot, ServiceError> {
290 match self.command_handle().await {
291 Ok(commands) => {
292 commands
293 .request(|reply| SyncCommand::SnapshotState { reply })
294 .await
295 }
296 Err(_) => Ok(SyncStateSnapshot {
297 service: None,
298 status: self.shared.owner.state().await,
299 peers: self.shared.configured_peers.lock().await.clone(),
300 }),
301 }
302 }
303
304 fn shared_for_config(config: &SyncManagerConfig) -> Arc<SyncManagerShared> {
305 Arc::new(SyncManagerShared {
306 owner: ActorOwnedServiceRoot::new(SyncManagerState::Stopped),
307 configured_peers: Mutex::new(config.initial_peers.clone()),
308 })
309 }
310
311 pub fn new(config: SyncManagerConfig) -> Self {
313 Self {
314 config: config.clone(),
315 merkle_verifier: None,
316 shared: Self::shared_for_config(&config),
317 reconfiguration: ReconfigurationManager::new(),
318 }
319 }
320
321 pub fn with_indexed_journal(
326 config: SyncManagerConfig,
327 indexed_journal: Arc<dyn IndexedJournalEffects + Send + Sync>,
328 time: Arc<dyn PhysicalTimeEffects>,
329 ) -> Self {
330 Self {
331 config: config.clone(),
332 merkle_verifier: Some(Arc::new(MerkleVerifier::new(indexed_journal, time))),
333 shared: Self::shared_for_config(&config),
334 reconfiguration: ReconfigurationManager::new(),
335 }
336 }
337
338 pub fn with_defaults() -> Self {
340 Self::new(SyncManagerConfig::default())
341 }
342
343 pub async fn state(&self) -> SyncManagerState {
345 self.state_snapshot()
346 .await
347 .map(|snapshot| snapshot.status)
348 .unwrap_or(self.shared.owner.state().await)
349 }
350
351 pub async fn is_running(&self) -> bool {
353 self.state().await == SyncManagerState::Running
354 }
355
356 async fn start_managed(&self, context: &RuntimeServiceContext) -> Result<(), ServiceError> {
357 let _lifecycle_guard = self.shared.owner.lifecycle().lock().await;
358 let current_state = self.shared.owner.state().await;
359 if current_state == SyncManagerState::Running {
360 return Ok(());
361 }
362 validate_actor_transition(
363 self.name(),
364 current_state.phase(),
365 ActorLifecyclePhase::Starting,
366 )?;
367
368 self.shared
369 .owner
370 .set_state(SyncManagerState::Starting)
371 .await;
372
373 let sync_config = SyncServiceConfig {
375 auto_sync_enabled: self.config.auto_sync_enabled,
376 auto_sync_interval: self.config.auto_sync_interval,
377 max_concurrent_syncs: self.config.max_concurrent_syncs as u32,
378 ..Default::default()
379 };
380
381 let now_instant = SyncService::monotonic_now();
383 let service = match SyncService::new(sync_config, context.time_effects(), now_instant).await
384 {
385 Ok(service) => service,
386 Err(error) => {
387 self.shared.owner.set_state(SyncManagerState::Failed).await;
388 return Err(ServiceError::startup_failed(
389 "sync_service",
390 error.to_string(),
391 ));
392 }
393 };
394
395 if let Err(error) = service.start(now_instant).await {
397 self.shared.owner.set_state(SyncManagerState::Failed).await;
398 return Err(ServiceError::startup_failed(
399 "sync_service",
400 error.to_string(),
401 ));
402 }
403
404 let initial_peers = self.shared.configured_peers.lock().await.clone();
405 let maintenance_group = context.tasks().group(self.name());
406 let command_handle = self.spawn_command_actor(
407 &maintenance_group,
408 SyncState::new_running(Arc::new(service), initial_peers),
409 );
410 self.shared.owner.install_commands(command_handle).await;
411 self.spawn_maintenance_task(maintenance_group.clone(), context.time_effects());
412 self.shared.owner.set_state(SyncManagerState::Running).await;
413 self.shared.owner.install_tasks(maintenance_group).await;
414
415 tracing::info!(
416 event = "runtime.service.sync.started",
417 service = self.name(),
418 "Sync service manager started"
419 );
420 Ok(())
421 }
422
423 async fn stop_managed(&self) -> Result<(), ServiceError> {
424 let _lifecycle_guard = self.shared.owner.lifecycle().lock().await;
425 let current_state = self.shared.owner.state().await;
426 if current_state == SyncManagerState::Stopped {
427 return Ok(());
428 }
429 validate_actor_transition(
430 self.name(),
431 current_state.phase(),
432 ActorLifecyclePhase::Stopping,
433 )?;
434
435 self.shared
436 .owner
437 .set_state(SyncManagerState::Stopping)
438 .await;
439
440 let snapshot = self.state_snapshot().await.ok();
441 if let Some(snapshot) = snapshot.as_ref() {
442 *self.shared.configured_peers.lock().await = snapshot.peers.clone();
443 }
444 let service = snapshot.and_then(|snapshot| snapshot.service);
445 self.shared.owner.take_commands().await;
446
447 let maintenance_shutdown_error =
448 if let Some(task_group) = self.shared.owner.take_tasks().await {
449 match task_group
450 .shutdown_with_timeout(Duration::from_secs(2))
451 .await
452 {
453 Ok(()) => None,
454 Err(crate::task_registry::TaskSupervisionError::ForcedAbort {
455 aborted_tasks,
456 ..
457 }) => {
458 tracing::warn!(
459 service = self.name(),
460 aborted_tasks = ?aborted_tasks,
461 "Sync service stop force-aborted owned background tasks"
462 );
463 None
464 }
465 Err(error) => Some(ServiceError::shutdown_failed(
466 self.name(),
467 format!("failed to stop maintenance task group: {error}"),
468 )),
469 }
470 } else {
471 None
472 };
473
474 if let Some(service) = service.as_ref() {
476 let now_instant = SyncService::monotonic_now();
477 if let Err(error) = service.stop(now_instant).await {
478 self.shared.owner.set_state(SyncManagerState::Failed).await;
479 return Err(ServiceError::shutdown_failed(
480 self.name(),
481 error.to_string(),
482 ));
483 }
484 }
485
486 self.shared.owner.set_state(SyncManagerState::Stopped).await;
487
488 tracing::info!(
489 event = "runtime.service.sync.stopped",
490 service = self.name(),
491 "Sync service manager stopped"
492 );
493 match maintenance_shutdown_error {
494 Some(error) => {
495 self.shared.owner.set_state(SyncManagerState::Failed).await;
496 Err(error)
497 }
498 None => Ok(()),
499 }
500 }
501
502 fn spawn_maintenance_task(
504 &self,
505 tasks: TaskGroup,
506 time_effects: Arc<dyn PhysicalTimeEffects + Send + Sync>,
507 ) {
508 if !self.config.maintenance_enabled {
509 tracing::debug!("Sync maintenance task disabled by configuration");
510 return;
511 }
512
513 let interval = self.config.maintenance_interval;
514 let peer_state_ttl = self.config.peer_state_ttl;
515 let max_peer_states = self.config.max_peer_states;
516 let manager = self.clone();
517
518 let _maintenance_task_handle = tasks.spawn_interval_until_named(
519 "sync.maintenance",
520 time_effects.clone(),
521 interval,
522 move || {
523 let manager = manager.clone();
524 let time_effects = time_effects.clone();
525 async move {
526 let snapshot = match manager.state_snapshot().await {
527 Ok(snapshot) => snapshot,
528 Err(error) => {
529 tracing::debug!(
530 error = %error,
531 "Sync maintenance skipped because command actor is unavailable"
532 );
533 return false;
534 }
535 };
536 let status = snapshot.status;
537 if matches!(
538 status,
539 SyncManagerState::Stopped
540 | SyncManagerState::Stopping
541 | SyncManagerState::Failed
542 ) {
543 return false;
544 }
545
546 let now_ms = match time_effects.physical_time().await {
547 Ok(t) => t.ts_ms,
548 Err(e) => {
549 tracing::warn!("Sync maintenance: failed to get time: {}", e);
550 return true;
551 }
552 };
553
554 if let Some(service) = snapshot.service {
555 if let Err(e) = service
556 .maintenance_cleanup(
557 now_ms,
558 peer_state_ttl.as_millis() as u64,
559 max_peer_states,
560 )
561 .await
562 {
563 tracing::warn!("Sync maintenance failed: {}", e);
564 }
565 }
566
567 true
568 }
569 },
570 );
571 }
572
573 pub async fn sync_with_peers<E>(
579 &self,
580 effects: &E,
581 peers: Vec<DeviceId>,
582 ) -> Result<(), SyncManagerError>
583 where
584 E: aura_core::effects::JournalEffects
585 + aura_core::effects::NetworkEffects
586 + aura_core::effects::PhysicalTimeEffects
587 + aura_guards::GuardContextProvider
588 + Send
589 + Sync,
590 {
591 let service = self
592 .state_snapshot()
593 .await
594 .map_err(SyncManagerError::from)?
595 .service
596 .clone()
597 .ok_or(SyncManagerError::NotStarted)?;
598
599 let now_instant = SyncService::monotonic_now();
600 service
601 .sync_with_peers(effects, peers, now_instant)
602 .await
603 .map_err(|error| SyncManagerError::Sync(error.to_string()))
604 }
605
606 pub async fn add_peer(&self, peer: DeviceId) {
608 if let Ok(commands) = self.command_handle().await {
609 let _ = commands
610 .request(|reply| SyncCommand::AddPeer { peer, reply })
611 .await;
612 } else {
613 let mut peers = self.shared.configured_peers.lock().await;
614 if !peers.contains(&peer) {
615 peers.push(peer);
616 tracing::debug!("Added peer {} to sync manager", peer);
617 }
618 }
619 }
620
621 pub async fn remove_peer(&self, peer: &DeviceId) {
623 if let Ok(commands) = self.command_handle().await {
624 let _ = commands
625 .request(|reply| SyncCommand::RemovePeer { peer: *peer, reply })
626 .await;
627 } else {
628 let mut peers = self.shared.configured_peers.lock().await;
629 peers.retain(|p| p != peer);
630 tracing::debug!("Removed peer {} from sync manager", peer);
631 }
632 }
633
634 pub async fn peers(&self) -> Vec<DeviceId> {
636 self.state_snapshot()
637 .await
638 .map(|snapshot| snapshot.peers)
639 .unwrap_or_default()
640 }
641
642 pub async fn sync_service_health(&self) -> Option<aura_sync::services::SyncServiceHealth> {
644 self.state_snapshot()
645 .await
646 .ok()
647 .and_then(|snapshot| snapshot.service.map(|s| s.get_health()))
648 }
649
650 pub async fn metrics(&self) -> Option<aura_sync::services::ServiceMetrics> {
652 self.state_snapshot()
653 .await
654 .ok()
655 .and_then(|snapshot| snapshot.service.map(|s| s.get_metrics()))
656 }
657
    /// Returns the configuration this manager was constructed with.
    pub fn config(&self) -> &SyncManagerConfig {
        &self.config
    }
662
    /// Returns `true` when a Merkle verifier is attached, i.e. the manager
    /// was built with `with_indexed_journal`.
    pub fn has_merkle_verification(&self) -> bool {
        self.merkle_verifier.is_some()
    }
674
675 pub async fn local_merkle_root(&self) -> Option<[u8; 32]> {
681 if let Some(ref verifier) = self.merkle_verifier {
682 verifier.local_merkle_root().await.ok()
683 } else {
684 None
685 }
686 }
687
688 pub async fn verify_facts(
696 &self,
697 facts: Vec<IndexedFact>,
698 claimed_root: [u8; 32],
699 ) -> Option<VerificationResult> {
700 if let Some(ref verifier) = self.merkle_verifier {
701 verifier
702 .verify_incoming_facts(facts, claimed_root)
703 .await
704 .ok()
705 } else {
706 None
707 }
708 }
709
    /// Returns the attached Merkle verifier, if any.
    pub fn merkle_verifier(&self) -> Option<&Arc<MerkleVerifier>> {
        self.merkle_verifier.as_ref()
    }
718}
719
720impl Default for SyncServiceManager {
721 fn default() -> Self {
722 Self::with_defaults()
723 }
724}
725
// On wasm32 the async trait is expanded with `?Send`; on every other target
// the default `async_trait` expansion is used.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl RuntimeService for SyncServiceManager {
    /// Service name; also used for the task group and in lifecycle errors.
    fn name(&self) -> &'static str {
        "sync_service"
    }

    /// Names of the services this one declares as dependencies.
    fn dependencies(&self) -> &[&'static str] {
        &["indexed_journal", "transport"]
    }

    /// Delegates to `start_managed`.
    async fn start(&self, context: &RuntimeServiceContext) -> Result<(), ServiceError> {
        self.start_managed(context).await
    }

    /// Delegates to `stop_managed`.
    async fn stop(&self) -> Result<(), ServiceError> {
        self.stop_managed().await
    }

    /// Delegates to `health_async`.
    async fn health(&self) -> ServiceHealth {
        self.health_async().await
    }
}
753
754impl SyncServiceManager {
756 pub async fn health_async(&self) -> ServiceHealth {
761 let state = self.state().await;
762 match state {
763 SyncManagerState::Stopped => ServiceHealth::Stopped,
764 SyncManagerState::Starting => ServiceHealth::Starting,
765 SyncManagerState::Stopping => ServiceHealth::Stopping,
766 SyncManagerState::Failed => ServiceHealth::Unhealthy {
767 reason: "service entered failed lifecycle state".to_string(),
768 },
769 SyncManagerState::Running => {
770 if let Some(svc_health) = self.sync_service_health().await {
772 use aura_sync::services::HealthStatus;
773 match svc_health.status {
774 HealthStatus::Healthy => ServiceHealth::Healthy,
775 HealthStatus::Degraded | HealthStatus::Starting => {
776 ServiceHealth::Degraded {
777 reason: format!(
778 "status={:?}, active_sessions={}",
779 svc_health.status, svc_health.active_sessions
780 ),
781 }
782 }
783 HealthStatus::Unhealthy | HealthStatus::Stopping => {
784 ServiceHealth::Unhealthy {
785 reason: format!("status={:?}", svc_health.status),
786 }
787 }
788 }
789 } else {
790 ServiceHealth::Unhealthy {
791 reason: "underlying service not available".to_string(),
792 }
793 }
794 }
795 }
796 }
797}
798
799impl SyncServiceManager {
804 fn epoch_role(authority_id: AuthorityId, role_index: u16) -> ChoreographicRole {
805 ChoreographicRole::for_authority(
806 authority_id,
807 RoleIndex::new(role_index.into()).expect("role index"),
808 )
809 }
810
811 pub async fn execute_epoch_rotation_coordinator(
813 &self,
814 effects: Arc<AuraEffectSystem>,
815 coordinator_id: AuthorityId,
816 participant1_id: AuthorityId,
817 participant2_id: AuthorityId,
818 proposal: EpochRotationProposal,
819 commit: EpochCommit,
820 ) -> Result<(), SyncManagerError> {
821 let session_id = epoch_rotation_session_id(&proposal.rotation_id);
822 self.record_native_epoch_session(coordinator_id, session_id)
823 .await;
824 let roles = vec![
825 Self::epoch_role(coordinator_id, 0),
826 Self::epoch_role(participant1_id, 0),
827 Self::epoch_role(participant2_id, 0),
828 ];
829 let peer_roles = BTreeMap::from([
830 (
831 "Participant1".to_string(),
832 Self::epoch_role(participant1_id, 0),
833 ),
834 (
835 "Participant2".to_string(),
836 Self::epoch_role(participant2_id, 0),
837 ),
838 ]);
839 let manifest =
840 aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::composition_manifest();
841 let global_type =
842 aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::global_type();
843 let local_types =
844 aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::local_types();
845
846 let result = async {
847 let mut session = open_owned_manifest_vm_session_admitted(
848 effects.clone(),
849 session_id,
850 roles,
851 &manifest,
852 "Coordinator",
853 &global_type,
854 &local_types,
855 crate::runtime::AuraVmSchedulerSignals::default(),
856 )
857 .await
858 .map_err(|error| SyncManagerError::VmSessionOpen(error.to_string()))?;
859 session.queue_send_bytes(
860 to_vec(&proposal)
861 .map_err(|error| SyncManagerError::ProposalEncode(error.to_string()))?,
862 );
863 let mut confirmations = Vec::new();
864 let mut commit_queued = false;
865
866 let loop_result = loop {
867 let round = session
868 .advance_round("Coordinator", &peer_roles)
869 .await
870 .map_err(|error| SyncManagerError::VmAdvance(error.to_string()))?;
871
872 if let Some(blocked) = round.blocked_receive {
873 let confirmation: EpochConfirmation =
874 aura_core::util::serialization::from_slice(&blocked.payload).map_err(
875 |error| SyncManagerError::ConfirmationDecode(error.to_string()),
876 )?;
877 confirmations.push(confirmation);
878 if !commit_queued && confirmations.len() == 2 {
879 let payload = to_vec(&commit)
880 .map_err(|error| SyncManagerError::CommitEncode(error.to_string()))?;
881 session.queue_send_bytes(payload.clone());
882 session.queue_send_bytes(payload);
883 commit_queued = true;
884 }
885 session
886 .inject_blocked_receive(&blocked)
887 .map_err(|error| SyncManagerError::VmRoundHandling(error.to_string()))?;
888 continue;
889 }
890
891 match round.host_wait_status {
892 AuraVmHostWaitStatus::Idle => {}
893 AuraVmHostWaitStatus::TimedOut => {
894 break Err(SyncManagerError::VmTimedOut {
895 role: "coordinator",
896 });
897 }
898 AuraVmHostWaitStatus::Cancelled => {
899 break Err(SyncManagerError::VmCancelled {
900 role: "coordinator",
901 });
902 }
903 AuraVmHostWaitStatus::Deferred | AuraVmHostWaitStatus::Delivered => {}
904 }
905
906 match round.step {
907 StepResult::AllDone => break Ok(()),
908 StepResult::Continue => {}
909 StepResult::Stuck => {
910 break Err(SyncManagerError::VmStuck {
911 role: "coordinator",
912 });
913 }
914 }
915 };
916 let _ = session.close().await;
917 loop_result
918 }
919 .await;
920 result
921 }
922
923 pub async fn execute_epoch_rotation_participant(
925 &self,
926 effects: Arc<AuraEffectSystem>,
927 role: EpochRotationProtocolRole,
928 coordinator_id: AuthorityId,
929 participant1_id: AuthorityId,
930 participant2_id: AuthorityId,
931 confirmation: EpochConfirmation,
932 ) -> Result<(), SyncManagerError> {
933 let participant_id = match role {
934 EpochRotationProtocolRole::Participant1 => participant1_id,
935 EpochRotationProtocolRole::Participant2 => participant2_id,
936 EpochRotationProtocolRole::Coordinator => {
937 return Err(SyncManagerError::ParticipantRoleRequired)
938 }
939 };
940 let session_id = epoch_rotation_session_id(&confirmation.rotation_id);
941 self.record_native_epoch_session(participant_id, session_id)
942 .await;
943 let active_role_name = match role {
944 EpochRotationProtocolRole::Participant1 => "Participant1",
945 EpochRotationProtocolRole::Participant2 => "Participant2",
946 EpochRotationProtocolRole::Coordinator => unreachable!(),
947 };
948 let roles = vec![
949 Self::epoch_role(coordinator_id, 0),
950 Self::epoch_role(participant_id, 0),
951 ];
952 let peer_roles = BTreeMap::from([(
953 "Coordinator".to_string(),
954 Self::epoch_role(coordinator_id, 0),
955 )]);
956 let manifest =
957 aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::composition_manifest();
958 let global_type =
959 aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::global_type();
960 let local_types =
961 aura_sync::protocols::epochs::telltale_session_types_epoch_rotation::vm_artifacts::local_types();
962
963 let result = async {
964 let mut session = open_owned_manifest_vm_session_admitted(
965 effects.clone(),
966 session_id,
967 roles,
968 &manifest,
969 active_role_name,
970 &global_type,
971 &local_types,
972 crate::runtime::AuraVmSchedulerSignals::default(),
973 )
974 .await
975 .map_err(|error| SyncManagerError::VmSessionOpen(error.to_string()))?;
976 session.queue_send_bytes(
977 to_vec(&confirmation)
978 .map_err(|error| SyncManagerError::ConfirmationEncode(error.to_string()))?,
979 );
980
981 let loop_result = loop {
982 let round = session
983 .advance_round(active_role_name, &peer_roles)
984 .await
985 .map_err(|error| SyncManagerError::VmAdvance(error.to_string()))?;
986
987 match crate::runtime::handle_owned_vm_round(
988 &mut session,
989 round,
990 "epoch rotation participant VM",
991 )
992 .map_err(|error| SyncManagerError::VmRoundHandling(error.to_string()))?
993 {
994 AuraVmRoundDisposition::Continue => {}
995 AuraVmRoundDisposition::Complete => break Ok(()),
996 }
997 };
998 let _ = session.close().await;
999 loop_result
1000 }
1001 .await;
1002 result
1003 }
1004
1005 pub async fn delegate_epoch_rotation_session(
1009 &self,
1010 effects: Arc<AuraEffectSystem>,
1011 rotation_id: &str,
1012 from_authority: AuthorityId,
1013 to_authority: AuthorityId,
1014 bundle_id: Option<String>,
1015 ) -> Result<DelegationReceipt, SyncManagerError> {
1016 let session_id =
1017 RuntimeChoreographySessionId::from_uuid(epoch_rotation_session_id(rotation_id))
1018 .into_aura_session_id();
1019 self.reconfiguration
1020 .record_native_session(from_authority, session_id)
1021 .await;
1022 self.reconfiguration
1023 .delegate_session(
1024 &effects,
1025 SessionDelegationTransfer::new(
1026 session_id,
1027 from_authority,
1028 to_authority,
1029 bundle_id.ok_or(SyncManagerError::DelegationRequiresBundleEvidence)?,
1030 )
1031 .with_context(default_context_id_for_authority(from_authority)),
1032 )
1033 .await
1034 .map(|outcome| outcome.receipt)
1035 .map_err(SyncManagerError::Delegation)
1036 }
1037
1038 async fn record_native_epoch_session(&self, authority_id: AuthorityId, session_uuid: Uuid) {
1039 let session_id =
1040 RuntimeChoreographySessionId::from_uuid(session_uuid).into_aura_session_id();
1041 self.reconfiguration
1042 .record_native_session(authority_id, session_id)
1043 .await;
1044 }
1045}
1046
1047fn epoch_rotation_session_id(rotation_id: &str) -> Uuid {
1048 let digest = hash(rotation_id.as_bytes());
1049 let mut bytes = [0u8; 16];
1050 bytes.copy_from_slice(&digest[..16]);
1051 Uuid::from_bytes(bytes)
1052}
1053
1054#[cfg(test)]
1055#[allow(clippy::disallowed_types)]
1056mod tests {
1057 use super::*;
1058 use async_trait::async_trait;
1059 use aura_core::domain::journal::FactValue;
1060 use aura_core::effects::indexed::{FactId, FactStreamReceiver, IndexStats};
1061 use aura_core::effects::{BloomConfig, BloomFilter};
1062 use aura_core::AuthorityId;
1063 use aura_effects::time::PhysicalTimeHandler;
1064 use std::sync::Mutex;
1065
1066 fn test_service_context() -> RuntimeServiceContext {
1067 RuntimeServiceContext::new(
1068 Arc::new(crate::runtime::TaskSupervisor::new()),
1069 Arc::new(PhysicalTimeHandler::new()),
1070 )
1071 }
1072
    /// In-memory `IndexedJournalEffects` stand-in for tests.
    struct MockIndexedJournal {
        /// Merkle root returned by `merkle_root`.
        root: Mutex<[u8; 32]>,
        /// Facts returned by `all_facts` and consulted by `verify_fact_inclusion`.
        facts: Mutex<Vec<IndexedFact>>,
    }
1078
1079 impl MockIndexedJournal {
1080 fn new(root: [u8; 32]) -> Self {
1081 Self {
1082 root: Mutex::new(root),
1083 facts: Mutex::new(Vec::new()),
1084 }
1085 }
1086 }
1087
1088 #[async_trait]
1089 impl IndexedJournalEffects for MockIndexedJournal {
1090 fn watch_facts(&self) -> Box<dyn FactStreamReceiver> {
1091 panic!("Not implemented for mock")
1092 }
1093
1094 async fn facts_by_predicate(
1095 &self,
1096 _predicate: &str,
1097 ) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
1098 Ok(Vec::new())
1099 }
1100
1101 async fn facts_by_authority(
1102 &self,
1103 _authority: &AuthorityId,
1104 ) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
1105 Ok(Vec::new())
1106 }
1107
1108 async fn facts_in_range(
1109 &self,
1110 _start: aura_core::time::TimeStamp,
1111 _end: aura_core::time::TimeStamp,
1112 ) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
1113 Ok(Vec::new())
1114 }
1115
1116 async fn all_facts(&self) -> Result<Vec<IndexedFact>, aura_core::AuraError> {
1117 Ok(self.facts.lock().unwrap().clone())
1118 }
1119
1120 fn might_contain(&self, _predicate: &str, _value: &FactValue) -> bool {
1121 false
1122 }
1123
1124 async fn merkle_root(&self) -> Result<[u8; 32], aura_core::AuraError> {
1125 Ok(*self.root.lock().unwrap())
1126 }
1127
1128 async fn verify_fact_inclusion(
1129 &self,
1130 fact: &IndexedFact,
1131 ) -> Result<bool, aura_core::AuraError> {
1132 let facts = self.facts.lock().unwrap();
1133 Ok(facts.iter().any(|f| f.id == fact.id))
1134 }
1135
1136 async fn get_bloom_filter(&self) -> Result<BloomFilter, aura_core::AuraError> {
1137 BloomFilter::new(BloomConfig::for_sync(100))
1138 }
1139
1140 async fn index_stats(&self) -> Result<IndexStats, aura_core::AuraError> {
1141 let facts = self.facts.lock().unwrap();
1142 Ok(IndexStats {
1143 fact_count: facts.len() as u64,
1144 predicate_count: 1,
1145 authority_count: 1,
1146 bloom_fp_rate: 0.01,
1147 merkle_depth: 10,
1148 })
1149 }
1150 }
1151
1152 #[tokio::test]
1153 async fn test_sync_manager_creation() {
1154 let config = SyncManagerConfig::for_testing();
1155 let manager = SyncServiceManager::new(config);
1156
1157 assert_eq!(manager.state().await, SyncManagerState::Stopped);
1158 assert!(!manager.is_running().await);
1159 }
1160
1161 #[tokio::test]
1162 async fn test_sync_manager_lifecycle() {
1163 let config = SyncManagerConfig::for_testing();
1164 let manager = SyncServiceManager::new(config);
1165 let context = test_service_context();
1166
1167 RuntimeService::start(&manager, &context).await.unwrap();
1169 assert!(manager.is_running().await);
1170
1171 RuntimeService::stop(&manager).await.unwrap();
1173 assert!(!manager.is_running().await);
1174 }
1175
1176 #[tokio::test]
1177 async fn test_sync_manager_concurrent_lifecycle_transitions_are_idempotent() {
1178 let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());
1179 let context = test_service_context();
1180
1181 let start_a = RuntimeService::start(&manager, &context);
1182 let start_b = RuntimeService::start(&manager, &context);
1183 let (start_a, start_b) = tokio::join!(start_a, start_b);
1184 start_a.expect("first concurrent start should succeed");
1185 start_b.expect("second concurrent start should be idempotent");
1186 assert!(manager.is_running().await);
1187
1188 let stop_a = RuntimeService::stop(&manager);
1189 let stop_b = RuntimeService::stop(&manager);
1190 let (stop_a, stop_b) = tokio::join!(stop_a, stop_b);
1191 stop_a.expect("first concurrent stop should succeed");
1192 stop_b.expect("second concurrent stop should be idempotent");
1193 assert!(!manager.is_running().await);
1194 }
1195
1196 #[tokio::test]
1197 async fn test_sync_manager_stop_drains_owned_tasks() {
1198 let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());
1199 let context = test_service_context();
1200
1201 RuntimeService::start(&manager, &context).await.unwrap();
1202 let task_group = manager
1203 .shared
1204 .owner
1205 .task_group()
1206 .await
1207 .expect("running sync service should own a maintenance task group");
1208
1209 RuntimeService::stop(&manager).await.unwrap();
1210 task_group
1211 .wait_for_idle(Duration::from_secs(1))
1212 .await
1213 .expect("sync task group should drain on stop");
1214 assert!(
1215 task_group.active_tasks().is_empty(),
1216 "sync task group should not leak owned tasks after stop"
1217 );
1218 }
1219
1220 #[tokio::test]
1221 async fn test_sync_manager_peer_management() {
1222 let manager = SyncServiceManager::with_defaults();
1223
1224 let peer1 = DeviceId::new_from_entropy([1u8; 32]);
1225 let peer2 = DeviceId::new_from_entropy([2u8; 32]);
1226
1227 manager.add_peer(peer1).await;
1229 manager.add_peer(peer2).await;
1230
1231 let peers = manager.peers().await;
1232 assert_eq!(peers.len(), 2);
1233 assert!(peers.contains(&peer1));
1234 assert!(peers.contains(&peer2));
1235
1236 manager.remove_peer(&peer1).await;
1238 let peers = manager.peers().await;
1239 assert_eq!(peers.len(), 1);
1240 assert!(!peers.contains(&peer1));
1241 assert!(peers.contains(&peer2));
1242 }
1243
1244 #[tokio::test]
1245 async fn test_sync_manager_health_when_not_running() {
1246 let manager = SyncServiceManager::with_defaults();
1247
1248 assert!(manager.sync_service_health().await.is_none());
1250 }
1251
1252 #[tokio::test]
1253 async fn test_sync_manager_health_when_running() {
1254 let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());
1255 let context = test_service_context();
1256
1257 RuntimeService::start(&manager, &context).await.unwrap();
1258
1259 let health = manager.sync_service_health().await;
1261 assert!(health.is_some());
1262
1263 RuntimeService::stop(&manager).await.unwrap();
1264 }
1265
1266 #[tokio::test]
1267 async fn test_sync_manager_without_merkle_verification() {
1268 let manager = SyncServiceManager::new(SyncManagerConfig::for_testing());
1269
1270 assert!(!manager.has_merkle_verification());
1272 assert!(manager.local_merkle_root().await.is_none());
1273 assert!(manager.verify_facts(vec![], [0u8; 32]).await.is_none());
1274 assert!(manager.merkle_verifier().is_none());
1275 }
1276
1277 #[tokio::test]
1278 async fn test_sync_manager_with_merkle_verification() {
1279 let root = [42u8; 32];
1280 let journal = Arc::new(MockIndexedJournal::new(root));
1281 let time = Arc::new(PhysicalTimeHandler::new());
1282 let manager = SyncServiceManager::with_indexed_journal(
1283 SyncManagerConfig::for_testing(),
1284 journal,
1285 time,
1286 );
1287
1288 assert!(manager.has_merkle_verification());
1290 assert!(manager.merkle_verifier().is_some());
1291
1292 let local_root = manager.local_merkle_root().await;
1294 assert_eq!(local_root, Some(root));
1295 }
1296
1297 #[tokio::test]
1298 async fn test_sync_manager_verify_facts() {
1299 let root = [42u8; 32];
1300 let journal = Arc::new(MockIndexedJournal::new(root));
1301 let time = Arc::new(PhysicalTimeHandler::new());
1302 let manager = SyncServiceManager::with_indexed_journal(
1303 SyncManagerConfig::for_testing(),
1304 journal,
1305 time,
1306 );
1307
1308 let fact = IndexedFact {
1310 id: FactId(1),
1311 predicate: "test".to_string(),
1312 value: FactValue::String("test_value".to_string()),
1313 authority: Some(AuthorityId::new_from_entropy([1u8; 32])),
1314 timestamp: None,
1315 };
1316
1317 let result = manager.verify_facts(vec![fact], root).await;
1319 assert!(result.is_some());
1320
1321 let result = result.unwrap();
1322 assert_eq!(result.verified.len(), 1);
1324 assert!(result.rejected.is_empty());
1325 }
1326}