1use async_trait::async_trait;
30use aura_core::effects::{AdmissionError, CapabilityKey, RuntimeCapabilityEffects};
31use aura_core::hash::hash;
32use aura_core::types::identifiers::{AuthorityId, ContextId};
33use aura_core::util::serialization::{from_slice, to_vec};
34use aura_core::TimeoutBudget;
35use aura_core::{CapabilityName, FlowCost};
36use aura_guards::guards::journal::JournalCoupler;
37use aura_guards::prelude::{GuardContextProvider, GuardEffects, SendGuardChain};
38use aura_guards::LeakageBudget;
39use aura_mpst::upstream::runtime::{ChoreoHandler, ChoreoHandlerExt, ChoreoResult};
40use aura_mpst::upstream::runtime::{
41 ChoreographyError as TelltaleChoreographyError, LabelId, RoleId,
42};
43use aura_mpst::{CompositionManifest, GeneratedChoreographyRuntime};
44use aura_protocol::admission::manifest_admission_requirements;
45use aura_protocol::effects::{
46 ChoreographicEffects, ChoreographicRole, ChoreographyError as AuraChoreographyError, RoleIndex,
47};
48use std::any::Any;
49use std::collections::{HashMap, VecDeque};
50use std::sync::Arc;
51use std::time::Duration;
52use tracing::{debug, warn};
53use uuid::Uuid;
54
55#[derive(Debug, Clone)]
57pub struct MessageGuardRequirements {
58 pub capability: CapabilityName,
60 pub flow_cost: FlowCost,
62 pub leakage_budget: Option<LeakageBudget>,
64 pub journal_facts: Option<String>,
66 pub journal_merge: bool,
68}
69
70impl MessageGuardRequirements {
71 pub fn new(capability: CapabilityName, flow_cost: impl Into<FlowCost>) -> Self {
73 Self {
74 capability,
75 flow_cost: flow_cost.into(),
76 leakage_budget: None,
77 journal_facts: None,
78 journal_merge: false,
79 }
80 }
81
82 pub fn with_leakage_budget(mut self, budget: LeakageBudget) -> Self {
84 self.leakage_budget = Some(budget);
85 self
86 }
87
88 pub fn with_journal_facts(mut self, facts: impl Into<String>) -> Self {
90 self.journal_facts = Some(facts.into());
91 self
92 }
93
94 pub fn with_journal_merge(mut self, merge: bool) -> Self {
96 self.journal_merge = merge;
97 self
98 }
99}
100
101#[derive(Debug, Clone, Default)]
105pub struct GuardConfig {
106 pub context_id: Option<ContextId>,
108 guards: HashMap<String, MessageGuardRequirements>,
110}
111
112impl GuardConfig {
113 pub fn new(context_id: ContextId) -> Self {
115 Self {
116 context_id: Some(context_id),
117 guards: HashMap::new(),
118 }
119 }
120
121 pub fn none() -> Self {
123 Self::default()
124 }
125
126 pub fn with_message_guard<M: 'static>(
130 mut self,
131 capability: CapabilityName,
132 flow_cost: impl Into<FlowCost>,
133 ) -> Self {
134 let type_name = std::any::type_name::<M>().to_string();
135 self.guards.insert(
136 type_name,
137 MessageGuardRequirements::new(capability, flow_cost),
138 );
139 self
140 }
141
142 pub fn with_named_guard(
144 mut self,
145 type_name: impl Into<String>,
146 requirements: MessageGuardRequirements,
147 ) -> Self {
148 self.guards.insert(type_name.into(), requirements);
149 self
150 }
151
152 pub fn get_guard(&self, type_name: &str) -> Option<&MessageGuardRequirements> {
154 self.guards.get(type_name)
155 }
156
157 pub fn is_enabled(&self) -> bool {
159 self.context_id.is_some()
160 }
161}
162
163#[derive(Debug)]
165pub struct AuraHandlerAdapter {
166 authority_id: AuthorityId,
167}
168
169impl AuraHandlerAdapter {
170 pub fn new(authority_id: AuthorityId) -> Self {
171 Self { authority_id }
172 }
173
174 pub fn authority_id(&self) -> AuthorityId {
175 self.authority_id
176 }
177}
178
/// Record of one payload received by the adapter.
#[derive(Debug, Clone)]
pub struct ReceivedMessage {
    /// `std::any::type_name` of the type the receiver asked to decode.
    pub type_name: &'static str,
    /// Payload bytes exactly as they were received, before decoding.
    pub bytes: Vec<u8>,
}
185
186#[derive(Debug)]
188pub struct MessageRequest<R: RoleId> {
189 #[allow(dead_code)]
190 pub to: R,
191 pub type_name: &'static str,
192}
193
194type MessageProviderFn<R> =
205 Box<dyn FnMut(MessageRequest<R>, &[ReceivedMessage]) -> Option<Box<dyn Any + Send>> + Send>;
206
207type BranchDeciderFn = Box<dyn FnMut(&[ReceivedMessage]) -> Option<String> + Send>;
208
209struct RuntimeAdmissionConfig {
210 capability_effects: Arc<dyn RuntimeCapabilityEffects>,
211 required_capabilities: Vec<CapabilityKey>,
212 admitted: bool,
213}
214
215#[allow(dead_code)]
230pub struct AuraProtocolAdapter<E, R>
231where
232 E: ChoreographicEffects
233 + GuardEffects
234 + GuardContextProvider
235 + aura_core::PhysicalTimeEffects
236 + aura_core::TimeEffects,
237 R: RoleId,
238{
239 effects: Arc<E>,
240 authority_id: AuthorityId,
241 self_role: R,
242 role_map: HashMap<R, AuthorityId>,
243 role_families: HashMap<String, Vec<R>>,
245 outbound: VecDeque<Box<dyn Any + Send>>,
246 branch_choices: VecDeque<R::Label>,
247 received: Vec<ReceivedMessage>,
248 message_provider: Option<MessageProviderFn<R>>,
249 branch_decider: Option<BranchDeciderFn>,
250 guard_config: GuardConfig,
252 journal_coupler: Option<JournalCoupler>,
254 runtime_admission: Option<RuntimeAdmissionConfig>,
256}
257
258#[allow(dead_code)]
259impl<E, R> AuraProtocolAdapter<E, R>
260where
261 E: ChoreographicEffects
262 + GuardEffects
263 + GuardContextProvider
264 + aura_core::PhysicalTimeEffects
265 + aura_core::TimeEffects,
266 R: RoleId,
267{
268 pub fn new(
277 effects: Arc<E>,
278 authority_id: AuthorityId,
279 self_role: R,
280 role_map: HashMap<R, AuthorityId>,
281 ) -> Self {
282 Self {
283 effects,
284 authority_id,
285 self_role,
286 role_map,
287 role_families: HashMap::new(),
288 outbound: VecDeque::new(),
289 branch_choices: VecDeque::new(),
290 received: Vec::new(),
291 message_provider: None,
292 branch_decider: None,
293 guard_config: GuardConfig::default(),
294 journal_coupler: None,
295 runtime_admission: None,
296 }
297 }
298
299 pub fn with_role_family(mut self, family: impl Into<String>, roles: Vec<R>) -> Self {
312 self.role_families.insert(family.into(), roles);
313 self
314 }
315
316 pub fn with_role_families(
318 mut self,
319 families: impl IntoIterator<Item = (String, Vec<R>)>,
320 ) -> Self {
321 for (family, roles) in families {
322 self.role_families.insert(family, roles);
323 }
324 self
325 }
326
327 pub fn with_message_provider(
329 mut self,
330 provider: impl FnMut(MessageRequest<R>, &[ReceivedMessage]) -> Option<Box<dyn Any + Send>>
331 + Send
332 + 'static,
333 ) -> Self {
334 self.message_provider = Some(Box::new(provider));
335 self
336 }
337
338 pub fn with_branch_decider(
340 mut self,
341 decider: impl FnMut(&[ReceivedMessage]) -> Option<String> + Send + 'static,
342 ) -> Self {
343 self.branch_decider = Some(Box::new(decider));
344 self
345 }
346
347 pub fn with_guard_config(mut self, config: GuardConfig) -> Self {
362 self.guard_config = config;
363 self
364 }
365
366 pub fn guard_config(&self) -> &GuardConfig {
368 &self.guard_config
369 }
370
371 pub fn with_journal_coupler(mut self, coupler: JournalCoupler) -> Self {
393 self.journal_coupler = Some(coupler);
394 self
395 }
396
397 pub fn with_runtime_capability_admission(
399 mut self,
400 capability_effects: Arc<dyn RuntimeCapabilityEffects>,
401 required_capabilities: Vec<CapabilityKey>,
402 ) -> Self {
403 self.runtime_admission = Some(RuntimeAdmissionConfig {
404 capability_effects,
405 required_capabilities,
406 admitted: false,
407 });
408 self
409 }
410
411 pub fn with_manifest_runtime_capability_admission(
413 mut self,
414 capability_effects: Arc<dyn RuntimeCapabilityEffects>,
415 manifest: &CompositionManifest,
416 ) -> Result<Self, AdmissionError> {
417 let admission = manifest_admission_requirements(manifest)?;
418 self.runtime_admission = Some(RuntimeAdmissionConfig {
419 capability_effects,
420 required_capabilities: admission.required_runtime_capabilities,
421 admitted: false,
422 });
423 Ok(self)
424 }
425
426 pub fn journal_coupler(&self) -> Option<&JournalCoupler> {
428 self.journal_coupler.as_ref()
429 }
430
431 pub fn push_message<M: Send + 'static>(&mut self, message: M) {
432 self.outbound.push_back(Box::new(message));
433 }
434
435 pub fn push_branch_choice(&mut self, label: R::Label) {
436 self.branch_choices.push_back(label);
437 }
438
439 pub fn received_messages(&self) -> &[ReceivedMessage] {
440 &self.received
441 }
442
443 async fn ensure_runtime_admission(&mut self) -> Result<(), AuraChoreographyError> {
444 let Some(admission) = self.runtime_admission.as_mut() else {
445 return Ok(());
446 };
447
448 if admission.admitted {
449 return Ok(());
450 }
451
452 let inventory = admission
453 .capability_effects
454 .capability_inventory()
455 .await
456 .map_err(|_| AuraChoreographyError::AuthorizationFailed {
457 reason: "TheoremPackAdmission inventory unavailable".to_string(),
458 })?;
459 let _inventory_size = inventory.len();
460
461 if let Err(error) = admission
462 .capability_effects
463 .require_capabilities(&admission.required_capabilities)
464 .await
465 {
466 let reason = match &error {
467 AdmissionError::MissingCapability { capability } => format!(
468 "TheoremPackAdmission failed: missing runtime capability ref={}",
469 capability_key_ref(capability.as_str())
470 ),
471 AdmissionError::MissingTheoremPack { theorem_pack } => format!(
472 "TheoremPackAdmission failed: missing theorem pack {}",
473 capability_key_ref(theorem_pack)
474 ),
475 AdmissionError::MissingTheoremPackCapability {
476 theorem_pack: _,
477 capability,
478 } => format!(
479 "TheoremPackAdmission failed: missing theorem-pack capability ref={}",
480 capability_key_ref(capability.as_str())
481 ),
482 AdmissionError::MissingRuntimeContracts => {
483 "TheoremPackAdmission failed: missing runtime contracts".to_string()
484 }
485 AdmissionError::InventoryUnavailable { .. } => {
486 "TheoremPackAdmission failed: inventory unavailable".to_string()
487 }
488 AdmissionError::Internal { .. } => "TheoremPackAdmission failed".to_string(),
489 };
490 return Err(AuraChoreographyError::AuthorizationFailed { reason });
491 }
492 admission.admitted = true;
493 Ok(())
494 }
495
496 pub async fn start_session(&mut self, session_id: Uuid) -> Result<(), AuraChoreographyError> {
497 self.ensure_runtime_admission().await?;
498 let mut roles = Vec::new();
499 let self_role = self.map_role(self.self_role)?;
500 roles.push(self_role);
501
502 for role in self.role_map.keys() {
503 let mapped = self.map_role(*role)?;
504 if mapped != self_role {
505 roles.push(mapped);
506 }
507 }
508
509 self.effects.start_session(session_id, roles).await
510 }
511
512 pub async fn end_session(&mut self) -> Result<(), AuraChoreographyError> {
513 self.effects.end_session().await
514 }
515
516 async fn send_value<M: serde::Serialize + Send + Sync>(
517 &mut self,
518 to: R,
519 msg: &M,
520 ) -> Result<(), AuraChoreographyError> {
521 self.ensure_runtime_admission().await?;
522 let role = self.map_role(to)?;
523 let payload = to_vec(msg).map_err(|err| AuraChoreographyError::SerializationFailed {
524 reason: err.to_string(),
525 })?;
526
527 let mut guard_receipt: Option<aura_core::Receipt> = None;
528 let type_name = std::any::type_name::<M>();
529
530 if let Some(context_id) = self.guard_config.context_id {
531 if let Some(guard_req) = self.guard_config.get_guard(type_name) {
532 let peer = if to == self.self_role {
533 self.authority_id
534 } else {
535 *self
536 .role_map
537 .get(&to)
538 .ok_or_else(|| AuraChoreographyError::RoleNotFound {
539 role: ChoreographicRole::new(
540 aura_core::DeviceId::new_from_entropy([0u8; 32]),
541 self.authority_id,
542 RoleIndex::new(0).expect("role index"),
543 ),
544 })?
545 };
546
547 debug!(
548 message_type = type_name,
549 capability = %guard_req.capability,
550 flow_cost = ?guard_req.flow_cost,
551 context = ?context_id,
552 peer = ?peer,
553 "Evaluating guard chain for choreography send"
554 );
555
556 let mut guard = SendGuardChain::new(
557 guard_req.capability.clone(),
558 context_id,
559 peer,
560 guard_req.flow_cost,
561 );
562
563 if let Some(ref leakage) = guard_req.leakage_budget {
564 guard = guard.with_leakage_budget(leakage.clone());
565 }
566
567 let result = guard.evaluate(&*self.effects).await.map_err(|e| {
568 AuraChoreographyError::InternalError {
569 message: format!("guard chain evaluation failed: {e}"),
570 }
571 })?;
572
573 if !result.authorized {
574 return Err(AuraChoreographyError::ProtocolViolation {
575 message: result
576 .denial_reason
577 .unwrap_or_else(|| "guard chain denied send".to_string()),
578 });
579 }
580
581 debug!(
582 message_type = type_name,
583 receipt = ?result.receipt,
584 "Guard chain authorized choreography send"
585 );
586
587 guard_receipt = result.receipt;
588 }
589 }
590
591 self.effects.send_to_role_bytes(role, payload).await?;
592
593 if let Some(ref coupler) = self.journal_coupler {
594 debug!(
595 message_type = type_name,
596 "Coupling journal operations after choreography send"
597 );
598
599 let coupling_result = coupler
600 .couple_with_send(&*self.effects, &guard_receipt)
601 .await
602 .map_err(|e| {
603 warn!(
604 message_type = type_name,
605 error = %e,
606 "Journal coupling failed after send (message was sent)"
607 );
608 AuraChoreographyError::InternalError {
609 message: format!("journal coupling failed: {e}"),
610 }
611 })?;
612
613 if coupling_result.operations_applied > 0usize {
614 debug!(
615 message_type = type_name,
616 operations_applied = coupling_result.operations_applied,
617 "Journal coupling completed successfully"
618 );
619 }
620 }
621
622 Ok(())
623 }
624
625 async fn recv_value<M: serde::de::DeserializeOwned + Send>(
626 &mut self,
627 from: R,
628 ) -> Result<M, AuraChoreographyError> {
629 self.ensure_runtime_admission().await?;
630 let role = self.map_role(from)?;
631 let payload = self.effects.receive_from_role_bytes(role).await?;
632 self.received.push(ReceivedMessage {
633 type_name: std::any::type_name::<M>(),
634 bytes: payload.clone(),
635 });
636 from_slice(&payload).map_err(|err| AuraChoreographyError::DeserializationFailed {
637 reason: err.to_string(),
638 })
639 }
640
641 fn map_role(&self, role: R) -> Result<ChoreographicRole, AuraChoreographyError> {
642 let authority_id = if role == self.self_role {
643 self.authority_id
644 } else {
645 *self
646 .role_map
647 .get(&role)
648 .ok_or_else(|| AuraChoreographyError::RoleNotFound {
649 role: ChoreographicRole::new(
650 aura_core::DeviceId::new_from_entropy([0u8; 32]),
651 self.authority_id,
652 RoleIndex::new(0).expect("role index"),
653 ),
654 })?
655 };
656
657 let role_index = role.role_index().unwrap_or(0);
658 let role_index =
659 RoleIndex::new(role_index).ok_or_else(|| AuraChoreographyError::ProtocolViolation {
660 message: format!("invalid role index: {role_index}"),
661 })?;
662
663 Ok(ChoreographicRole::for_authority(authority_id, role_index))
664 }
665}
666
667#[async_trait]
668impl<E, R> ChoreoHandler for AuraProtocolAdapter<E, R>
669where
670 E: ChoreographicEffects
671 + GuardEffects
672 + GuardContextProvider
673 + aura_core::PhysicalTimeEffects
674 + aura_core::TimeEffects,
675 R: RoleId,
676{
677 type Role = R;
678 type Endpoint = ();
679
680 async fn send<M: serde::Serialize + Send + Sync>(
681 &mut self,
682 _ep: &mut Self::Endpoint,
683 to: Self::Role,
684 msg: &M,
685 ) -> ChoreoResult<()> {
686 self.send_value(to, msg).await.map_err(map_runtime_error)
687 }
688
689 async fn recv<M: serde::de::DeserializeOwned + Send>(
690 &mut self,
691 _ep: &mut Self::Endpoint,
692 from: Self::Role,
693 ) -> ChoreoResult<M> {
694 self.recv_value(from).await.map_err(map_runtime_error)
695 }
696
697 async fn choose(
698 &mut self,
699 ep: &mut Self::Endpoint,
700 who: Self::Role,
701 label: <Self::Role as RoleId>::Label,
702 ) -> ChoreoResult<()> {
703 self.send(ep, who, &label.as_str().to_string()).await
704 }
705
706 async fn offer(
707 &mut self,
708 ep: &mut Self::Endpoint,
709 from: Self::Role,
710 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
711 let label: String = self.recv(ep, from).await?;
712 <Self::Role as RoleId>::Label::from_str(&label).ok_or_else(|| {
713 TelltaleChoreographyError::InvalidChoice {
714 expected: Vec::new(),
715 actual: label,
716 }
717 })
718 }
719
720 async fn with_timeout<F, T>(
721 &mut self,
722 _ep: &mut Self::Endpoint,
723 _at: Self::Role,
724 dur: Duration,
725 body: F,
726 ) -> ChoreoResult<T>
727 where
728 F: std::future::Future<Output = ChoreoResult<T>> + Send,
729 {
730 let started_at = self
731 .effects
732 .physical_time()
733 .await
734 .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
735 let budget = TimeoutBudget::from_start_and_timeout(&started_at, dur)
736 .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
737 let now = self
738 .effects
739 .physical_time()
740 .await
741 .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
742 let remaining = budget
743 .remaining_at(&now)
744 .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
745 let sleep_ms = remaining.as_millis().min(u128::from(u64::MAX)) as u64;
746 tokio::select! {
747 result = body => result,
748 _ = self.effects.sleep_ms(sleep_ms) => Err(TelltaleChoreographyError::Timeout(dur)),
749 }
750 }
751}
752
753#[async_trait]
754impl<E, R> ChoreoHandlerExt for AuraProtocolAdapter<E, R>
755where
756 E: ChoreographicEffects
757 + GuardEffects
758 + GuardContextProvider
759 + aura_core::PhysicalTimeEffects
760 + aura_core::TimeEffects,
761 R: RoleId,
762{
763 async fn setup(&mut self, _role: Self::Role) -> ChoreoResult<Self::Endpoint> {
764 Ok(())
765 }
766
767 async fn teardown(&mut self, _ep: Self::Endpoint) -> ChoreoResult<()> {
768 Ok(())
769 }
770}
771
772#[async_trait]
773impl<E, R> GeneratedChoreographyRuntime for AuraProtocolAdapter<E, R>
774where
775 E: ChoreographicEffects
776 + GuardEffects
777 + GuardContextProvider
778 + aura_core::PhysicalTimeEffects
779 + aura_core::TimeEffects,
780 R: RoleId,
781{
782 async fn provide_message<M: Send + 'static>(&mut self, to: Self::Role) -> ChoreoResult<M> {
783 self.ensure_runtime_admission()
784 .await
785 .map_err(map_runtime_error)?;
786 let boxed = match self.outbound.pop_front() {
787 Some(boxed) => boxed,
788 None => {
789 if let Some(provider) = self.message_provider.as_mut() {
790 provider(
791 MessageRequest {
792 to,
793 type_name: std::any::type_name::<M>(),
794 },
795 &self.received,
796 )
797 .ok_or_else(|| {
798 map_runtime_error(AuraChoreographyError::ProtocolViolation {
799 message: "message provider returned None".to_string(),
800 })
801 })?
802 } else {
803 return Err(map_runtime_error(
804 AuraChoreographyError::ProtocolViolation {
805 message: "no queued message for provide_message".to_string(),
806 },
807 ));
808 }
809 }
810 };
811
812 boxed.downcast::<M>().map(|msg| *msg).map_err(|_| {
813 map_runtime_error(AuraChoreographyError::ProtocolViolation {
814 message: format!(
815 "queued message type mismatch (expected {})",
816 std::any::type_name::<M>()
817 ),
818 })
819 })
820 }
821
822 async fn select_branch<L: LabelId>(&mut self, choices: &[L]) -> ChoreoResult<L> {
823 self.ensure_runtime_admission()
824 .await
825 .map_err(map_runtime_error)?;
826 let choice = match self.branch_choices.pop_front() {
827 Some(choice) => choice,
828 None => {
829 if let Some(decider) = self.branch_decider.as_mut() {
830 let label = decider(&self.received).ok_or_else(|| {
831 map_runtime_error(AuraChoreographyError::ProtocolViolation {
832 message: "branch decider returned None".to_string(),
833 })
834 })?;
835 let selected = choices
836 .iter()
837 .copied()
838 .find(|choice| choice.as_str() == label);
839 return selected.ok_or_else(|| {
840 map_runtime_error(AuraChoreographyError::ProtocolViolation {
841 message: format!("branch decider returned invalid label: {label}"),
842 })
843 });
844 }
845 return Err(map_runtime_error(
846 AuraChoreographyError::ProtocolViolation {
847 message: "no queued branch choice for select_branch".to_string(),
848 },
849 ));
850 }
851 };
852
853 let selected = choices
854 .iter()
855 .copied()
856 .find(|label| label.as_str() == choice.as_str());
857
858 selected.ok_or_else(|| {
859 map_runtime_error(AuraChoreographyError::ProtocolViolation {
860 message: "queued branch choice is not valid for this choice".to_string(),
861 })
862 })
863 }
864
865 fn resolve_family(&self, family: &str) -> ChoreoResult<Vec<Self::Role>> {
866 self.role_families
867 .get(family)
868 .cloned()
869 .ok_or_else(|| AuraChoreographyError::RoleFamilyNotFound {
870 family: family.to_string(),
871 })
872 .map_err(map_runtime_error)
873 .and_then(|roles| {
874 if roles.is_empty() {
875 Err(map_runtime_error(AuraChoreographyError::EmptyRoleFamily {
876 family: family.to_string(),
877 }))
878 } else {
879 Ok(roles)
880 }
881 })
882 }
883}
884
885fn map_runtime_error(error: AuraChoreographyError) -> TelltaleChoreographyError {
886 TelltaleChoreographyError::ExecutionError(error.to_string())
887}
888
889fn capability_key_ref(key: &str) -> String {
890 let digest = hash(key.as_bytes());
891 hex::encode(&digest[..8])
892}