Skip to main content

aura_agent/runtime/
choreography_adapter.rs

1//! Choreography adapter implementation
2//!
3//! Adapter for integrating with the choreographic programming
4//! system from aura-protocol in the authority-centric runtime.
5//!
6//! ## Role Family Resolution
7//!
8//! The adapter supports parameterized role families like `Witness[N]` from
9//! choreography definitions. Use `with_role_family()` to register role instances:
10//!
11//! ```ignore
12//! let adapter = AuraProtocolAdapter::new(effects, authority_id, self_role, role_map)
13//!     .with_role_family("Witness", witness_roles.clone());
14//! ```
15//!
16//! ## Guard Chain Enforcement
17//!
18//! The adapter supports guard chain enforcement for choreography sends. Configure
19//! guards using `with_guard_config()`:
20//!
21//! ```ignore
22//! let guard_config = GuardConfig::new(context_id)
23//!     .with_message_guard::<MyMessage>(aura_core::capability_name!("amp:send"), 100);
24//!
25//! let adapter = AuraProtocolAdapter::new(effects, authority_id, self_role, role_map)
26//!     .with_guard_config(guard_config);
27//! ```
28
29use async_trait::async_trait;
30use aura_core::effects::{AdmissionError, CapabilityKey, RuntimeCapabilityEffects};
31use aura_core::hash::hash;
32use aura_core::types::identifiers::{AuthorityId, ContextId};
33use aura_core::util::serialization::{from_slice, to_vec};
34use aura_core::TimeoutBudget;
35use aura_core::{CapabilityName, FlowCost};
36use aura_guards::guards::journal::JournalCoupler;
37use aura_guards::prelude::{GuardContextProvider, GuardEffects, SendGuardChain};
38use aura_guards::LeakageBudget;
39use aura_mpst::upstream::runtime::{ChoreoHandler, ChoreoHandlerExt, ChoreoResult};
40use aura_mpst::upstream::runtime::{
41    ChoreographyError as TelltaleChoreographyError, LabelId, RoleId,
42};
43use aura_mpst::{CompositionManifest, GeneratedChoreographyRuntime};
44use aura_protocol::admission::manifest_admission_requirements;
45use aura_protocol::effects::{
46    ChoreographicEffects, ChoreographicRole, ChoreographyError as AuraChoreographyError, RoleIndex,
47};
48use std::any::Any;
49use std::collections::{HashMap, VecDeque};
50use std::sync::Arc;
51use std::time::Duration;
52use tracing::{debug, warn};
53use uuid::Uuid;
54
55/// Guard requirements for a specific message type.
56#[derive(Debug, Clone)]
57pub struct MessageGuardRequirements {
58    /// Required canonical capability for sending this message.
59    pub capability: CapabilityName,
60    /// Flow cost for sending this message
61    pub flow_cost: FlowCost,
62    /// Optional leakage budget for this message
63    pub leakage_budget: Option<LeakageBudget>,
64    /// Journal facts to record after successful send (from choreography annotation)
65    pub journal_facts: Option<String>,
66    /// Whether to merge journal after this message (from choreography annotation)
67    pub journal_merge: bool,
68}
69
70impl MessageGuardRequirements {
71    /// Create guard requirements with capability and flow cost.
72    pub fn new(capability: CapabilityName, flow_cost: impl Into<FlowCost>) -> Self {
73        Self {
74            capability,
75            flow_cost: flow_cost.into(),
76            leakage_budget: None,
77            journal_facts: None,
78            journal_merge: false,
79        }
80    }
81
82    /// Add leakage budget to the guard requirements.
83    pub fn with_leakage_budget(mut self, budget: LeakageBudget) -> Self {
84        self.leakage_budget = Some(budget);
85        self
86    }
87
88    /// Set journal facts to record after successful send.
89    pub fn with_journal_facts(mut self, facts: impl Into<String>) -> Self {
90        self.journal_facts = Some(facts.into());
91        self
92    }
93
94    /// Enable journal merge after this message.
95    pub fn with_journal_merge(mut self, merge: bool) -> Self {
96        self.journal_merge = merge;
97        self
98    }
99}
100
101/// Configuration for guard chain enforcement in choreography execution.
102///
103/// Maps message type names to their guard requirements (capability + flow cost).
104#[derive(Debug, Clone, Default)]
105pub struct GuardConfig {
106    /// Context ID for guard evaluation
107    pub context_id: Option<ContextId>,
108    /// Map of message type name -> guard requirements
109    guards: HashMap<String, MessageGuardRequirements>,
110}
111
112impl GuardConfig {
113    /// Create a new guard config with the given context ID.
114    pub fn new(context_id: ContextId) -> Self {
115        Self {
116            context_id: Some(context_id),
117            guards: HashMap::new(),
118        }
119    }
120
121    /// Create an empty guard config (no guard enforcement).
122    pub fn none() -> Self {
123        Self::default()
124    }
125
126    /// Add guard requirements for a specific message type.
127    ///
128    /// The type name is derived from `std::any::type_name::<M>()`.
129    pub fn with_message_guard<M: 'static>(
130        mut self,
131        capability: CapabilityName,
132        flow_cost: impl Into<FlowCost>,
133    ) -> Self {
134        let type_name = std::any::type_name::<M>().to_string();
135        self.guards.insert(
136            type_name,
137            MessageGuardRequirements::new(capability, flow_cost),
138        );
139        self
140    }
141
142    /// Add guard requirements for a message type by name.
143    pub fn with_named_guard(
144        mut self,
145        type_name: impl Into<String>,
146        requirements: MessageGuardRequirements,
147    ) -> Self {
148        self.guards.insert(type_name.into(), requirements);
149        self
150    }
151
152    /// Get guard requirements for a message type.
153    pub fn get_guard(&self, type_name: &str) -> Option<&MessageGuardRequirements> {
154        self.guards.get(type_name)
155    }
156
157    /// Check if guard enforcement is enabled (context_id is set).
158    pub fn is_enabled(&self) -> bool {
159        self.context_id.is_some()
160    }
161}
162
163/// Adapter for choreography integration
164#[derive(Debug)]
165pub struct AuraHandlerAdapter {
166    authority_id: AuthorityId,
167}
168
169impl AuraHandlerAdapter {
170    pub fn new(authority_id: AuthorityId) -> Self {
171        Self { authority_id }
172    }
173
174    pub fn authority_id(&self) -> AuthorityId {
175        self.authority_id
176    }
177}
178
179/// Metadata captured for received choreography messages.
180#[derive(Debug, Clone)]
181pub struct ReceivedMessage {
182    pub type_name: &'static str,
183    pub bytes: Vec<u8>,
184}
185
186/// Request passed to dynamic message providers.
187#[derive(Debug)]
188pub struct MessageRequest<R: RoleId> {
189    #[allow(dead_code)]
190    pub to: R,
191    pub type_name: &'static str,
192}
193
194/// Runtime adapter used by generated Aura choreography runners.
195///
196/// This adapter implements upstream runtime handler traits plus Aura-owned
197/// generated-runtime hooks for message sourcing, branch selection, and
198/// parameterized role-family resolution.
199///
200/// ## Role Family Support
201///
202/// For protocols with parameterized roles (e.g., `Witness[N]`), use `with_role_family()`
203/// to register role instances that can be resolved during broadcast/collect operations.
204type MessageProviderFn<R> =
205    Box<dyn FnMut(MessageRequest<R>, &[ReceivedMessage]) -> Option<Box<dyn Any + Send>> + Send>;
206
207type BranchDeciderFn = Box<dyn FnMut(&[ReceivedMessage]) -> Option<String> + Send>;
208
209struct RuntimeAdmissionConfig {
210    capability_effects: Arc<dyn RuntimeCapabilityEffects>,
211    required_capabilities: Vec<CapabilityKey>,
212    admitted: bool,
213}
214
215/// Runtime adapter used by generated Aura choreography runners.
216///
217/// This adapter implements upstream runtime handler traits plus Aura-owned
218/// generated-runtime hooks for message sourcing, branch selection, and
219/// parameterized role-family resolution.
220///
221/// ## Features
222///
223/// - **Role Family Support**: For protocols with parameterized roles (e.g., `Witness[N]`),
224///   use `with_role_family()` to register role instances.
225/// - **Guard Chain Enforcement**: When configured via `with_guard_config()`, evaluates
226///   CapGuard and FlowGuard before each send operation.
227/// - **Journal Coupling**: When configured via `with_journal_coupler()`, records journal
228///   facts after successful sends.
229#[allow(dead_code)]
230pub struct AuraProtocolAdapter<E, R>
231where
232    E: ChoreographicEffects
233        + GuardEffects
234        + GuardContextProvider
235        + aura_core::PhysicalTimeEffects
236        + aura_core::TimeEffects,
237    R: RoleId,
238{
239    effects: Arc<E>,
240    authority_id: AuthorityId,
241    self_role: R,
242    role_map: HashMap<R, AuthorityId>,
243    /// Role family registry: maps family names (e.g., "Witness") to role instances
244    role_families: HashMap<String, Vec<R>>,
245    outbound: VecDeque<Box<dyn Any + Send>>,
246    branch_choices: VecDeque<R::Label>,
247    received: Vec<ReceivedMessage>,
248    message_provider: Option<MessageProviderFn<R>>,
249    branch_decider: Option<BranchDeciderFn>,
250    /// Guard chain configuration for send operations
251    guard_config: GuardConfig,
252    /// Journal coupler for fact recording after successful sends
253    journal_coupler: Option<JournalCoupler>,
254    /// Optional theorem-pack/runtime capability admission gate.
255    runtime_admission: Option<RuntimeAdmissionConfig>,
256}
257
258#[allow(dead_code)]
259impl<E, R> AuraProtocolAdapter<E, R>
260where
261    E: ChoreographicEffects
262        + GuardEffects
263        + GuardContextProvider
264        + aura_core::PhysicalTimeEffects
265        + aura_core::TimeEffects,
266    R: RoleId,
267{
268    /// Create a new protocol adapter.
269    ///
270    /// # Arguments
271    ///
272    /// * `effects` - The effect system implementation
273    /// * `authority_id` - The local authority's ID
274    /// * `self_role` - The role this adapter plays in the protocol
275    /// * `role_map` - Mapping from roles to authority IDs
276    pub fn new(
277        effects: Arc<E>,
278        authority_id: AuthorityId,
279        self_role: R,
280        role_map: HashMap<R, AuthorityId>,
281    ) -> Self {
282        Self {
283            effects,
284            authority_id,
285            self_role,
286            role_map,
287            role_families: HashMap::new(),
288            outbound: VecDeque::new(),
289            branch_choices: VecDeque::new(),
290            received: Vec::new(),
291            message_provider: None,
292            branch_decider: None,
293            guard_config: GuardConfig::default(),
294            journal_coupler: None,
295            runtime_admission: None,
296        }
297    }
298
299    /// Register a role family for broadcast/collect operations.
300    ///
301    /// This is used for protocols with parameterized roles like `Witness[N]`.
302    /// The family name should match the role name in the choreography definition.
303    ///
304    /// # Example
305    ///
306    /// ```ignore
307    /// // For a choreography with `roles Coordinator, Witness[N]`
308    /// let adapter = AuraProtocolAdapter::new(effects, auth_id, role, role_map)
309    ///     .with_role_family("Witness", vec![witness0, witness1, witness2]);
310    /// ```
311    pub fn with_role_family(mut self, family: impl Into<String>, roles: Vec<R>) -> Self {
312        self.role_families.insert(family.into(), roles);
313        self
314    }
315
316    /// Register multiple role families at once.
317    pub fn with_role_families(
318        mut self,
319        families: impl IntoIterator<Item = (String, Vec<R>)>,
320    ) -> Self {
321        for (family, roles) in families {
322            self.role_families.insert(family, roles);
323        }
324        self
325    }
326
327    /// Provide outbound messages dynamically when the queue is empty.
328    pub fn with_message_provider(
329        mut self,
330        provider: impl FnMut(MessageRequest<R>, &[ReceivedMessage]) -> Option<Box<dyn Any + Send>>
331            + Send
332            + 'static,
333    ) -> Self {
334        self.message_provider = Some(Box::new(provider));
335        self
336    }
337
338    /// Provide branch decisions dynamically based on received messages.
339    pub fn with_branch_decider(
340        mut self,
341        decider: impl FnMut(&[ReceivedMessage]) -> Option<String> + Send + 'static,
342    ) -> Self {
343        self.branch_decider = Some(Box::new(decider));
344        self
345    }
346
347    /// Configure guard chain enforcement for send operations.
348    ///
349    /// When configured, the adapter will evaluate guard chain requirements
350    /// (capability checks and flow budget charges) before each send operation.
351    ///
352    /// # Example
353    ///
354    /// ```ignore
355    /// let guard_config = GuardConfig::new(context_id)
356    ///     .with_message_guard::<MyMessage>(aura_core::capability_name!("amp:send"), 100);
357    ///
358    /// let adapter = AuraProtocolAdapter::new(effects, auth_id, role, role_map)
359    ///     .with_guard_config(guard_config);
360    /// ```
361    pub fn with_guard_config(mut self, config: GuardConfig) -> Self {
362        self.guard_config = config;
363        self
364    }
365
366    /// Get the current guard configuration.
367    pub fn guard_config(&self) -> &GuardConfig {
368        &self.guard_config
369    }
370
371    /// Configure journal coupler for fact recording after successful sends.
372    ///
373    /// When configured, the adapter will call `couple_with_send` after each
374    /// successful send operation to record journal facts per choreography annotations.
375    ///
376    /// # Example
377    ///
378    /// ```ignore
379    /// use aura_guards::guards::journal::{JournalCoupler, JournalCouplerBuilder};
380    /// use aura_mpst::journal::{JournalAnnotation, JournalOpType};
381    ///
382    /// let coupler = JournalCouplerBuilder::new()
383    ///     .with_annotation(
384    ///         "send_coupling",
385    ///         JournalAnnotation::add_facts("Protocol message sent"),
386    ///     )
387    ///     .build();
388    ///
389    /// let adapter = AuraProtocolAdapter::new(effects, auth_id, role, role_map)
390    ///     .with_journal_coupler(coupler);
391    /// ```
392    pub fn with_journal_coupler(mut self, coupler: JournalCoupler) -> Self {
393        self.journal_coupler = Some(coupler);
394        self
395    }
396
397    /// Configure runtime capability admission checks for this choreography execution.
398    pub fn with_runtime_capability_admission(
399        mut self,
400        capability_effects: Arc<dyn RuntimeCapabilityEffects>,
401        required_capabilities: Vec<CapabilityKey>,
402    ) -> Self {
403        self.runtime_admission = Some(RuntimeAdmissionConfig {
404            capability_effects,
405            required_capabilities,
406            admitted: false,
407        });
408        self
409    }
410
411    /// Configure runtime capability admission from one generated composition manifest.
412    pub fn with_manifest_runtime_capability_admission(
413        mut self,
414        capability_effects: Arc<dyn RuntimeCapabilityEffects>,
415        manifest: &CompositionManifest,
416    ) -> Result<Self, AdmissionError> {
417        let admission = manifest_admission_requirements(manifest)?;
418        self.runtime_admission = Some(RuntimeAdmissionConfig {
419            capability_effects,
420            required_capabilities: admission.required_runtime_capabilities,
421            admitted: false,
422        });
423        Ok(self)
424    }
425
426    /// Get a reference to the journal coupler if configured.
427    pub fn journal_coupler(&self) -> Option<&JournalCoupler> {
428        self.journal_coupler.as_ref()
429    }
430
431    pub fn push_message<M: Send + 'static>(&mut self, message: M) {
432        self.outbound.push_back(Box::new(message));
433    }
434
435    pub fn push_branch_choice(&mut self, label: R::Label) {
436        self.branch_choices.push_back(label);
437    }
438
439    pub fn received_messages(&self) -> &[ReceivedMessage] {
440        &self.received
441    }
442
443    async fn ensure_runtime_admission(&mut self) -> Result<(), AuraChoreographyError> {
444        let Some(admission) = self.runtime_admission.as_mut() else {
445            return Ok(());
446        };
447
448        if admission.admitted {
449            return Ok(());
450        }
451
452        let inventory = admission
453            .capability_effects
454            .capability_inventory()
455            .await
456            .map_err(|_| AuraChoreographyError::AuthorizationFailed {
457                reason: "TheoremPackAdmission inventory unavailable".to_string(),
458            })?;
459        let _inventory_size = inventory.len();
460
461        if let Err(error) = admission
462            .capability_effects
463            .require_capabilities(&admission.required_capabilities)
464            .await
465        {
466            let reason = match &error {
467                AdmissionError::MissingCapability { capability } => format!(
468                    "TheoremPackAdmission failed: missing runtime capability ref={}",
469                    capability_key_ref(capability.as_str())
470                ),
471                AdmissionError::MissingTheoremPack { theorem_pack } => format!(
472                    "TheoremPackAdmission failed: missing theorem pack {}",
473                    capability_key_ref(theorem_pack)
474                ),
475                AdmissionError::MissingTheoremPackCapability {
476                    theorem_pack: _,
477                    capability,
478                } => format!(
479                    "TheoremPackAdmission failed: missing theorem-pack capability ref={}",
480                    capability_key_ref(capability.as_str())
481                ),
482                AdmissionError::MissingRuntimeContracts => {
483                    "TheoremPackAdmission failed: missing runtime contracts".to_string()
484                }
485                AdmissionError::InventoryUnavailable { .. } => {
486                    "TheoremPackAdmission failed: inventory unavailable".to_string()
487                }
488                AdmissionError::Internal { .. } => "TheoremPackAdmission failed".to_string(),
489            };
490            return Err(AuraChoreographyError::AuthorizationFailed { reason });
491        }
492        admission.admitted = true;
493        Ok(())
494    }
495
496    pub async fn start_session(&mut self, session_id: Uuid) -> Result<(), AuraChoreographyError> {
497        self.ensure_runtime_admission().await?;
498        let mut roles = Vec::new();
499        let self_role = self.map_role(self.self_role)?;
500        roles.push(self_role);
501
502        for role in self.role_map.keys() {
503            let mapped = self.map_role(*role)?;
504            if mapped != self_role {
505                roles.push(mapped);
506            }
507        }
508
509        self.effects.start_session(session_id, roles).await
510    }
511
512    pub async fn end_session(&mut self) -> Result<(), AuraChoreographyError> {
513        self.effects.end_session().await
514    }
515
516    async fn send_value<M: serde::Serialize + Send + Sync>(
517        &mut self,
518        to: R,
519        msg: &M,
520    ) -> Result<(), AuraChoreographyError> {
521        self.ensure_runtime_admission().await?;
522        let role = self.map_role(to)?;
523        let payload = to_vec(msg).map_err(|err| AuraChoreographyError::SerializationFailed {
524            reason: err.to_string(),
525        })?;
526
527        let mut guard_receipt: Option<aura_core::Receipt> = None;
528        let type_name = std::any::type_name::<M>();
529
530        if let Some(context_id) = self.guard_config.context_id {
531            if let Some(guard_req) = self.guard_config.get_guard(type_name) {
532                let peer = if to == self.self_role {
533                    self.authority_id
534                } else {
535                    *self
536                        .role_map
537                        .get(&to)
538                        .ok_or_else(|| AuraChoreographyError::RoleNotFound {
539                            role: ChoreographicRole::new(
540                                aura_core::DeviceId::new_from_entropy([0u8; 32]),
541                                self.authority_id,
542                                RoleIndex::new(0).expect("role index"),
543                            ),
544                        })?
545                };
546
547                debug!(
548                    message_type = type_name,
549                    capability = %guard_req.capability,
550                    flow_cost = ?guard_req.flow_cost,
551                    context = ?context_id,
552                    peer = ?peer,
553                    "Evaluating guard chain for choreography send"
554                );
555
556                let mut guard = SendGuardChain::new(
557                    guard_req.capability.clone(),
558                    context_id,
559                    peer,
560                    guard_req.flow_cost,
561                );
562
563                if let Some(ref leakage) = guard_req.leakage_budget {
564                    guard = guard.with_leakage_budget(leakage.clone());
565                }
566
567                let result = guard.evaluate(&*self.effects).await.map_err(|e| {
568                    AuraChoreographyError::InternalError {
569                        message: format!("guard chain evaluation failed: {e}"),
570                    }
571                })?;
572
573                if !result.authorized {
574                    return Err(AuraChoreographyError::ProtocolViolation {
575                        message: result
576                            .denial_reason
577                            .unwrap_or_else(|| "guard chain denied send".to_string()),
578                    });
579                }
580
581                debug!(
582                    message_type = type_name,
583                    receipt = ?result.receipt,
584                    "Guard chain authorized choreography send"
585                );
586
587                guard_receipt = result.receipt;
588            }
589        }
590
591        self.effects.send_to_role_bytes(role, payload).await?;
592
593        if let Some(ref coupler) = self.journal_coupler {
594            debug!(
595                message_type = type_name,
596                "Coupling journal operations after choreography send"
597            );
598
599            let coupling_result = coupler
600                .couple_with_send(&*self.effects, &guard_receipt)
601                .await
602                .map_err(|e| {
603                    warn!(
604                        message_type = type_name,
605                        error = %e,
606                        "Journal coupling failed after send (message was sent)"
607                    );
608                    AuraChoreographyError::InternalError {
609                        message: format!("journal coupling failed: {e}"),
610                    }
611                })?;
612
613            if coupling_result.operations_applied > 0usize {
614                debug!(
615                    message_type = type_name,
616                    operations_applied = coupling_result.operations_applied,
617                    "Journal coupling completed successfully"
618                );
619            }
620        }
621
622        Ok(())
623    }
624
625    async fn recv_value<M: serde::de::DeserializeOwned + Send>(
626        &mut self,
627        from: R,
628    ) -> Result<M, AuraChoreographyError> {
629        self.ensure_runtime_admission().await?;
630        let role = self.map_role(from)?;
631        let payload = self.effects.receive_from_role_bytes(role).await?;
632        self.received.push(ReceivedMessage {
633            type_name: std::any::type_name::<M>(),
634            bytes: payload.clone(),
635        });
636        from_slice(&payload).map_err(|err| AuraChoreographyError::DeserializationFailed {
637            reason: err.to_string(),
638        })
639    }
640
641    fn map_role(&self, role: R) -> Result<ChoreographicRole, AuraChoreographyError> {
642        let authority_id = if role == self.self_role {
643            self.authority_id
644        } else {
645            *self
646                .role_map
647                .get(&role)
648                .ok_or_else(|| AuraChoreographyError::RoleNotFound {
649                    role: ChoreographicRole::new(
650                        aura_core::DeviceId::new_from_entropy([0u8; 32]),
651                        self.authority_id,
652                        RoleIndex::new(0).expect("role index"),
653                    ),
654                })?
655        };
656
657        let role_index = role.role_index().unwrap_or(0);
658        let role_index =
659            RoleIndex::new(role_index).ok_or_else(|| AuraChoreographyError::ProtocolViolation {
660                message: format!("invalid role index: {role_index}"),
661            })?;
662
663        Ok(ChoreographicRole::for_authority(authority_id, role_index))
664    }
665}
666
667#[async_trait]
668impl<E, R> ChoreoHandler for AuraProtocolAdapter<E, R>
669where
670    E: ChoreographicEffects
671        + GuardEffects
672        + GuardContextProvider
673        + aura_core::PhysicalTimeEffects
674        + aura_core::TimeEffects,
675    R: RoleId,
676{
677    type Role = R;
678    type Endpoint = ();
679
680    async fn send<M: serde::Serialize + Send + Sync>(
681        &mut self,
682        _ep: &mut Self::Endpoint,
683        to: Self::Role,
684        msg: &M,
685    ) -> ChoreoResult<()> {
686        self.send_value(to, msg).await.map_err(map_runtime_error)
687    }
688
689    async fn recv<M: serde::de::DeserializeOwned + Send>(
690        &mut self,
691        _ep: &mut Self::Endpoint,
692        from: Self::Role,
693    ) -> ChoreoResult<M> {
694        self.recv_value(from).await.map_err(map_runtime_error)
695    }
696
697    async fn choose(
698        &mut self,
699        ep: &mut Self::Endpoint,
700        who: Self::Role,
701        label: <Self::Role as RoleId>::Label,
702    ) -> ChoreoResult<()> {
703        self.send(ep, who, &label.as_str().to_string()).await
704    }
705
706    async fn offer(
707        &mut self,
708        ep: &mut Self::Endpoint,
709        from: Self::Role,
710    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
711        let label: String = self.recv(ep, from).await?;
712        <Self::Role as RoleId>::Label::from_str(&label).ok_or_else(|| {
713            TelltaleChoreographyError::InvalidChoice {
714                expected: Vec::new(),
715                actual: label,
716            }
717        })
718    }
719
720    async fn with_timeout<F, T>(
721        &mut self,
722        _ep: &mut Self::Endpoint,
723        _at: Self::Role,
724        dur: Duration,
725        body: F,
726    ) -> ChoreoResult<T>
727    where
728        F: std::future::Future<Output = ChoreoResult<T>> + Send,
729    {
730        let started_at = self
731            .effects
732            .physical_time()
733            .await
734            .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
735        let budget = TimeoutBudget::from_start_and_timeout(&started_at, dur)
736            .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
737        let now = self
738            .effects
739            .physical_time()
740            .await
741            .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
742        let remaining = budget
743            .remaining_at(&now)
744            .map_err(|_| TelltaleChoreographyError::Timeout(dur))?;
745        let sleep_ms = remaining.as_millis().min(u128::from(u64::MAX)) as u64;
746        tokio::select! {
747            result = body => result,
748            _ = self.effects.sleep_ms(sleep_ms) => Err(TelltaleChoreographyError::Timeout(dur)),
749        }
750    }
751}
752
753#[async_trait]
754impl<E, R> ChoreoHandlerExt for AuraProtocolAdapter<E, R>
755where
756    E: ChoreographicEffects
757        + GuardEffects
758        + GuardContextProvider
759        + aura_core::PhysicalTimeEffects
760        + aura_core::TimeEffects,
761    R: RoleId,
762{
763    async fn setup(&mut self, _role: Self::Role) -> ChoreoResult<Self::Endpoint> {
764        Ok(())
765    }
766
767    async fn teardown(&mut self, _ep: Self::Endpoint) -> ChoreoResult<()> {
768        Ok(())
769    }
770}
771
772#[async_trait]
773impl<E, R> GeneratedChoreographyRuntime for AuraProtocolAdapter<E, R>
774where
775    E: ChoreographicEffects
776        + GuardEffects
777        + GuardContextProvider
778        + aura_core::PhysicalTimeEffects
779        + aura_core::TimeEffects,
780    R: RoleId,
781{
782    async fn provide_message<M: Send + 'static>(&mut self, to: Self::Role) -> ChoreoResult<M> {
783        self.ensure_runtime_admission()
784            .await
785            .map_err(map_runtime_error)?;
786        let boxed = match self.outbound.pop_front() {
787            Some(boxed) => boxed,
788            None => {
789                if let Some(provider) = self.message_provider.as_mut() {
790                    provider(
791                        MessageRequest {
792                            to,
793                            type_name: std::any::type_name::<M>(),
794                        },
795                        &self.received,
796                    )
797                    .ok_or_else(|| {
798                        map_runtime_error(AuraChoreographyError::ProtocolViolation {
799                            message: "message provider returned None".to_string(),
800                        })
801                    })?
802                } else {
803                    return Err(map_runtime_error(
804                        AuraChoreographyError::ProtocolViolation {
805                            message: "no queued message for provide_message".to_string(),
806                        },
807                    ));
808                }
809            }
810        };
811
812        boxed.downcast::<M>().map(|msg| *msg).map_err(|_| {
813            map_runtime_error(AuraChoreographyError::ProtocolViolation {
814                message: format!(
815                    "queued message type mismatch (expected {})",
816                    std::any::type_name::<M>()
817                ),
818            })
819        })
820    }
821
822    async fn select_branch<L: LabelId>(&mut self, choices: &[L]) -> ChoreoResult<L> {
823        self.ensure_runtime_admission()
824            .await
825            .map_err(map_runtime_error)?;
826        let choice = match self.branch_choices.pop_front() {
827            Some(choice) => choice,
828            None => {
829                if let Some(decider) = self.branch_decider.as_mut() {
830                    let label = decider(&self.received).ok_or_else(|| {
831                        map_runtime_error(AuraChoreographyError::ProtocolViolation {
832                            message: "branch decider returned None".to_string(),
833                        })
834                    })?;
835                    let selected = choices
836                        .iter()
837                        .copied()
838                        .find(|choice| choice.as_str() == label);
839                    return selected.ok_or_else(|| {
840                        map_runtime_error(AuraChoreographyError::ProtocolViolation {
841                            message: format!("branch decider returned invalid label: {label}"),
842                        })
843                    });
844                }
845                return Err(map_runtime_error(
846                    AuraChoreographyError::ProtocolViolation {
847                        message: "no queued branch choice for select_branch".to_string(),
848                    },
849                ));
850            }
851        };
852
853        let selected = choices
854            .iter()
855            .copied()
856            .find(|label| label.as_str() == choice.as_str());
857
858        selected.ok_or_else(|| {
859            map_runtime_error(AuraChoreographyError::ProtocolViolation {
860                message: "queued branch choice is not valid for this choice".to_string(),
861            })
862        })
863    }
864
865    fn resolve_family(&self, family: &str) -> ChoreoResult<Vec<Self::Role>> {
866        self.role_families
867            .get(family)
868            .cloned()
869            .ok_or_else(|| AuraChoreographyError::RoleFamilyNotFound {
870                family: family.to_string(),
871            })
872            .map_err(map_runtime_error)
873            .and_then(|roles| {
874                if roles.is_empty() {
875                    Err(map_runtime_error(AuraChoreographyError::EmptyRoleFamily {
876                        family: family.to_string(),
877                    }))
878                } else {
879                    Ok(roles)
880                }
881            })
882    }
883}
884
885fn map_runtime_error(error: AuraChoreographyError) -> TelltaleChoreographyError {
886    TelltaleChoreographyError::ExecutionError(error.to_string())
887}
888
889fn capability_key_ref(key: &str) -> String {
890    let digest = hash(key.as_bytes());
891    hex::encode(&digest[..8])
892}