1#![allow(clippy::disallowed_types)]
14
15use crate::core::config::default_storage_path;
16use crate::core::AgentConfig;
17use crate::database::IndexedJournalHandler;
18use crate::fact_registry::build_fact_registry;
19use crate::runtime::services::{
20 LanTransportService, LogicalClockManager, MoveManager, RendezvousManager,
21};
22use crate::runtime::subsystems::choreography::RuntimeChoreographySessionId;
23use crate::runtime::subsystems::{
24 crypto::CryptoRng, ChoreographyState, CryptoSubsystem, JournalSubsystem, TransportSubsystem,
25 VmFragmentId, VmFragmentRegistry,
26};
27use crate::runtime::time_handler::EnhancedTimeHandler;
28use async_trait::async_trait;
29use aura_app::ReactiveHandler;
30use aura_authorization::BiscuitAuthorizationBridge;
31use aura_composition::{CompositeHandlerAdapter, RegisterAllOptions};
32use aura_core::crypto::single_signer::SigningMode;
33use aura_core::effects::transport::TransportEnvelope;
34use aura_core::effects::*;
35use aura_core::hash::hash as aura_hash;
36use aura_core::types::scope::AuthorizationOp;
37use aura_core::{execute_with_timeout_budget, AuraError, AuthorityId, ContextId, TimeoutBudget};
38use aura_effects::{
39 crypto::RealCryptoHandler,
40 encrypted_storage::{EncryptedStorage, EncryptedStorageConfig},
41 secure::RealSecureStorageHandler,
42 storage::FilesystemStorageHandler,
43 time::{OrderClockHandler, PhysicalTimeHandler},
44};
45use aura_journal::extensibility::FactRegistry;
46use aura_journal::fact::ProtocolRelationalFact;
47use aura_journal::fact::{
48 DkgTranscriptCommit, Fact as TypedFact, FactContent, FactOptions, RelationalFact,
49};
50use aura_mpst::CompositionManifest;
51use aura_protocol::handlers::{PersistentSyncHandler, PersistentTreeHandler};
52use biscuit_auth::{Biscuit, PublicKey};
53use parking_lot::RwLock;
54use rand::rngs::StdRng;
55use rand::SeedableRng;
56use std::collections::HashMap;
57#[cfg(not(target_arch = "wasm32"))]
58use std::net::SocketAddr;
59use std::panic::Location;
60use std::sync::Arc;
61#[cfg(all(debug_assertions, not(target_arch = "wasm32")))]
62use std::sync::Once;
63use std::sync::OnceLock;
64#[cfg(all(debug_assertions, not(target_arch = "wasm32")))]
65use std::time::Duration;
66use tokio::sync::mpsc;
67
68use super::shared_transport::SharedTransport;
69
70#[derive(Clone, Debug)]
75pub struct BiscuitCache {
76 pub token_b64: String,
78 pub root_pk_b64: String,
80}
81
82mod amp;
83mod aura;
84mod choreography;
85mod crypto;
86mod effect_api;
87mod flow;
88mod guard;
89mod journal;
90mod network;
91mod noise;
92mod storage;
93mod sync;
94mod system;
95mod time;
96mod transport;
97mod tree;
98
99const DEFAULT_WINDOW: u32 = 1024;
100const TYPED_FACT_STORAGE_PREFIX: &str = "journal/facts";
101const DEFAULT_CHOREO_FLOW_COST: u32 = 1;
102const CHOREO_FLOW_COST_PER_KB: u32 = 1;
103const AMP_CONTENT_TYPE: &str = "application/aura-amp";
104const TEST_SEED_DERIVATION_DOMAIN: &str = "aura:test-seed:v1";
105
106#[derive(Clone, Debug)]
107struct TestSeedUsage {
108 identity: String,
109 location: String,
110}
111
112static TEST_SEED_REGISTRY: OnceLock<parking_lot::Mutex<HashMap<u64, TestSeedUsage>>> =
113 OnceLock::new();
114
115pub struct AuraEffectSystem {
128 config: AgentConfig,
130 authority_id: AuthorityId,
131 execution_mode: ExecutionMode,
132 harness_mode_enabled: bool,
133
134 crypto: CryptoSubsystem,
137 transport: TransportSubsystem,
139 journal: JournalSubsystem,
141
142 composite: CompositeHandlerAdapter,
144
145 storage_handler: Arc<
147 EncryptedStorage<FilesystemStorageHandler, RealCryptoHandler, RealSecureStorageHandler>,
148 >,
149 tree_handler: PersistentTreeHandler,
150 sync_handler: PersistentSyncHandler,
151
152 time_handler: EnhancedTimeHandler,
154 logical_clock: Arc<LogicalClockManager>,
155 order_clock: OrderClockHandler,
156
157 authorization_handler: aura_authorization::effects::WotAuthorizationHandler<
159 aura_effects::crypto::RealCryptoHandler,
160 >,
161 leakage_handler: aura_effects::leakage::ProductionLeakageHandler<
162 EncryptedStorage<FilesystemStorageHandler, RealCryptoHandler, RealSecureStorageHandler>,
163 >,
164
165 reactive_handler: ReactiveHandler,
168
169 choreography_state: parking_lot::RwLock<ChoreographyState>,
172
173 vm_fragment_registry: parking_lot::RwLock<VmFragmentRegistry>,
175
176 lan_transport: parking_lot::RwLock<Option<Arc<LanTransportService>>>,
178
179 rendezvous_manager: parking_lot::RwLock<Option<RendezvousManager>>,
181
182 move_manager: parking_lot::RwLock<Option<MoveManager>>,
184
185 biscuit_cache: parking_lot::RwLock<Option<BiscuitCache>>,
187
188 #[cfg(not(target_arch = "wasm32"))]
193 network_connections: parking_lot::RwLock<HashMap<uuid::Uuid, SocketAddr>>,
194 #[cfg(target_arch = "wasm32")]
196 network_connections: parking_lot::RwLock<HashMap<uuid::Uuid, String>>,
197}
198
199#[derive(Clone, Default)]
200struct NoopBiscuitAuthorizationHandler;
201
202#[async_trait]
203impl BiscuitAuthorizationEffects for NoopBiscuitAuthorizationHandler {
204 async fn authorize_biscuit(
205 &self,
206 _token_data: &[u8],
207 _operation: AuthorizationOp,
208 _scope: &aura_core::types::scope::ResourceScope,
209 ) -> Result<AuthorizationDecision, AuthorizationError> {
210 Ok(AuthorizationDecision {
211 authorized: true,
212 reason: None,
213 })
214 }
215
216 async fn authorize_fact(
217 &self,
218 _token_data: &[u8],
219 _fact_type: &str,
220 _scope: &aura_core::types::scope::ResourceScope,
221 ) -> Result<bool, AuthorizationError> {
222 Ok(true)
223 }
224}
225
226impl AuraEffectSystem {
227 #[cfg(all(debug_assertions, not(target_arch = "wasm32")))]
228 fn maybe_start_deadlock_detector() {
229 static START: Once = Once::new();
230 START.call_once(|| {
231 std::thread::Builder::new()
232 .name("aura-deadlock-detector".to_string())
233 .spawn(|| loop {
234 std::thread::park_timeout(Duration::from_secs(10));
235 let deadlocks = parking_lot::deadlock::check_deadlock();
236 if !deadlocks.is_empty() {
237 tracing::error!(
239 count = deadlocks.len(),
240 "Detected parking_lot deadlock(s)"
241 );
242 }
243 })
244 .expect("failed to spawn deadlock detector thread");
245 });
246 }
247
248 #[cfg(any(not(debug_assertions), target_arch = "wasm32"))]
249 fn maybe_start_deadlock_detector() {}
250
251 fn normalize_test_config(mut config: AgentConfig) -> AgentConfig {
252 if config.storage.base_path == default_storage_path() {
257 static COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
258
259 let temp_root = std::env::temp_dir();
260 let mut attempt = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
261
262 let mut selected: Option<std::path::PathBuf> = None;
263 for _ in 0..256 {
264 let candidate = temp_root.join(format!("aura-agent-test-{attempt}"));
265 match std::fs::create_dir(&candidate) {
266 Ok(()) => {
267 selected = Some(candidate);
268 break;
269 }
270 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
271 attempt = attempt.wrapping_add(1);
272 continue;
273 }
274 Err(_) => {
275 selected = Some(candidate);
276 break;
277 }
278 }
279 }
280
281 config.storage.base_path =
282 selected.unwrap_or_else(|| temp_root.join("aura-agent-test-fallback"));
283 }
284 config
285 }
286
287 fn build_internal(
300 config: AgentConfig,
301 composite: CompositeHandlerAdapter,
302 execution_mode: ExecutionMode,
303 crypto_seed: Option<[u8; 32]>,
304 shared_transport: Option<SharedTransport>,
305 shared_inbox: Option<Arc<RwLock<Vec<TransportEnvelope>>>>,
306 authority_id: AuthorityId,
307 ) -> Self {
308 Self::maybe_start_deadlock_detector();
309 let authority = authority_id;
310 let device_id = config.device_id();
311 let (journal_policy, journal_verifying_key) = Self::init_journal_policy(authority);
312 let test_mode = execution_mode.is_deterministic();
313 let harness_mode_enabled = std::env::var_os("AURA_HARNESS_MODE").is_some();
314
315 let crypto_handler = match crypto_seed {
317 Some(seed) => RealCryptoHandler::seeded(seed),
318 None => RealCryptoHandler::new(),
319 };
320 let random_rng = match crypto_seed {
321 Some(seed) => CryptoRng::deterministic(StdRng::from_seed(seed)),
322 None => CryptoRng::thread_local(),
323 };
324 let secure_storage_handler = Arc::new(RealSecureStorageHandler::with_base_path(
325 config.storage.base_path.clone(),
326 ));
327 let crypto = CryptoSubsystem::from_parts(
328 crypto_handler.clone(),
329 random_rng,
330 secure_storage_handler.clone(),
331 );
332
333 let auth_time = PhysicalTimeHandler::new();
335 let time_handler = EnhancedTimeHandler::new();
336 let authorization_handler = Self::init_authorization_handler(
337 authority,
338 &crypto_handler,
339 &journal_verifying_key,
340 &auth_time,
341 );
342 let encrypted_storage_config = {
343 let mut cfg = EncryptedStorageConfig::default()
344 .with_encryption_enabled(config.storage.encryption_enabled);
345 if config.storage.opaque_names {
346 cfg = cfg.with_opaque_names();
347 }
348
349 let _ = test_mode; cfg
351 };
352 let storage_handler = Arc::new(EncryptedStorage::new(
353 FilesystemStorageHandler::new(config.storage.base_path.clone()),
354 Arc::new(crypto_handler),
355 secure_storage_handler,
356 encrypted_storage_config,
357 ));
358 let leakage_handler =
359 aura_effects::leakage::ProductionLeakageHandler::with_storage(storage_handler.clone());
360 let tree_handler = PersistentTreeHandler::new(storage_handler.clone());
361 let sync_handler = PersistentSyncHandler::new(storage_handler.clone());
362
363 let transport_handler = aura_effects::transport::RealTransportHandler::default();
365 if shared_transport.is_some() && shared_inbox.is_some() {
367 tracing::warn!(
368 "Shared transport and shared inbox both provided; using shared transport"
369 );
370 }
371 if let Some(shared) = &shared_transport {
372 shared.register(authority);
373 }
374 let transport_inbox = shared_inbox.unwrap_or_else(|| {
375 shared_transport
376 .as_ref()
377 .map(|shared| shared.inbox_for(authority))
378 .unwrap_or_else(|| Arc::new(RwLock::new(Vec::new())))
379 });
380 let transport =
381 TransportSubsystem::from_parts(transport_handler, transport_inbox, shared_transport);
382
383 let indexed_journal = Arc::new(IndexedJournalHandler::with_capacity(100_000));
385 let fact_registry = Arc::new(build_fact_registry());
386 let journal = JournalSubsystem::from_parts(
387 indexed_journal,
388 fact_registry,
389 None, journal_policy,
391 journal_verifying_key,
392 );
393
394 let initial_biscuit_cache = {
398 use base64::Engine;
399 let engine = base64::engine::general_purpose::STANDARD;
400 let token_authority = aura_authorization::TokenAuthority::new(authority);
401 match token_authority.create_token(
402 authority,
403 crate::token_profiles::TokenCapabilityProfile::StandardDevice,
404 ) {
405 Ok(biscuit) => match biscuit.to_vec() {
406 Ok(token_bytes) => {
407 let root_pk_bytes = token_authority.root_public_key().to_bytes();
408 Some(BiscuitCache {
409 token_b64: engine.encode(&token_bytes),
410 root_pk_b64: engine.encode(root_pk_bytes),
411 })
412 }
413 Err(_) => None,
414 },
415 Err(_) => None,
416 }
417 };
418
419 let effect_system = Self {
420 config,
421 authority_id: authority,
422 execution_mode,
423 harness_mode_enabled,
424 crypto,
425 transport,
426 journal,
427 composite,
428 storage_handler,
429 tree_handler,
430 sync_handler,
431 time_handler,
432 logical_clock: Arc::new(LogicalClockManager::new(Some(device_id))),
433 order_clock: OrderClockHandler,
434 authorization_handler,
435 leakage_handler,
436 reactive_handler: ReactiveHandler::new(),
437 choreography_state: parking_lot::RwLock::new(ChoreographyState::default()),
438 vm_fragment_registry: parking_lot::RwLock::new(VmFragmentRegistry::default()),
439 lan_transport: parking_lot::RwLock::new(None),
440 rendezvous_manager: parking_lot::RwLock::new(None),
441 move_manager: parking_lot::RwLock::new(None),
442 biscuit_cache: parking_lot::RwLock::new(initial_biscuit_cache),
443 #[cfg(not(target_arch = "wasm32"))]
444 network_connections: parking_lot::RwLock::new(HashMap::new()),
445 #[cfg(target_arch = "wasm32")]
446 network_connections: parking_lot::RwLock::new(HashMap::new()),
447 };
448
449 tracing::info!(
450 authority_id = %effect_system.authority_id,
451 execution_mode = ?effect_system.execution_mode,
452 choreography_backend = crate::CHOREO_BACKEND,
453 choreography_session_mode = "per-session-task-bound",
454 active_choreography_sessions = effect_system.choreography_state.read().active_session_count(),
455 "initialized aura runtime effect system"
456 );
457
458 effect_system
459 }
460
461 pub fn is_testing(&self) -> bool {
463 self.execution_mode.is_deterministic()
464 }
465
466 pub fn is_test_mode(&self) -> bool {
468 matches!(self.execution_mode, ExecutionMode::Testing)
469 }
470
471 pub fn harness_mode_enabled(&self) -> bool {
473 self.harness_mode_enabled
474 }
475
476 fn ensure_mock_network(&self) -> Result<(), NetworkError> {
477 if self.execution_mode.is_deterministic() {
478 Ok(())
479 } else {
480 Err(NetworkError::NotImplemented)
481 }
482 }
483
484 fn ensure_mock_system(&self, operation: &str) -> Result<(), SystemError> {
485 if self.execution_mode.is_deterministic() {
486 Ok(())
487 } else {
488 Err(SystemError::OperationFailed {
489 message: format!("SystemEffects::{operation} not implemented for production"),
490 })
491 }
492 }
493
494 fn ensure_mock_effect_api(
495 &self,
496 operation: &str,
497 ) -> Result<(), aura_protocol::effects::EffectApiError> {
498 if self.execution_mode.is_deterministic() {
499 Ok(())
500 } else {
501 Err(aura_protocol::effects::EffectApiError::Backend {
502 error: format!("EffectApi::{operation} not implemented for production"),
503 })
504 }
505 }
506
507 pub fn reactive_handler(&self) -> ReactiveHandler {
509 self.reactive_handler.clone()
510 }
511
512 pub async fn export_tree_ops(
513 &self,
514 ) -> Result<Vec<aura_core::AttestedOp>, crate::core::AgentError> {
515 self.tree_handler
516 .export_ops()
517 .await
518 .map_err(|error| crate::core::AgentError::effects(error.to_string()))
519 }
520
521 pub async fn import_tree_ops(
522 &self,
523 ops: &[aura_core::AttestedOp],
524 ) -> Result<(), crate::core::AgentError> {
525 self.tree_handler
526 .import_ops(ops)
527 .await
528 .map_err(|error| crate::core::AgentError::effects(error.to_string()))
529 }
530
531 pub fn attach_fact_sink(&self, tx: mpsc::Sender<crate::reactive::FactSource>) {
535 self.journal.attach_fact_sink(tx);
536 }
537
538 pub fn attach_view_update_sender(
542 &self,
543 tx: tokio::sync::broadcast::Sender<crate::reactive::ViewUpdate>,
544 ) {
545 self.journal.attach_view_update_sender(tx);
546 }
547
548 pub(crate) fn current_runtime_choreography_session_id(
550 &self,
551 ) -> Option<crate::runtime::RuntimeChoreographySessionId> {
552 self.choreography_state.read().current_session_id()
553 }
554
555 pub(crate) fn set_current_runtime_choreography_protocol_id(
557 &self,
558 protocol_id: impl Into<String>,
559 ) -> Result<(), String> {
560 let session_id = self
561 .current_runtime_choreography_session_id()
562 .ok_or_else(|| {
563 "cannot attach a protocol id without an active choreography session".to_string()
564 })?;
565 self.choreography_state
566 .write()
567 .set_session_protocol_id(session_id, protocol_id)
568 }
569
570 pub(crate) fn claim_runtime_choreography_session_owner(
572 &self,
573 session_id: crate::runtime::RuntimeChoreographySessionId,
574 owner_label: impl Into<String>,
575 ) -> Result<crate::runtime::subsystems::choreography::SessionOwnerCapability, String> {
576 self.choreography_state
577 .write()
578 .claim_session_owner(session_id, owner_label)
579 .map_err(|error| error.to_string())
580 }
581
582 pub(crate) fn ensure_runtime_choreography_session_owner_capability(
584 &self,
585 session_id: crate::runtime::RuntimeChoreographySessionId,
586 expected_capability: &crate::runtime::subsystems::choreography::SessionOwnerCapability,
587 ) -> Result<(), String> {
588 self.choreography_state
589 .read()
590 .ensure_session_owner(session_id, expected_capability)
591 .map_err(|error| error.to_string())
592 }
593
594 pub(crate) fn current_runtime_choreography_session_owner_label(
596 &self,
597 ) -> Result<String, String> {
598 let session_id = self
599 .current_runtime_choreography_session_id()
600 .ok_or_else(|| {
601 "cannot resolve runtime session owner label without an active choreography session"
602 .to_string()
603 })?;
604 self.choreography_state
605 .read()
606 .session_owner(session_id)
607 .map(|owner| owner.owner_label.clone())
608 .ok_or_else(|| format!("runtime session {session_id} has no owner record"))
609 }
610
611 pub(crate) fn transfer_runtime_choreography_session_owner(
613 &self,
614 session_id: crate::runtime::RuntimeChoreographySessionId,
615 expected_capability: &crate::runtime::subsystems::choreography::SessionOwnerCapability,
616 next_owner_label: impl Into<String>,
617 next_scope: crate::runtime::subsystems::choreography::SessionOwnerCapabilityScope,
618 ) -> Result<crate::runtime::subsystems::choreography::SessionOwnerCapability, String> {
619 let next_owner_label = next_owner_label.into();
620 let mut choreography_state = self.choreography_state.write();
621 choreography_state
622 .ensure_session_owner(session_id, expected_capability)
623 .map_err(|error| error.to_string())?;
624
625 let transferred_fragments = self
626 .vm_fragment_registry
627 .write()
628 .transfer_session_if_present(
629 session_id,
630 &expected_capability.owner_label,
631 &next_owner_label,
632 )
633 .map_err(|error| error.to_string())?;
634
635 let next_capability = choreography_state
636 .transfer_session_owner(
637 session_id,
638 expected_capability,
639 next_owner_label.clone(),
640 next_scope,
641 )
642 .map_err(|error| error.to_string())?;
643
644 if transferred_fragments > 0 {
645 tracing::info!(
646 session_id = %session_id,
647 from_owner = %expected_capability.owner_label,
648 to_owner = %next_owner_label,
649 fragment_count = transferred_fragments,
650 "transferred runtime choreography session owner and fragment ownership together"
651 );
652 }
653
654 Ok(next_capability)
655 }
656
657 pub(crate) fn claim_vm_fragments_for_manifest(
659 &self,
660 owner_label: impl Into<String>,
661 manifest: &CompositionManifest,
662 ) -> Result<Vec<VmFragmentId>, String> {
663 let session_id = self
664 .current_runtime_choreography_session_id()
665 .ok_or_else(|| {
666 format!(
667 "cannot claim protocol fragments for protocol {} without an active choreography session",
668 manifest.protocol_id
669 )
670 })?;
671 let owner_label = owner_label.into();
672 let claimed = self
673 .vm_fragment_registry
674 .write()
675 .claim_manifest(session_id, owner_label.clone(), manifest)
676 .map_err(|error| error.to_string())?;
677 tracing::debug!(
678 session_id = %session_id,
679 protocol_id = %manifest.protocol_id,
680 owner_label = %owner_label,
681 fragment_count = claimed.len(),
682 "claimed local protocol fragment ownership"
683 );
684 Ok(claimed)
685 }
686
687 pub(crate) fn release_vm_fragments_for_session(
689 &self,
690 session_id: crate::runtime::RuntimeChoreographySessionId,
691 ) -> Vec<VmFragmentId> {
692 let released = self
693 .vm_fragment_registry
694 .write()
695 .release_session(session_id);
696 if !released.is_empty() {
697 tracing::debug!(
698 session_id = %session_id,
699 fragment_count = released.len(),
700 "released local protocol fragment ownership"
701 );
702 }
703 released
704 }
705
706 pub(crate) fn release_vm_fragments(&self, fragment_ids: &[VmFragmentId]) -> Vec<VmFragmentId> {
708 let released = self
709 .vm_fragment_registry
710 .write()
711 .release_fragments(fragment_ids);
712 if !released.is_empty() {
713 tracing::debug!(
714 fragment_count = released.len(),
715 "released explicitly claimed local protocol fragments"
716 );
717 }
718 released
719 }
720
721 #[cfg(test)]
722 pub fn vm_fragment_snapshot(
723 &self,
724 ) -> Vec<(
725 VmFragmentId,
726 crate::runtime::subsystems::VmFragmentOwnerRecord,
727 )> {
728 self.vm_fragment_registry.read().snapshot()
729 }
730
731 pub async fn await_next_view_update(&self) {
744 use crate::reactive::ViewUpdate;
745
746 let Some(mut rx) = self.journal.subscribe_view_updates() else {
747 return;
748 };
749
750 let wait_for_batch = async {
751 loop {
752 match rx.recv().await {
753 Ok(ViewUpdate::Batch { .. }) => return,
754 Ok(_) => continue,
755 Err(_) => return, }
757 }
758 };
759
760 if self.harness_mode_enabled() {
761 let time = PhysicalTimeHandler::new();
762 if let Ok(started_at) = time.physical_time().await {
763 if let Ok(budget) = TimeoutBudget::from_start_and_timeout(
764 &started_at,
765 std::time::Duration::from_secs(2),
766 ) {
767 let _ = execute_with_timeout_budget(&time, &budget, || async {
768 wait_for_batch.await;
769 Ok::<(), ()>(())
770 })
771 .await;
772 }
773 }
774 return;
775 }
776
777 wait_for_batch.await;
778 }
779
780 pub fn requeue_envelope(&self, envelope: TransportEnvelope) {
781 self.queue_runtime_envelope(envelope);
782 }
783
784 pub(crate) fn queue_runtime_envelope(&self, envelope: TransportEnvelope) {
785 if let Some(session_id) = Self::choreography_session_id_from_envelope(&envelope) {
786 self.choreography_state
787 .write()
788 .queue_session_envelope(session_id, envelope);
789 return;
790 }
791
792 self.transport.queue_envelope(envelope);
793 }
794
795 fn choreography_session_id_from_envelope(
796 envelope: &TransportEnvelope,
797 ) -> Option<RuntimeChoreographySessionId> {
798 let is_choreography = envelope
799 .metadata
800 .get("content-type")
801 .is_some_and(|value| value == "application/aura-choreography");
802 if !is_choreography {
803 return None;
804 }
805 let session_id = envelope.metadata.get("session-id")?;
806 let Ok(session_uuid) = uuid::Uuid::parse_str(session_id) else {
807 return None;
808 };
809 Some(RuntimeChoreographySessionId::from_uuid(session_uuid))
810 }
811
812 pub fn attach_lan_transport(&self, service: Arc<LanTransportService>) {
813 *self.lan_transport.write() = Some(service);
814 }
815
816 pub fn lan_transport(&self) -> Option<Arc<LanTransportService>> {
817 self.lan_transport.read().clone()
818 }
819
820 pub fn attach_rendezvous_manager(&self, manager: RendezvousManager) {
821 *self.rendezvous_manager.write() = Some(manager);
822 }
823
824 pub fn rendezvous_manager(&self) -> Option<RendezvousManager> {
825 self.rendezvous_manager.read().clone()
826 }
827
828 pub fn attach_move_manager(&self, manager: MoveManager) {
829 *self.move_manager.write() = Some(manager);
830 }
831
832 pub fn move_manager(&self) -> Option<MoveManager> {
833 self.move_manager.read().clone()
834 }
835
836 pub async fn initialize_biscuit_cache(&self) {
843 use aura_core::effects::secure::{SecureStorageCapability, SecureStorageLocation};
844 use aura_core::effects::SecureStorageEffects;
845 use base64::Engine;
846
847 let location = SecureStorageLocation::biscuit_authority(&self.authority_id);
848 let caps = [SecureStorageCapability::Read];
849
850 match self.secure_retrieve(&location, &caps).await {
851 Ok(bytes) if bytes.len() > 32 => {
852 let engine = base64::engine::general_purpose::STANDARD;
853 let root_pk_b64 = engine.encode(&bytes[..32]);
854 let token_b64 = engine.encode(&bytes[32..]);
855
856 *self.biscuit_cache.write() = Some(BiscuitCache {
857 token_b64,
858 root_pk_b64,
859 });
860 tracing::info!("Biscuit cache initialized from secure storage");
861 }
862 Ok(bytes) => {
863 tracing::warn!(
864 len = bytes.len(),
865 "Biscuit data in secure storage too short (need >32 bytes)"
866 );
867 }
868 Err(_) => {
869 tracing::debug!("No biscuit found in secure storage (new account)");
870 }
871 }
872 }
873
874 pub fn set_biscuit_cache(&self, cache: BiscuitCache) {
876 *self.biscuit_cache.write() = Some(cache);
877 }
878
879 #[cfg(test)]
880 pub(crate) fn clear_biscuit_cache(&self) {
881 *self.biscuit_cache.write() = None;
882 }
883
884 pub fn biscuit_cache(&self) -> Option<BiscuitCache> {
886 self.biscuit_cache.read().clone()
887 }
888
889 pub async fn bootstrap_biscuit_tokens(&self, authority: &AuthorityId) -> Result<(), AuraError> {
895 use aura_core::effects::secure::{SecureStorageCapability, SecureStorageLocation};
896 use aura_core::effects::SecureStorageEffects;
897 use base64::Engine;
898
899 let token_authority = aura_authorization::TokenAuthority::new(*authority);
900 let biscuit = token_authority
901 .create_token(
902 *authority,
903 crate::token_profiles::TokenCapabilityProfile::StandardDevice,
904 )
905 .map_err(|e| AuraError::internal(format!("Failed to create Biscuit token: {e}")))?;
906
907 let token_bytes = biscuit
908 .to_vec()
909 .map_err(|e| AuraError::internal(format!("Failed to serialize Biscuit: {e}")))?;
910 let root_pk_bytes = token_authority.root_public_key().to_bytes();
911
912 let mut storage_bytes = Vec::with_capacity(32 + token_bytes.len());
914 storage_bytes.extend_from_slice(&root_pk_bytes);
915 storage_bytes.extend_from_slice(&token_bytes);
916
917 let location = SecureStorageLocation::biscuit_authority(authority);
918 let caps = vec![SecureStorageCapability::Write];
919 self.secure_store(&location, &storage_bytes, &caps).await?;
920
921 let engine = base64::engine::general_purpose::STANDARD;
923 self.set_biscuit_cache(BiscuitCache {
924 token_b64: engine.encode(&token_bytes),
925 root_pk_b64: engine.encode(root_pk_bytes),
926 });
927
928 tracing::info!(%authority, "Biscuit authorization tokens bootstrapped");
929 Ok(())
930 }
931
932 async fn publish_typed_facts(&self, facts: Vec<TypedFact>) -> Result<(), AuraError> {
933 if !self.journal.has_fact_sink() {
934 return Ok(());
935 }
936
937 self.journal
938 .publish_facts(crate::reactive::FactSource::Journal(facts))
939 .await
940 .map_err(|error| AuraError::internal(error.to_string()))?;
941
942 Ok(())
943 }
944
945 fn typed_fact_storage_prefix(authority_id: AuthorityId) -> String {
946 format!("{}/{}/", TYPED_FACT_STORAGE_PREFIX, authority_id)
947 }
948
949 fn typed_fact_storage_key(
950 authority_id: AuthorityId,
951 order: &aura_core::time::OrderTime,
952 ) -> String {
953 format!(
954 "{}{}",
955 Self::typed_fact_storage_prefix(authority_id),
956 hex::encode(order.0)
957 )
958 }
959
960 pub async fn commit_relational_facts(
964 &self,
965 facts: Vec<RelationalFact>,
966 ) -> Result<Vec<TypedFact>, AuraError> {
967 if facts.is_empty() {
968 return Ok(vec![]);
969 }
970
971 let mut committed: Vec<TypedFact> = Vec::with_capacity(facts.len());
972 for rel in facts {
973 let order = self
974 .order_time()
975 .await
976 .map_err(|e| AuraError::internal(format!("order_time: {e}")))?;
977
978 let fact = TypedFact::new(
979 order.clone(),
980 aura_core::time::TimeStamp::OrderClock(order.clone()),
981 FactContent::Relational(rel),
982 );
983
984 let key = Self::typed_fact_storage_key(self.authority_id, &order);
985 let bytes = aura_core::util::serialization::to_vec(&fact)
986 .map_err(|e| AuraError::internal(format!("serialize fact: {e}")))?;
987 self.store(&key, bytes)
988 .await
989 .map_err(|e| AuraError::storage(format!("persist fact: {e}")))?;
990
991 committed.push(fact);
992 }
993
994 self.publish_typed_facts(committed.clone()).await?;
996
997 Ok(committed)
998 }
999
1000 pub async fn commit_relational_facts_with_options(
1004 &self,
1005 facts: Vec<RelationalFact>,
1006 options: FactOptions,
1007 ) -> Result<Vec<TypedFact>, AuraError> {
1008 if facts.is_empty() {
1009 return Ok(vec![]);
1010 }
1011
1012 let mut committed: Vec<TypedFact> = Vec::with_capacity(facts.len());
1013 for rel in facts {
1014 let order = self
1015 .order_time()
1016 .await
1017 .map_err(|e| AuraError::internal(format!("order_time: {e}")))?;
1018
1019 let mut fact = TypedFact::new(
1020 order.clone(),
1021 aura_core::time::TimeStamp::OrderClock(order.clone()),
1022 FactContent::Relational(rel),
1023 );
1024
1025 if options.request_acks {
1027 fact = fact.with_ack_tracking();
1028 }
1029 if let Some(agreement) = &options.initial_agreement {
1030 fact = fact.with_agreement(agreement.clone());
1031 }
1032
1033 let key = Self::typed_fact_storage_key(self.authority_id, &order);
1034 let bytes = aura_core::util::serialization::to_vec(&fact)
1035 .map_err(|e| AuraError::internal(format!("serialize fact: {e}")))?;
1036 self.store(&key, bytes)
1037 .await
1038 .map_err(|e| AuraError::storage(format!("persist fact: {e}")))?;
1039
1040 committed.push(fact);
1041 }
1042
1043 self.publish_typed_facts(committed.clone()).await?;
1045
1046 Ok(committed)
1047 }
1048
1049 pub async fn commit_generic_fact_bytes(
1051 &self,
1052 context_id: ContextId,
1053 binding_type: aura_core::types::facts::FactTypeId,
1054 binding_data: Vec<u8>,
1055 ) -> Result<TypedFact, AuraError> {
1056 let envelope = aura_core::types::facts::FactEnvelope {
1057 type_id: binding_type,
1058 schema_version: 1,
1059 encoding: aura_core::types::facts::FactEncoding::DagCbor,
1060 payload: binding_data,
1061 };
1062 let rel = RelationalFact::Generic {
1063 context_id,
1064 envelope,
1065 };
1066 let mut committed = self.commit_relational_facts(vec![rel]).await?;
1067 Ok(committed
1068 .pop()
1069 .unwrap_or_else(|| unreachable!("commit_relational_facts committed exactly one")))
1070 }
1071
1072 pub async fn load_committed_facts(
1074 &self,
1075 authority_id: AuthorityId,
1076 ) -> Result<Vec<TypedFact>, AuraError> {
1077 let prefix = Self::typed_fact_storage_prefix(authority_id);
1078 let mut keys = self
1079 .list_keys(Some(&prefix))
1080 .await
1081 .map_err(|e| AuraError::storage(format!("list_keys: {e}")))?;
1082 keys.sort();
1083
1084 let mut facts = Vec::new();
1085 for key in keys {
1086 let Some(bytes) = self
1087 .retrieve(&key)
1088 .await
1089 .map_err(|e| AuraError::storage(format!("retrieve: {e}")))?
1090 else {
1091 continue;
1092 };
1093
1094 let fact: TypedFact = aura_core::util::serialization::from_slice(&bytes)
1095 .map_err(|e| AuraError::internal(format!("deserialize fact: {e}")))?;
1096 facts.push(fact);
1097 }
1098
1099 facts.sort();
1100 Ok(facts)
1101 }
1102
1103 pub async fn has_dkg_transcript_commit(
1105 &self,
1106 authority_id: AuthorityId,
1107 context_id: ContextId,
1108 epoch: u64,
1109 ) -> Result<bool, AuraError> {
1110 let facts = self.load_committed_facts(authority_id).await?;
1111 for fact in facts {
1112 let FactContent::Relational(RelationalFact::Protocol(
1113 ProtocolRelationalFact::DkgTranscriptCommit(commit),
1114 )) = &fact.content
1115 else {
1116 continue;
1117 };
1118
1119 if commit.context == context_id && commit.epoch == epoch {
1120 return Ok(true);
1121 }
1122 }
1123 Ok(false)
1124 }
1125
1126 pub async fn latest_dkg_transcript_commit(
1128 &self,
1129 authority_id: AuthorityId,
1130 context_id: ContextId,
1131 ) -> Result<Option<DkgTranscriptCommit>, AuraError> {
1132 let facts = self.load_committed_facts(authority_id).await?;
1133 let mut latest: Option<DkgTranscriptCommit> = None;
1134 for fact in facts {
1135 let FactContent::Relational(RelationalFact::Protocol(
1136 ProtocolRelationalFact::DkgTranscriptCommit(commit),
1137 )) = &fact.content
1138 else {
1139 continue;
1140 };
1141
1142 if commit.context != context_id {
1143 continue;
1144 }
1145
1146 match &latest {
1147 Some(existing) if existing.epoch >= commit.epoch => {}
1148 _ => latest = Some(commit.clone()),
1149 }
1150 }
1151 Ok(latest)
1152 }
1153
1154 const TEST_CRYPTO_SEED: [u8; 32] = [42u8; 32];
1157
1158 pub fn new(
1160 config: AgentConfig,
1161 authority_id: AuthorityId,
1162 ) -> Result<Self, crate::core::AgentError> {
1163 let config = Self::normalize_test_config(config);
1164 let composite = CompositeHandlerAdapter::for_testing(config.device_id());
1165 Ok(Self::build_internal(
1166 config,
1167 composite,
1168 ExecutionMode::Testing,
1169 Some(Self::TEST_CRYPTO_SEED),
1170 None, None, authority_id,
1173 ))
1174 }
1175
1176 pub fn production(
1178 config: AgentConfig,
1179 authority_id: AuthorityId,
1180 ) -> Result<Self, crate::core::AgentError> {
1181 let mut composite = CompositeHandlerAdapter::for_production(config.device_id());
1182 composite
1183 .composite_mut()
1184 .register_all(RegisterAllOptions::allow_impure())
1185 .map_err(|e| crate::core::AgentError::effects(e.to_string()))?;
1186 Ok(Self::build_internal(
1188 config,
1189 composite,
1190 ExecutionMode::Production,
1191 None,
1192 None,
1193 None,
1194 authority_id,
1195 ))
1196 }
1197
1198 fn identity_from_location(location: &Location<'_>) -> String {
1199 format!(
1200 "{}:{}:{}",
1201 location.file(),
1202 location.line(),
1203 location.column()
1204 )
1205 }
1206
1207 fn derive_test_seed(identity: &str, extra_salt: u64) -> u64 {
1208 let seed_material = format!("{TEST_SEED_DERIVATION_DOMAIN}:{identity}:{extra_salt}");
1209 let digest = aura_hash(seed_material.as_bytes());
1210 let mut seed_bytes = [0u8; 8];
1211 seed_bytes.copy_from_slice(&digest[..8]);
1212 u64::from_le_bytes(seed_bytes)
1213 }
1214
1215 fn derive_test_authority(seed: u64) -> AuthorityId {
1216 let authority_material = format!("{TEST_SEED_DERIVATION_DOMAIN}:authority:{seed}");
1217 AuthorityId::new_from_entropy(aura_hash(authority_material.as_bytes()))
1218 }
1219
1220 fn register_test_seed(
1221 seed: u64,
1222 identity: &str,
1223 location: &Location<'_>,
1224 ) -> Result<(), crate::core::AgentError> {
1225 let registry = TEST_SEED_REGISTRY.get_or_init(|| parking_lot::Mutex::new(HashMap::new()));
1226 let usage = TestSeedUsage {
1227 identity: identity.to_string(),
1228 location: Self::identity_from_location(location),
1229 };
1230 let mut guard = registry.lock();
1231 if let Some(existing) = guard.get(&seed) {
1232 return Err(crate::core::AgentError::effects(format!(
1233 "duplicate deterministic test seed {} detected (first: {} @ {}, second: {} @ {}). \
1234 Use unique test identities or simulation_for_test_with_salt(...) to disambiguate.",
1235 seed, existing.identity, existing.location, usage.identity, usage.location
1236 )));
1237 }
1238 guard.insert(seed, usage);
1239 Ok(())
1240 }
1241
1242 #[track_caller]
1243 fn allocate_test_seed_with_identity(
1244 test_identity: &str,
1245 extra_salt: u64,
1246 ) -> Result<u64, crate::core::AgentError> {
1247 let location = Location::caller();
1248 let scoped_identity = format!(
1249 "{}::{}",
1250 test_identity,
1251 Self::identity_from_location(location)
1252 );
1253 let seed = Self::derive_test_seed(&scoped_identity, extra_salt);
1254 Self::register_test_seed(seed, &scoped_identity, location)?;
1255 Ok(seed)
1256 }
1257
1258 #[track_caller]
1259 fn allocate_test_seed(extra_salt: u64) -> Result<u64, crate::core::AgentError> {
1260 let location = Location::caller();
1261 let identity = Self::identity_from_location(location);
1262 let seed = Self::derive_test_seed(&identity, extra_salt);
1263 Self::register_test_seed(seed, &identity, location)?;
1264 Ok(seed)
1265 }
1266
1267 #[track_caller]
1272 #[allow(clippy::disallowed_methods)]
1273 pub fn simulation_for_test(config: &AgentConfig) -> Result<Self, crate::core::AgentError> {
1274 let seed = Self::allocate_test_seed(0)?;
1275 Self::simulation(config, seed, Self::derive_test_authority(seed))
1276 }
1277
1278 #[track_caller]
1281 #[allow(clippy::disallowed_methods)]
1282 pub fn simulation_for_test_with_salt(
1283 config: &AgentConfig,
1284 extra_salt: u64,
1285 ) -> Result<Self, crate::core::AgentError> {
1286 let seed = Self::allocate_test_seed(extra_salt)?;
1287 Self::simulation(config, seed, Self::derive_test_authority(seed))
1288 }
1289
1290 #[track_caller]
1292 #[allow(clippy::disallowed_methods)]
1293 pub fn simulation_for_named_test(
1294 config: &AgentConfig,
1295 test_identity: &str,
1296 ) -> Result<Self, crate::core::AgentError> {
1297 let seed = Self::allocate_test_seed_with_identity(test_identity, 0)?;
1298 Self::simulation(config, seed, Self::derive_test_authority(seed))
1299 }
1300
1301 #[track_caller]
1303 #[allow(clippy::disallowed_methods)]
1304 pub fn simulation_for_named_test_with_salt(
1305 config: &AgentConfig,
1306 test_identity: &str,
1307 extra_salt: u64,
1308 ) -> Result<Self, crate::core::AgentError> {
1309 let seed = Self::allocate_test_seed_with_identity(test_identity, extra_salt)?;
1310 Self::simulation(config, seed, Self::derive_test_authority(seed))
1311 }
1312
1313 #[track_caller]
1315 #[allow(clippy::disallowed_methods)]
1316 pub fn simulation_for_test_for_authority(
1317 config: &AgentConfig,
1318 authority_id: AuthorityId,
1319 ) -> Result<Self, crate::core::AgentError> {
1320 let seed = Self::allocate_test_seed(0)?;
1321 Self::simulation_for_authority(config, seed, authority_id)
1322 }
1323
1324 #[track_caller]
1326 #[allow(clippy::disallowed_methods)]
1327 pub fn simulation_for_test_for_authority_with_salt(
1328 config: &AgentConfig,
1329 authority_id: AuthorityId,
1330 extra_salt: u64,
1331 ) -> Result<Self, crate::core::AgentError> {
1332 let seed = Self::allocate_test_seed(extra_salt)?;
1333 Self::simulation_for_authority(config, seed, authority_id)
1334 }
1335
1336 #[track_caller]
1338 #[allow(clippy::disallowed_methods)]
1339 pub fn simulation_for_test_with_shared_transport(
1340 config: &AgentConfig,
1341 shared_transport: SharedTransport,
1342 ) -> Result<Self, crate::core::AgentError> {
1343 let seed = Self::allocate_test_seed(0)?;
1344 Self::simulation_with_shared_transport(
1345 config,
1346 seed,
1347 Self::derive_test_authority(seed),
1348 shared_transport,
1349 )
1350 }
1351
1352 #[track_caller]
1354 #[allow(clippy::disallowed_methods)]
1355 pub fn simulation_for_test_with_shared_transport_for_authority(
1356 config: &AgentConfig,
1357 authority_id: AuthorityId,
1358 shared_transport: SharedTransport,
1359 ) -> Result<Self, crate::core::AgentError> {
1360 let seed = Self::allocate_test_seed(0)?;
1361 Self::simulation_with_shared_transport_for_authority(
1362 config,
1363 seed,
1364 authority_id,
1365 shared_transport,
1366 )
1367 }
1368
1369 pub fn testing(
1373 config: &AgentConfig,
1374 authority_id: AuthorityId,
1375 ) -> Result<Self, crate::core::AgentError> {
1376 let composite = CompositeHandlerAdapter::for_testing(config.device_id());
1377 Ok(Self::build_internal(
1378 Self::normalize_test_config(config.clone()),
1379 composite,
1380 ExecutionMode::Testing,
1381 Some(Self::TEST_CRYPTO_SEED),
1382 None, None, authority_id,
1385 ))
1386 }
1387
1388 pub fn testing_with_shared_transport(
1393 config: &AgentConfig,
1394 authority_id: AuthorityId,
1395 shared_transport: SharedTransport,
1396 ) -> Result<Self, crate::core::AgentError> {
1397 let composite = CompositeHandlerAdapter::for_testing(config.device_id());
1398 Ok(Self::build_internal(
1399 Self::normalize_test_config(config.clone()),
1400 composite,
1401 ExecutionMode::Testing,
1402 Some(Self::TEST_CRYPTO_SEED),
1403 Some(shared_transport),
1404 None, authority_id,
1406 ))
1407 }
1408
1409 pub fn simulation(
1411 config: &AgentConfig,
1412 seed: u64,
1413 authority_id: AuthorityId,
1414 ) -> Result<Self, crate::core::AgentError> {
1415 let config = Self::normalize_test_config(config.clone());
1416 let composite = CompositeHandlerAdapter::for_simulation(config.device_id(), seed);
1417 let mut crypto_seed = [0u8; 32];
1419 crypto_seed[0..8].copy_from_slice(&seed.to_le_bytes());
1420 Ok(Self::build_internal(
1421 config,
1422 composite,
1423 ExecutionMode::Simulation { seed },
1424 Some(crypto_seed),
1425 None, None, authority_id,
1428 ))
1429 }
1430
1431 pub fn simulation_with_shared_transport(
1437 config: &AgentConfig,
1438 seed: u64,
1439 authority_id: AuthorityId,
1440 shared_transport: SharedTransport,
1441 ) -> Result<Self, crate::core::AgentError> {
1442 let config = Self::normalize_test_config(config.clone());
1443 let composite = CompositeHandlerAdapter::for_simulation(config.device_id(), seed);
1444 let mut crypto_seed = [0u8; 32];
1446 crypto_seed[0..8].copy_from_slice(&seed.to_le_bytes());
1447 Ok(Self::build_internal(
1448 config,
1449 composite,
1450 ExecutionMode::Simulation { seed },
1451 Some(crypto_seed),
1452 Some(shared_transport),
1453 None, authority_id,
1455 ))
1456 }
1457
1458 pub fn simulation_with_shared_inbox(
1463 config: &AgentConfig,
1464 seed: u64,
1465 authority_id: AuthorityId,
1466 shared_inbox: Arc<RwLock<Vec<TransportEnvelope>>>,
1467 ) -> Result<Self, crate::core::AgentError> {
1468 let config = Self::normalize_test_config(config.clone());
1469 let composite = CompositeHandlerAdapter::for_simulation(config.device_id(), seed);
1470 let mut crypto_seed = [0u8; 32];
1471 crypto_seed[0..8].copy_from_slice(&seed.to_le_bytes());
1472 Ok(Self::build_internal(
1473 config,
1474 composite,
1475 ExecutionMode::Simulation { seed },
1476 Some(crypto_seed),
1477 None, Some(shared_inbox),
1479 authority_id,
1480 ))
1481 }
1482
1483 pub fn production_for_authority(
1485 config: AgentConfig,
1486 authority_id: AuthorityId,
1487 ) -> Result<Self, crate::core::AgentError> {
1488 Self::production(config, authority_id)
1489 }
1490
1491 #[allow(clippy::disallowed_methods)]
1495 pub fn testing_for_authority(
1496 config: &AgentConfig,
1497 authority_id: AuthorityId,
1498 ) -> Result<Self, crate::core::AgentError> {
1499 Self::testing(config, authority_id)
1500 }
1501
1502 #[allow(clippy::disallowed_methods)]
1504 pub fn simulation_for_authority(
1505 config: &AgentConfig,
1506 seed: u64,
1507 authority_id: AuthorityId,
1508 ) -> Result<Self, crate::core::AgentError> {
1509 Self::simulation(config, seed, authority_id)
1510 }
1511
1512 pub fn simulation_with_shared_transport_for_authority(
1514 config: &AgentConfig,
1515 seed: u64,
1516 authority_id: AuthorityId,
1517 shared_transport: SharedTransport,
1518 ) -> Result<Self, crate::core::AgentError> {
1519 Self::simulation_with_shared_transport(config, seed, authority_id, shared_transport)
1520 }
1521
1522 pub fn simulation_with_shared_inbox_for_authority(
1524 config: &AgentConfig,
1525 seed: u64,
1526 authority_id: AuthorityId,
1527 shared_inbox: Arc<RwLock<Vec<TransportEnvelope>>>,
1528 ) -> Result<Self, crate::core::AgentError> {
1529 Self::simulation_with_shared_inbox(config, seed, authority_id, shared_inbox)
1530 }
1531
1532 pub fn config(&self) -> &AgentConfig {
1534 &self.config
1535 }
1536
1537 pub fn composite(&self) -> &CompositeHandlerAdapter {
1539 &self.composite
1540 }
1541
1542 pub fn time_effects(&self) -> &EnhancedTimeHandler {
1544 &self.time_handler
1545 }
1546
1547 pub fn fact_registry(&self) -> Arc<FactRegistry> {
1549 self.journal.fact_registry()
1550 }
1551
1552 pub fn indexed_journal(&self) -> Arc<IndexedJournalHandler> {
1557 self.journal.indexed_journal()
1558 }
1559
1560 fn init_journal_policy(
1562 authority_id: AuthorityId,
1563 ) -> (
1564 Option<(Biscuit, BiscuitAuthorizationBridge)>,
1565 Option<Vec<u8>>,
1566 ) {
1567 let issuer = aura_authorization::TokenAuthority::new(authority_id);
1568 let token = issuer.create_token(
1569 authority_id,
1570 crate::token_profiles::TokenCapabilityProfile::StandardDevice,
1571 );
1572
1573 match token {
1574 Ok(token) => {
1575 let bridge =
1576 BiscuitAuthorizationBridge::new(issuer.root_public_key(), authority_id);
1577 let verifying_key = issuer.root_public_key().to_bytes().to_vec();
1578 (Some((token, bridge)), Some(verifying_key))
1579 }
1580 Err(_) => (None, None),
1581 }
1582 }
1583
1584 fn init_authorization_handler(
1586 authority: AuthorityId,
1587 crypto_handler: &RealCryptoHandler,
1588 verifying_key: &Option<Vec<u8>>,
1589 time_handler: &PhysicalTimeHandler,
1590 ) -> aura_authorization::effects::WotAuthorizationHandler<RealCryptoHandler> {
1591 if let Some(bytes) = verifying_key {
1592 if let Ok(public_key) = PublicKey::from_bytes(bytes) {
1593 let handler = aura_authorization::effects::WotAuthorizationHandler::new(
1594 crypto_handler.clone(),
1595 public_key,
1596 authority,
1597 );
1598 let time_handler = time_handler.clone();
1599 return handler.with_time_provider(Arc::new(move || {
1600 time_handler.physical_time_now_ms() / 1000
1601 }));
1602 }
1603 }
1604
1605 let handler =
1606 aura_authorization::effects::WotAuthorizationHandler::new_mock(crypto_handler.clone());
1607 let time_handler = time_handler.clone();
1608 handler.with_time_provider(Arc::new(move || time_handler.physical_time_now_ms() / 1000))
1609 }
1610
1611 fn journal_handler(
1613 &self,
1614 ) -> aura_journal::JournalHandler<
1615 RealCryptoHandler,
1616 Arc<
1617 EncryptedStorage<FilesystemStorageHandler, RealCryptoHandler, RealSecureStorageHandler>,
1618 >,
1619 NoopBiscuitAuthorizationHandler,
1620 > {
1621 let authorization = self
1622 .journal
1623 .journal_policy()
1624 .and_then(|(token, _bridge)| token.to_vec().ok())
1625 .map(|bytes| (bytes, NoopBiscuitAuthorizationHandler));
1626
1627 aura_journal::JournalHandlerFactory::create(
1628 self.authority_id,
1629 self.crypto.handler().clone(),
1630 self.storage_handler.clone(),
1631 authorization,
1632 self.journal.journal_verifying_key().map(|s| s.to_vec()),
1633 None, )
1635 }
1636}
1637
1638#[cfg(test)]
1639mod tests {
1640 use super::*;
1641 use aura_core::effects::ThresholdSigningEffects;
1642 use aura_core::types::identifiers::ContextId;
1643 use aura_guards::GuardContextProvider;
1644 use aura_protocol::amp::AmpJournalEffects;
1645 use aura_protocol::effects::SyncEffects;
1646 use aura_protocol::effects::TreeEffects;
1647
1648 #[tokio::test]
1649 async fn test_frost_integration_through_effect_system() {
1650 let config = AgentConfig::default();
1651 let effect_system = crate::testing::simulation_effect_system(&config);
1652
1653 let result = effect_system.frost_generate_keys(2, 3).await;
1655 assert!(result.is_ok(), "FROST key generation should succeed");
1656
1657 let key_gen_result = result.unwrap();
1658 assert_eq!(
1659 key_gen_result.key_packages.len(),
1660 3,
1661 "Should have 3 key packages for 3 signers"
1662 );
1663 assert!(
1664 !key_gen_result.public_key_package.is_empty(),
1665 "Public key package should not be empty"
1666 );
1667
1668 let first_key_package = &key_gen_result.key_packages[0];
1670 let nonces_result = effect_system.frost_generate_nonces(first_key_package).await;
1671 assert!(
1672 nonces_result.is_ok(),
1673 "FROST nonce generation should succeed: {:?}",
1674 nonces_result.err()
1675 );
1676
1677 let nonces = nonces_result.unwrap();
1678 assert!(!nonces.is_empty(), "Nonces should not be empty");
1679 }
1680
1681 #[tokio::test]
1682 async fn test_frost_seeded_determinism() {
1683 let identity = "runtime/effects:test_frost_seeded_determinism";
1684 let seed_a = AuraEffectSystem::derive_test_seed(identity, 0);
1685 let seed_b = AuraEffectSystem::derive_test_seed(identity, 0);
1686 let seed_c = AuraEffectSystem::derive_test_seed(identity, 1);
1687 assert_eq!(seed_a, seed_b, "same identity/salt must be deterministic");
1688 assert_ne!(
1689 seed_a, seed_c,
1690 "different salt must produce a different seed"
1691 );
1692 }
1693
1694 #[tokio::test]
1695 async fn test_duplicate_seed_registration_is_rejected() {
1696 let config = AgentConfig::default();
1697 let mut attempts = Vec::new();
1698 for _ in 0..2 {
1699 attempts.push(AuraEffectSystem::simulation_for_named_test_with_salt(
1700 &config, "dup-seed", 7,
1701 ));
1702 }
1703 assert!(
1704 attempts[0].is_ok(),
1705 "first deterministic allocation should succeed"
1706 );
1707 assert!(
1708 attempts[1].is_err(),
1709 "duplicate deterministic seed must be rejected"
1710 );
1711 }
1712
1713 #[tokio::test]
1714 async fn test_guard_effect_system_enables_amp_journal_effects() {
1715 let config = AgentConfig::default();
1716 let effect_system = crate::testing::simulation_effect_system(&config);
1717
1718 let context = ContextId::new_from_entropy([1u8; 32]);
1720 let _journal = effect_system.fetch_context_journal(context).await.unwrap();
1721
1722 assert!(effect_system.get_metadata("authority_id").is_some());
1724 assert!(effect_system.get_metadata("execution_mode").is_some());
1725 assert!(effect_system.get_metadata("device_id").is_some());
1726
1727 assert!(effect_system.can_perform_operation("test_operation"));
1729 }
1730
1731 #[test]
1732 fn test_simulation_uses_isolated_storage_for_default_config() {
1733 let config = AgentConfig::default();
1734 let effect_system = crate::testing::simulation_effect_system(&config);
1735
1736 assert_ne!(
1737 effect_system.config().storage.base_path,
1738 default_storage_path()
1739 );
1740 }
1741
1742 #[tokio::test]
1743 async fn test_tree_and_sync_handlers_are_wired() {
1744 let config = AgentConfig::default();
1745 let effect_system = crate::testing::simulation_effect_system(&config);
1746
1747 let state = effect_system.get_current_state().await.unwrap();
1749 assert_eq!(state.epoch, aura_core::Epoch::initial()); let commitment = effect_system.get_current_commitment().await.unwrap();
1751 assert_eq!(
1754 state.root_commitment,
1755 *commitment.as_bytes(),
1756 "empty tree state should agree with current commitment query"
1757 );
1758
1759 let digest = effect_system.get_oplog_digest().await.unwrap();
1762 let missing_from_empty = effect_system
1763 .get_missing_ops(&aura_protocol::effects::BloomDigest::empty())
1764 .await
1765 .unwrap();
1766 assert_eq!(
1767 digest.cids.len(),
1768 missing_from_empty.len(),
1769 "sync digest should match missing-op projection from an empty remote"
1770 );
1771 }
1772
1773 #[tokio::test]
1774 async fn test_threshold_queries_use_threshold_config_metadata() {
1775 let config = AgentConfig::default();
1776 let effect_system = crate::testing::simulation_effect_system(&config);
1777 let authority = AuthorityId::new_from_entropy([33u8; 32]);
1778
1779 effect_system
1780 .bootstrap_authority(&authority)
1781 .await
1782 .expect("bootstrap should persist threshold config metadata");
1783
1784 let threshold_config = effect_system
1785 .threshold_config(&authority)
1786 .await
1787 .expect("threshold config should be readable");
1788 assert_eq!(threshold_config.threshold, 1);
1789 assert_eq!(threshold_config.total_participants, 1);
1790
1791 let threshold_state = effect_system
1792 .threshold_state(&authority)
1793 .await
1794 .expect("threshold state should be readable");
1795 assert_eq!(threshold_state.epoch, 0);
1796 assert_eq!(
1797 threshold_state.agreement_mode,
1798 aura_core::threshold::AgreementMode::Provisional
1799 );
1800 }
1801}
1802
1803impl AuraEffectSystem {
1808 pub fn device_id(&self) -> aura_core::DeviceId {
1809 self.config.device_id
1810 }
1811
1812 async fn get_current_epoch(&self, authority: &AuthorityId) -> u64 {
1816 let location = SecureStorageLocation::new("epoch_state", format!("{}", authority));
1817 let caps = vec![SecureStorageCapability::Read];
1818
1819 match self
1820 .crypto
1821 .secure_storage()
1822 .secure_retrieve(&location, &caps)
1823 .await
1824 {
1825 Ok(data) if data.len() >= 8 => {
1826 let bytes: [u8; 8] = data[..8].try_into().unwrap_or([0u8; 8]);
1827 u64::from_le_bytes(bytes)
1828 }
1829 _ => 0, }
1831 }
1832
1833 async fn set_current_epoch(
1835 &self,
1836 authority: &AuthorityId,
1837 epoch: u64,
1838 ) -> Result<(), AuraError> {
1839 let location = SecureStorageLocation::new("epoch_state", format!("{}", authority));
1840 let caps = vec![
1841 SecureStorageCapability::Read,
1842 SecureStorageCapability::Write,
1843 ];
1844
1845 let data = epoch.to_le_bytes().to_vec();
1846 self.crypto
1847 .secure_storage()
1848 .secure_store(&location, &data, &caps)
1849 .await
1850 .map_err(|e| AuraError::storage(format!("Failed to store epoch state: {}", e)))
1851 }
1852
1853 async fn delete_epoch_keys(
1855 &self,
1856 authority: &AuthorityId,
1857 epoch: u64,
1858 ) -> Result<(), AuraError> {
1859 let delete_caps = vec![SecureStorageCapability::Delete];
1860
1861 let shares_location =
1863 SecureStorageLocation::new("participant_shares", format!("{}/{}", authority, epoch));
1864 let _ = self
1865 .crypto
1866 .secure_storage()
1867 .secure_delete(&shares_location, &delete_caps)
1868 .await;
1869
1870 let pubkey_location = SecureStorageLocation::with_sub_key(
1872 "threshold_pubkey",
1873 format!("{}", authority),
1874 format!("{}", epoch),
1875 );
1876 let _ = self
1877 .crypto
1878 .secure_storage()
1879 .secure_delete(&pubkey_location, &delete_caps)
1880 .await;
1881
1882 let metadata_location = SecureStorageLocation::with_sub_key(
1884 "threshold_config",
1885 format!("{}", authority),
1886 format!("{}", epoch),
1887 );
1888 let _ = self
1889 .crypto
1890 .secure_storage()
1891 .secure_delete(&metadata_location, &delete_caps)
1892 .await;
1893
1894 tracing::debug!(?authority, epoch, "Deleted keys for epoch");
1895 Ok(())
1896 }
1897
1898 async fn store_threshold_config_metadata(
1904 &self,
1905 authority: &AuthorityId,
1906 epoch: u64,
1907 threshold: u16,
1908 total_participants: u16,
1909 participants: &[aura_core::threshold::ParticipantIdentity],
1910 agreement_mode: aura_core::threshold::AgreementMode,
1911 ) -> Result<(), AuraError> {
1912 let metadata = ThresholdConfigMetadata {
1913 threshold_k: threshold,
1914 total_n: total_participants,
1915 participants: participants.to_vec(),
1916 mode: if threshold >= 2 {
1917 SigningMode::Threshold
1918 } else {
1919 SigningMode::SingleSigner
1920 },
1921 agreement_mode,
1922 };
1923
1924 let location = SecureStorageLocation::with_sub_key(
1925 "threshold_config",
1926 format!("{}", authority),
1927 format!("{}", epoch),
1928 );
1929 let caps = vec![
1930 SecureStorageCapability::Read,
1931 SecureStorageCapability::Write,
1932 ];
1933
1934 let data = serde_json::to_vec(&metadata).map_err(|e| {
1935 AuraError::storage(format!("Failed to serialize threshold config: {}", e))
1936 })?;
1937 self.crypto
1938 .secure_storage()
1939 .secure_store(&location, &data, &caps)
1940 .await
1941 .map_err(|e| AuraError::storage(format!("Failed to store threshold config: {}", e)))?;
1942
1943 tracing::debug!(
1944 ?authority,
1945 epoch,
1946 threshold,
1947 total_participants,
1948 num_participants = participants.len(),
1949 "Stored threshold config metadata"
1950 );
1951 Ok(())
1952 }
1953
1954 async fn get_threshold_config_metadata(
1958 &self,
1959 authority: &AuthorityId,
1960 epoch: u64,
1961 ) -> Option<ThresholdConfigMetadata> {
1962 let location = SecureStorageLocation::with_sub_key(
1963 "threshold_config",
1964 format!("{}", authority),
1965 format!("{}", epoch),
1966 );
1967 let caps = vec![SecureStorageCapability::Read];
1968
1969 match self
1970 .crypto
1971 .secure_storage()
1972 .secure_retrieve(&location, &caps)
1973 .await
1974 {
1975 Ok(data) => match serde_json::from_slice(&data) {
1976 Ok(metadata) => Some(metadata),
1977 Err(e) => {
1978 tracing::warn!(
1979 ?authority,
1980 epoch,
1981 error = %e,
1982 "Failed to deserialize threshold config"
1983 );
1984 None
1985 }
1986 },
1987 Err(_) => None,
1988 }
1989 }
1990}
1991
1992#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1997struct ThresholdConfigMetadata {
1998 threshold_k: u16,
2000 total_n: u16,
2002 #[serde(default)]
2004 participants: Vec<aura_core::threshold::ParticipantIdentity>,
2005 mode: SigningMode,
2007 #[serde(default)]
2009 agreement_mode: aura_core::threshold::AgreementMode,
2010}
2011
2012impl ThresholdConfigMetadata {
2013 fn resolved_participants(&self) -> Vec<aura_core::threshold::ParticipantIdentity> {
2014 self.participants.clone()
2015 }
2016}
2017
2018impl std::fmt::Debug for AuraEffectSystem {
2020 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2021 f.debug_struct("AuraEffectSystem")
2022 .field("config", &self.config)
2023 .field("authority_id", &self.authority_id)
2024 .field("journal_policy", &self.journal.journal_policy().is_some())
2025 .field(
2026 "journal_verifying_key",
2027 &self.journal.journal_verifying_key().is_some(),
2028 )
2029 .finish_non_exhaustive()
2030 }
2031}