Skip to main content

aura_agent/runtime/
session_ingress.rs

1//! Canonical host-side ingress for owned Telltale protocol-machine sessions.
2
3#![allow(clippy::result_large_err, clippy::incompatible_msrv)]
4
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use aura_mpst::{
9    upstream::types::{GlobalType, LocalTypeR},
10    CompositionManifest,
11};
12use aura_protocol::effects::{ChoreographicEffects, ChoreographicRole};
13use thiserror::Error;
14use uuid::Uuid;
15
16use super::subsystems::choreography::{
17    RuntimeChoreographySessionId, SessionOwnerCapability, SessionOwnerCapabilityScope,
18    SessionOwnershipError,
19};
20use super::vm_host_bridge::{
21    advance_host_bridged_vm_round, advance_host_bridged_vm_round_until_receive,
22    close_and_reap_vm_session, handle_standard_vm_round, open_manifest_vm_session_admitted,
23    AuraQueuedVmBridgeHandler, AuraVmBridgeRound, AuraVmHostWaitStatus, AuraVmRoundDisposition,
24    BlockedVmReceive,
25};
26use super::{AuraChoreoEngine, AuraEffectSystem, AuraVmSchedulerSignals};
27use super::{AuraLinkBoundary, RuntimeBoundaryError, RuntimeSessionEvent};
28use aura_core::OwnershipCategory;
29use telltale_machine::SessionId;
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct RuntimeSessionOwner {
33    pub session_id: RuntimeChoreographySessionId,
34    pub owner_label: String,
35    pub capability: SessionOwnerCapability,
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum SessionStartFailureReason {
40    AlreadyExists,
41    TaskAlreadyBound,
42    OwnerClaimRejected,
43    VmSessionOpenFailed,
44    Other,
45}
46
47#[derive(Debug, Error)]
48pub enum SessionIngressError {
49    #[error("runtime session {session_id} has no owner record")]
50    MissingOwner {
51        session_id: RuntimeChoreographySessionId,
52    },
53    #[error("runtime session {session_id} is not owned by expected owner {expected_owner}")]
54    StaleOwner {
55        session_id: RuntimeChoreographySessionId,
56        expected_owner: String,
57    },
58    #[error("runtime session {session_id} rejected ingress for owner {owner_label}: {details}")]
59    InvalidIngressRouting {
60        session_id: RuntimeChoreographySessionId,
61        owner_label: String,
62        details: RuntimeBoundaryError,
63    },
64    #[error(
65        "failed to start owned runtime session {session_id} for {owner_label} ({reason:?}): {message}"
66    )]
67    SessionStart {
68        session_id: RuntimeChoreographySessionId,
69        owner_label: String,
70        reason: SessionStartFailureReason,
71        message: String,
72    },
73    #[error("failed to advance owned runtime session {session_id} for {owner_label}: {message}")]
74    Round {
75        session_id: RuntimeChoreographySessionId,
76        owner_label: String,
77        message: String,
78    },
79    #[error("failed to close owned runtime session {session_id} for {owner_label}: {message}")]
80    SessionClose {
81        session_id: RuntimeChoreographySessionId,
82        owner_label: String,
83        message: String,
84    },
85    #[error(
86        "failed to transfer owned runtime session {session_id} from {from_owner_label} to {to_owner_label}: {message}"
87    )]
88    OwnerTransfer {
89        session_id: RuntimeChoreographySessionId,
90        from_owner_label: String,
91        to_owner_label: String,
92        message: String,
93    },
94}
95
96impl SessionIngressError {
97    fn error_kind(&self) -> &'static str {
98        match self {
99            Self::MissingOwner { .. } => "missing_owner",
100            Self::StaleOwner { .. } => "stale_owner",
101            Self::InvalidIngressRouting { .. } => "invalid_ingress_routing",
102            Self::SessionStart { .. } => "session_start",
103            Self::Round { .. } => "round",
104            Self::SessionClose { .. } => "session_close",
105            Self::OwnerTransfer { .. } => "owner_transfer",
106        }
107    }
108}
109
110fn classify_session_start_error(
111    error: &aura_protocol::effects::ChoreographyError,
112) -> SessionStartFailureReason {
113    match error {
114        aura_protocol::effects::ChoreographyError::SessionAlreadyExists { .. } => {
115            SessionStartFailureReason::AlreadyExists
116        }
117        aura_protocol::effects::ChoreographyError::InternalError { message }
118            if message.starts_with("task already bound to active choreography session") =>
119        {
120            SessionStartFailureReason::TaskAlreadyBound
121        }
122        _ => SessionStartFailureReason::Other,
123    }
124}
125
126impl RuntimeSessionOwner {
127    pub const OWNERSHIP_CATEGORY: OwnershipCategory = OwnershipCategory::MoveOwned;
128}
129
130impl From<SessionOwnershipError> for SessionIngressError {
131    fn from(value: SessionOwnershipError) -> Self {
132        match value {
133            SessionOwnershipError::MissingOwner { session_id } => {
134                SessionIngressError::MissingOwner { session_id }
135            }
136            SessionOwnershipError::OwnerMismatch {
137                session_id,
138                expected_owner,
139            } => SessionIngressError::StaleOwner {
140                session_id,
141                expected_owner,
142            },
143            SessionOwnershipError::CapabilityMismatch {
144                session_id,
145                expected_owner,
146                current_generation,
147            } => SessionIngressError::InvalidIngressRouting {
148                session_id,
149                owner_label: expected_owner,
150                details: RuntimeBoundaryError::CapabilityRejected {
151                    details: format!(
152                        "session capability no longer valid; current generation is {current_generation}"
153                    ),
154                },
155            },
156            SessionOwnershipError::CapabilitySessionMismatch {
157                session_id,
158                expected_owner,
159                capability_session_id,
160            } => SessionIngressError::InvalidIngressRouting {
161                session_id,
162                owner_label: expected_owner,
163                details: RuntimeBoundaryError::CapabilityRejected {
164                    details: format!(
165                        "session capability was issued for {capability_session_id}, not {session_id}"
166                    ),
167                },
168            },
169            SessionOwnershipError::OwnerConflict {
170                session_id,
171                existing_owner,
172                requested_owner,
173            } => SessionIngressError::InvalidIngressRouting {
174                session_id,
175                owner_label: requested_owner,
176                details: RuntimeBoundaryError::CapabilityRejected {
177                    details: format!("session already owned by {existing_owner}"),
178                },
179            },
180        }
181    }
182}
183
184pub struct OwnedVmSession {
185    effects: Arc<AuraEffectSystem>,
186    owner: RuntimeSessionOwner,
187    routing_boundary: AuraLinkBoundary,
188    engine: AuraChoreoEngine<AuraQueuedVmBridgeHandler>,
189    handler: Arc<AuraQueuedVmBridgeHandler>,
190    vm_session_id: SessionId,
191}
192
193#[derive(Debug)]
194struct OwnedVmSessionOwnerTransfer {
195    next_owner: RuntimeSessionOwner,
196    next_boundary: AuraLinkBoundary,
197}
198
199fn log_session_owner_assigned(
200    owner: &RuntimeSessionOwner,
201    protocol_id: Option<&str>,
202    context: &'static str,
203) {
204    tracing::debug!(
205        event = RuntimeSessionEvent::OwnerAssigned.as_event_name(),
206        session_id = %owner.session_id,
207        owner_label = %owner.owner_label,
208        capability_generation = owner.capability.generation,
209        protocol_id,
210        context,
211        "Assigned runtime session owner"
212    );
213}
214
215fn log_session_owner_rejected(
216    session_id: RuntimeChoreographySessionId,
217    owner_label: &str,
218    protocol_id: Option<&str>,
219    reason: &'static str,
220    error: &str,
221) {
222    tracing::warn!(
223        event = RuntimeSessionEvent::OwnerRejected.as_event_name(),
224        session_id = %session_id,
225        owner_label,
226        protocol_id,
227        reason,
228        error,
229        "Rejected runtime session owner"
230    );
231}
232
233fn log_session_owner_transferred(
234    previous_owner: &RuntimeSessionOwner,
235    next_owner: &RuntimeSessionOwner,
236    protocol_id: Option<&str>,
237    context: &'static str,
238) {
239    tracing::info!(
240        event = RuntimeSessionEvent::OwnerTransferred.as_event_name(),
241        session_id = %previous_owner.session_id,
242        from_owner_label = %previous_owner.owner_label,
243        from_generation = previous_owner.capability.generation,
244        to_owner_label = %next_owner.owner_label,
245        to_generation = next_owner.capability.generation,
246        protocol_id,
247        context,
248        "Transferred runtime session owner"
249    );
250}
251
252fn log_session_owner_transfer_rejected(
253    previous_owner: &RuntimeSessionOwner,
254    next_owner_label: &str,
255    protocol_id: Option<&str>,
256    error: &SessionIngressError,
257    context: &'static str,
258) {
259    tracing::warn!(
260        event = RuntimeSessionEvent::OwnerTransferRejected.as_event_name(),
261        session_id = %previous_owner.session_id,
262        from_owner_label = %previous_owner.owner_label,
263        from_generation = previous_owner.capability.generation,
264        to_owner_label = next_owner_label,
265        protocol_id,
266        context,
267        error_kind = error.error_kind(),
268        error = %error,
269        "Rejected runtime session owner transfer"
270    );
271}
272
273fn log_session_ingress_received(
274    owner: &RuntimeSessionOwner,
275    ingress_kind: &'static str,
276    active_role: Option<&str>,
277    from_role: Option<&str>,
278    to_role: Option<&str>,
279    payload_bytes: usize,
280) {
281    tracing::debug!(
282        event = RuntimeSessionEvent::IngressReceived.as_event_name(),
283        session_id = %owner.session_id,
284        owner_label = %owner.owner_label,
285        capability_generation = owner.capability.generation,
286        ingress_kind,
287        active_role,
288        from_role,
289        to_role,
290        payload_bytes,
291        "Accepted owned session ingress"
292    );
293}
294
295fn log_session_ingress_dropped(
296    owner: &RuntimeSessionOwner,
297    ingress_kind: &'static str,
298    error: &SessionIngressError,
299    active_role: Option<&str>,
300) {
301    tracing::warn!(
302        event = RuntimeSessionEvent::IngressDropped.as_event_name(),
303        session_id = %owner.session_id,
304        owner_label = %owner.owner_label,
305        capability_generation = owner.capability.generation,
306        ingress_kind,
307        active_role,
308        error_kind = error.error_kind(),
309        error = %error,
310        "Dropped owned session ingress"
311    );
312}
313
314impl OwnedVmSession {
315    fn prepare_owner_transfer(
316        &self,
317        next_owner_label: impl Into<String>,
318        next_boundary: AuraLinkBoundary,
319    ) -> Result<OwnedVmSessionOwnerTransfer, SessionIngressError> {
320        let next_scope = next_boundary.capability_scope.clone();
321        let next_owner = self.effects.transfer_owned_choreography_session_owner(
322            self.owner.clone(),
323            next_owner_label,
324            next_scope,
325        )?;
326        Ok(OwnedVmSessionOwnerTransfer {
327            next_owner,
328            next_boundary,
329        })
330    }
331
332    pub fn owner(&self) -> &RuntimeSessionOwner {
333        &self.owner
334    }
335
336    pub fn routing_boundary(&self) -> &AuraLinkBoundary {
337        &self.routing_boundary
338    }
339
340    pub fn queue_send_bytes(&self, payload: Vec<u8>) {
341        self.handler.push_send_bytes(payload);
342    }
343
344    pub fn queue_choice_label(&self, label: impl Into<String>) {
345        self.handler.push_choice_label(label.into());
346    }
347
348    pub fn engine_mut(&mut self) -> &mut AuraChoreoEngine<AuraQueuedVmBridgeHandler> {
349        &mut self.engine
350    }
351
352    pub fn vm_session_id(&self) -> SessionId {
353        self.vm_session_id
354    }
355
356    pub async fn advance_round(
357        &mut self,
358        active_role: &str,
359        peer_roles: &BTreeMap<String, ChoreographicRole>,
360    ) -> Result<AuraVmBridgeRound, SessionIngressError> {
361        log_session_ingress_received(
362            &self.owner,
363            "advance_round",
364            Some(active_role),
365            None,
366            None,
367            0,
368        );
369        if let Err(error) = self
370            .effects
371            .assert_owned_choreography_boundary(&self.owner, &self.routing_boundary)
372        {
373            log_session_ingress_dropped(&self.owner, "advance_round", &error, Some(active_role));
374            return Err(error);
375        }
376        let result = advance_host_bridged_vm_round(
377            self.effects.as_ref(),
378            &mut self.engine,
379            self.handler.as_ref(),
380            self.vm_session_id,
381            active_role,
382            peer_roles,
383        )
384        .await
385        .map_err(|message| SessionIngressError::Round {
386            session_id: self.owner.session_id,
387            owner_label: self.owner.owner_label.clone(),
388            message,
389        });
390        if let Err(error) = &result {
391            log_session_ingress_dropped(&self.owner, "advance_round", error, Some(active_role));
392        }
393        result
394    }
395
396    pub async fn advance_round_until_receive<F>(
397        &mut self,
398        active_role: &str,
399        peer_roles: &BTreeMap<String, ChoreographicRole>,
400        stop_on_receive_error: F,
401    ) -> Result<AuraVmBridgeRound, SessionIngressError>
402    where
403        F: Fn(&aura_protocol::effects::ChoreographyError) -> bool,
404    {
405        log_session_ingress_received(
406            &self.owner,
407            "advance_round_until_receive",
408            Some(active_role),
409            None,
410            None,
411            0,
412        );
413        if let Err(error) = self
414            .effects
415            .assert_owned_choreography_boundary(&self.owner, &self.routing_boundary)
416        {
417            log_session_ingress_dropped(
418                &self.owner,
419                "advance_round_until_receive",
420                &error,
421                Some(active_role),
422            );
423            return Err(error);
424        }
425        let result = advance_host_bridged_vm_round_until_receive(
426            self.effects.as_ref(),
427            &mut self.engine,
428            self.handler.as_ref(),
429            self.vm_session_id,
430            active_role,
431            peer_roles,
432            stop_on_receive_error,
433        )
434        .await
435        .map_err(|message| SessionIngressError::Round {
436            session_id: self.owner.session_id,
437            owner_label: self.owner.owner_label.clone(),
438            message,
439        });
440        if let Err(error) = &result {
441            log_session_ingress_dropped(
442                &self.owner,
443                "advance_round_until_receive",
444                error,
445                Some(active_role),
446            );
447        }
448        result
449    }
450
451    pub fn inject_blocked_receive(
452        &mut self,
453        receive: &BlockedVmReceive,
454    ) -> Result<(), SessionIngressError> {
455        log_session_ingress_received(
456            &self.owner,
457            "blocked_receive",
458            Some(receive.to_role.as_str()),
459            Some(receive.from_role.as_str()),
460            Some(receive.to_role.as_str()),
461            receive.payload.len(),
462        );
463        if let Err(error) = self
464            .effects
465            .assert_owned_choreography_boundary(&self.owner, &self.routing_boundary)
466        {
467            log_session_ingress_dropped(&self.owner, "blocked_receive", &error, None);
468            return Err(error);
469        }
470        let result =
471            super::vm_host_bridge::inject_vm_receive(&mut self.engine, self.vm_session_id, receive)
472                .map_err(|message| SessionIngressError::Round {
473                    session_id: self.owner.session_id,
474                    owner_label: self.owner.owner_label.clone(),
475                    message,
476                });
477        if let Err(error) = &result {
478            log_session_ingress_dropped(&self.owner, "blocked_receive", error, None);
479        }
480        result
481    }
482
483    pub async fn close(mut self) -> Result<(), SessionIngressError> {
484        self.effects
485            .assert_owned_choreography_session(&self.owner)?;
486        close_and_reap_vm_session(&mut self.engine, self.vm_session_id).map_err(|message| {
487            SessionIngressError::SessionClose {
488                session_id: self.owner.session_id,
489                owner_label: self.owner.owner_label.clone(),
490                message,
491            }
492        })?;
493        self.effects
494            .end_owned_choreography_session(&self.owner)
495            .await
496    }
497
498    pub fn transfer_owner_in_place(
499        &mut self,
500        next_owner_label: impl Into<String>,
501        next_boundary: AuraLinkBoundary,
502    ) -> Result<(), SessionIngressError> {
503        let transfer = self.prepare_owner_transfer(next_owner_label, next_boundary)?;
504        self.routing_boundary = transfer.next_boundary;
505        let _previous = std::mem::replace(&mut self.owner, transfer.next_owner);
506        Ok(())
507    }
508
509    pub fn transfer_owner(
510        mut self,
511        next_owner_label: impl Into<String>,
512        next_scope: SessionOwnerCapabilityScope,
513    ) -> Result<Self, SessionIngressError> {
514        self.transfer_owner_in_place(next_owner_label, AuraLinkBoundary::for_scope(next_scope))?;
515        Ok(self)
516    }
517}
518
519#[track_caller]
520pub fn caller_session_owner_label() -> String {
521    let caller = std::panic::Location::caller();
522    format!("{}:{}:{}", caller.file(), caller.line(), caller.column())
523}
524
525pub async fn open_owned_manifest_vm_session_admitted(
526    effects: Arc<AuraEffectSystem>,
527    session_uuid: Uuid,
528    roles: Vec<ChoreographicRole>,
529    manifest: &CompositionManifest,
530    active_role: &str,
531    global_type: &GlobalType,
532    local_types: &BTreeMap<String, LocalTypeR>,
533    scheduler_signals: AuraVmSchedulerSignals,
534) -> Result<OwnedVmSession, SessionIngressError> {
535    let owner_label = caller_session_owner_label();
536    let owner = effects
537        .start_owned_choreography_session(owner_label, session_uuid, roles)
538        .await?;
539    effects
540        .set_current_runtime_choreography_protocol_id(manifest.protocol_id.clone())
541        .map_err(|message| SessionIngressError::SessionStart {
542            session_id: owner.session_id,
543            owner_label: owner.owner_label.clone(),
544            reason: SessionStartFailureReason::VmSessionOpenFailed,
545            message,
546        })?;
547    let routing_boundary = AuraLinkBoundary::for_manifest(manifest);
548
549    match open_manifest_vm_session_admitted(
550        effects.as_ref(),
551        manifest,
552        active_role,
553        global_type,
554        local_types,
555        scheduler_signals,
556    )
557    .await
558    {
559        Ok((engine, handler, vm_session_id)) => Ok(OwnedVmSession {
560            effects,
561            owner,
562            routing_boundary,
563            engine,
564            handler,
565            vm_session_id,
566        }),
567        Err(error) => {
568            log_session_owner_rejected(
569                owner.session_id,
570                &owner.owner_label,
571                Some(manifest.protocol_id.as_str()),
572                "vm_session_open_failed",
573                &error.to_string(),
574            );
575            let _ = effects.end_owned_choreography_session(&owner).await;
576            Err(SessionIngressError::SessionStart {
577                session_id: owner.session_id,
578                owner_label: owner.owner_label,
579                reason: SessionStartFailureReason::VmSessionOpenFailed,
580                message: error.to_string(),
581            })
582        }
583    }
584}
585
586pub fn handle_owned_vm_round(
587    session: &mut OwnedVmSession,
588    round: AuraVmBridgeRound,
589    context_label: &str,
590) -> Result<AuraVmRoundDisposition, SessionIngressError> {
591    if let Some(blocked) = round.blocked_receive {
592        session.inject_blocked_receive(&blocked)?;
593        return Ok(AuraVmRoundDisposition::Continue);
594    }
595
596    match round.host_wait_status {
597        AuraVmHostWaitStatus::Idle | AuraVmHostWaitStatus::Delivered => {}
598        AuraVmHostWaitStatus::TimedOut => {
599            return Err(SessionIngressError::Round {
600                session_id: session.owner.session_id,
601                owner_label: session.owner.owner_label.clone(),
602                message: format!("{context_label} timed out while waiting for receive"),
603            });
604        }
605        AuraVmHostWaitStatus::Cancelled => {
606            return Err(SessionIngressError::Round {
607                session_id: session.owner.session_id,
608                owner_label: session.owner.owner_label.clone(),
609                message: format!("{context_label} cancelled while waiting for receive"),
610            });
611        }
612        AuraVmHostWaitStatus::Deferred => {}
613    }
614
615    let vm_session_id = session.vm_session_id();
616    handle_standard_vm_round(
617        session.engine_mut(),
618        vm_session_id,
619        AuraVmBridgeRound {
620            step: round.step,
621            blocked_receive: None,
622            host_wait_status: round.host_wait_status,
623        },
624        context_label,
625    )
626    .map_err(|message| SessionIngressError::Round {
627        session_id: session.owner.session_id,
628        owner_label: session.owner.owner_label.clone(),
629        message,
630    })
631}
632
633impl AuraEffectSystem {
634    fn assert_runtime_choreography_session_binding(
635        &self,
636        session_id: RuntimeChoreographySessionId,
637        owner_label: &str,
638    ) -> Result<(), SessionIngressError> {
639        let current_session_id =
640            self.current_runtime_choreography_session_id()
641                .ok_or_else(|| SessionIngressError::InvalidIngressRouting {
642                    session_id,
643                    owner_label: owner_label.to_string(),
644                    details: RuntimeBoundaryError::MissingTaskBinding,
645                })?;
646
647        if current_session_id != session_id {
648            return Err(SessionIngressError::InvalidIngressRouting {
649                session_id,
650                owner_label: owner_label.to_string(),
651                details: RuntimeBoundaryError::SessionBindingMismatch {
652                    expected_session_id: session_id,
653                    bound_session_id: current_session_id,
654                },
655            });
656        }
657
658        Ok(())
659    }
660
661    pub async fn start_owned_choreography_session(
662        &self,
663        owner_label: impl Into<String>,
664        session_uuid: Uuid,
665        roles: Vec<ChoreographicRole>,
666    ) -> Result<RuntimeSessionOwner, SessionIngressError> {
667        let owner_label = owner_label.into();
668        ChoreographicEffects::start_session(self, session_uuid, roles)
669            .await
670            .map_err(|error| SessionIngressError::SessionStart {
671                session_id: RuntimeChoreographySessionId::from_uuid(session_uuid),
672                owner_label: owner_label.clone(),
673                reason: classify_session_start_error(&error),
674                message: error.to_string(),
675            })
676            .inspect_err(|error| {
677                log_session_owner_rejected(
678                    RuntimeChoreographySessionId::from_uuid(session_uuid),
679                    &owner_label,
680                    None,
681                    "runtime_session_start_failed",
682                    &error.to_string(),
683                );
684            })?;
685
686        let session_id = RuntimeChoreographySessionId::from_uuid(session_uuid);
687        let capability =
688            match self.claim_runtime_choreography_session_owner(session_id, owner_label.clone()) {
689                Ok(capability) => capability,
690                Err(error) => {
691                    let _ = ChoreographicEffects::end_session(self).await;
692                    log_session_owner_rejected(
693                        session_id,
694                        &owner_label,
695                        None,
696                        "owner_claim_rejected",
697                        &error,
698                    );
699                    return Err(SessionIngressError::SessionStart {
700                        session_id,
701                        owner_label,
702                        reason: SessionStartFailureReason::OwnerClaimRejected,
703                        message: error,
704                    });
705                }
706            };
707
708        let owner = RuntimeSessionOwner {
709            session_id,
710            owner_label,
711            capability,
712        };
713        log_session_owner_assigned(&owner, None, "start_owned_choreography_session");
714        Ok(owner)
715    }
716
717    pub fn assert_owned_choreography_session(
718        &self,
719        owner: &RuntimeSessionOwner,
720    ) -> Result<(), SessionIngressError> {
721        self.assert_owned_choreography_boundary(
722            owner,
723            &AuraLinkBoundary::for_scope(SessionOwnerCapabilityScope::Session),
724        )?;
725
726        if !owner.capability.allows_full_session() {
727            return Err(SessionIngressError::InvalidIngressRouting {
728                session_id: owner.session_id,
729                owner_label: owner.owner_label.clone(),
730                details: RuntimeBoundaryError::FullSessionCapabilityRequired,
731            });
732        }
733
734        Ok(())
735    }
736
737    pub fn assert_owned_choreography_boundary(
738        &self,
739        owner: &RuntimeSessionOwner,
740        boundary: &AuraLinkBoundary,
741    ) -> Result<(), SessionIngressError> {
742        self.assert_runtime_choreography_session_binding(owner.session_id, &owner.owner_label)?;
743        self.ensure_runtime_choreography_session_owner_capability(
744            owner.session_id,
745            &owner.capability,
746        )
747        .map_err(|error| SessionIngressError::InvalidIngressRouting {
748            session_id: owner.session_id,
749            owner_label: owner.owner_label.clone(),
750            details: RuntimeBoundaryError::CapabilityRejected { details: error },
751        })?;
752
753        if !boundary.is_allowed_by(&owner.capability.scope) {
754            return Err(SessionIngressError::InvalidIngressRouting {
755                session_id: owner.session_id,
756                owner_label: owner.owner_label.clone(),
757                details: RuntimeBoundaryError::BoundaryScopeRejected {
758                    boundary: boundary.clone(),
759                    capability_scope: owner.capability.scope.clone(),
760                },
761            });
762        }
763
764        Ok(())
765    }
766
767    pub fn transfer_owned_choreography_session_owner(
768        &self,
769        owner: RuntimeSessionOwner,
770        next_owner_label: impl Into<String>,
771        next_scope: SessionOwnerCapabilityScope,
772    ) -> Result<RuntimeSessionOwner, SessionIngressError> {
773        self.assert_runtime_choreography_session_binding(owner.session_id, &owner.owner_label)?;
774        self.ensure_runtime_choreography_session_owner_capability(
775            owner.session_id,
776            &owner.capability,
777        )
778        .map_err(|error| SessionIngressError::InvalidIngressRouting {
779            session_id: owner.session_id,
780            owner_label: owner.owner_label.clone(),
781            details: RuntimeBoundaryError::CapabilityRejected { details: error },
782        })?;
783
784        let next_owner_label = next_owner_label.into();
785        let next_capability = self
786            .transfer_runtime_choreography_session_owner(
787                owner.session_id,
788                &owner.capability,
789                next_owner_label.clone(),
790                next_scope,
791            )
792            .map_err(|message| SessionIngressError::OwnerTransfer {
793                session_id: owner.session_id,
794                from_owner_label: owner.owner_label.clone(),
795                to_owner_label: next_owner_label.clone(),
796                message,
797            });
798
799        match next_capability {
800            Ok(next_capability) => {
801                let next_owner = RuntimeSessionOwner {
802                    session_id: owner.session_id,
803                    owner_label: next_owner_label,
804                    capability: next_capability,
805                };
806                log_session_owner_transferred(
807                    &owner,
808                    &next_owner,
809                    None,
810                    "transfer_owned_choreography_session_owner",
811                );
812                Ok(next_owner)
813            }
814            Err(error) => {
815                log_session_owner_transfer_rejected(
816                    &owner,
817                    &next_owner_label,
818                    None,
819                    &error,
820                    "transfer_owned_choreography_session_owner",
821                );
822                Err(error)
823            }
824        }
825    }
826
827    pub async fn end_owned_choreography_session(
828        &self,
829        owner: &RuntimeSessionOwner,
830    ) -> Result<(), SessionIngressError> {
831        self.assert_owned_choreography_session(owner)?;
832        ChoreographicEffects::end_session(self)
833            .await
834            .map_err(|error| SessionIngressError::SessionClose {
835                session_id: owner.session_id,
836                owner_label: owner.owner_label.clone(),
837                message: error.to_string(),
838            })
839    }
840}
841
842#[cfg(test)]
843mod tests {
844    use super::*;
845    use crate::core::AgentConfig;
846    use aura_core::AuthorityId;
847    use aura_mpst::{CompositionLinkSpec, CompositionManifest};
848    use aura_protocol::effects::{ChoreographicRole, RoleIndex};
849    use std::collections::BTreeSet;
850    use std::sync::Arc;
851
852    fn test_authority(byte: u8) -> AuthorityId {
853        AuthorityId::from_uuid(Uuid::from_bytes([byte; 16]))
854    }
855
856    fn linked_manifest(protocol_id: &str, bundle_id: &str) -> CompositionManifest {
857        CompositionManifest {
858            protocol_name: protocol_id.to_string(),
859            protocol_namespace: None,
860            protocol_qualified_name: protocol_id.to_string(),
861            protocol_id: protocol_id.to_string(),
862            role_names: vec!["Role".to_string()],
863            required_capabilities: Vec::new(),
864            theorem_packs: Vec::new(),
865            required_theorem_packs: Vec::new(),
866            required_theorem_pack_capabilities: Vec::new(),
867            guard_capabilities: Vec::new(),
868            determinism_policy_ref: None,
869            delegation_constraints: Vec::new(),
870            link_specs: vec![CompositionLinkSpec {
871                role: "Role".to_string(),
872                bundle_id: bundle_id.to_string(),
873                imports: Vec::new(),
874                exports: Vec::new(),
875            }],
876        }
877    }
878
879    #[tokio::test]
880    async fn transferring_runtime_session_owner_invalidates_stale_capability() {
881        let authority = test_authority(0x11);
882        let effects =
883            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
884                .expect("test effect system");
885        let roles = vec![ChoreographicRole::for_authority(
886            authority,
887            RoleIndex::new(0).expect("role index"),
888        )];
889        let session_uuid = Uuid::from_bytes([0x22; 16]);
890        let original = effects
891            .start_owned_choreography_session("owner-a", session_uuid, roles)
892            .await
893            .expect("start owned session");
894        let stale = original.clone();
895
896        let transferred = effects
897            .transfer_owned_choreography_session_owner(
898                original,
899                "owner-b",
900                SessionOwnerCapabilityScope::Session,
901            )
902            .expect("transfer owner");
903
904        assert_eq!(transferred.owner_label, "owner-b");
905        assert!(
906            effects
907                .assert_owned_choreography_session(&transferred)
908                .is_ok(),
909            "new owner must be accepted"
910        );
911        assert!(
912            matches!(
913                effects.assert_owned_choreography_session(&stale),
914                Err(SessionIngressError::StaleOwner { .. })
915                    | Err(SessionIngressError::InvalidIngressRouting { .. })
916            ),
917            "stale owner handle must be rejected after transfer"
918        );
919
920        effects
921            .end_owned_choreography_session(&transferred)
922            .await
923            .expect("close transferred session");
924    }
925
926    #[tokio::test]
927    async fn transferring_runtime_session_owner_moves_fragment_ownership_together() {
928        let authority = test_authority(0x33);
929        let effects =
930            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
931                .expect("test effect system");
932        let roles = vec![ChoreographicRole::for_authority(
933            authority,
934            RoleIndex::new(0).expect("role index"),
935        )];
936        let session_uuid = Uuid::from_bytes([0x44; 16]);
937        let original = effects
938            .start_owned_choreography_session("owner-a", session_uuid, roles)
939            .await
940            .expect("start owned session");
941        let manifest = linked_manifest("aura.test.protocol", "bundle-a");
942        effects
943            .claim_vm_fragments_for_manifest("owner-a", &manifest)
944            .expect("claim fragment ownership");
945
946        let transferred = effects
947            .transfer_owned_choreography_session_owner(
948                original,
949                "owner-b",
950                SessionOwnerCapabilityScope::Session,
951            )
952            .expect("transfer owner");
953
954        let snapshot = effects.vm_fragment_snapshot();
955        assert_eq!(snapshot.len(), 1);
956        assert_eq!(snapshot[0].1.owner_label, "owner-b");
957        assert!(matches!(
958            snapshot[0].1.bundle_id.as_deref(),
959            Some("bundle-a")
960        ));
961
962        effects
963            .end_owned_choreography_session(&transferred)
964            .await
965            .expect("close transferred session");
966    }
967
968    #[tokio::test]
969    async fn fragment_scoped_owner_accepts_matching_boundary_and_rejects_wrong_boundary() {
970        let authority = test_authority(0x55);
971        let effects =
972            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
973                .expect("test effect system");
974        let roles = vec![ChoreographicRole::for_authority(
975            authority,
976            RoleIndex::new(0).expect("role index"),
977        )];
978        let session_uuid = Uuid::from_bytes([0x66; 16]);
979        let original = effects
980            .start_owned_choreography_session("owner-a", session_uuid, roles)
981            .await
982            .expect("start owned session");
983
984        let transferred = effects
985            .transfer_owned_choreography_session_owner(
986                original,
987                "owner-b",
988                SessionOwnerCapabilityScope::Fragments(BTreeSet::from([
989                    "bundle:bundle-a".to_string()
990                ])),
991            )
992            .expect("transfer owner");
993
994        let allowed_boundary =
995            AuraLinkBoundary::for_scope(SessionOwnerCapabilityScope::Fragments(BTreeSet::from([
996                "bundle:bundle-a".to_string(),
997            ])));
998        effects
999            .assert_owned_choreography_boundary(&transferred, &allowed_boundary)
1000            .expect("matching boundary should be accepted");
1001
1002        let rejected_boundary =
1003            AuraLinkBoundary::for_scope(SessionOwnerCapabilityScope::Fragments(BTreeSet::from([
1004                "bundle:bundle-b".to_string(),
1005            ])));
1006        let rejected = effects
1007            .assert_owned_choreography_boundary(&transferred, &rejected_boundary)
1008            .expect_err("wrong boundary should be rejected");
1009        assert!(matches!(
1010            rejected,
1011            SessionIngressError::InvalidIngressRouting { .. }
1012        ));
1013    }
1014
1015    #[tokio::test]
1016    async fn attenuating_owner_capability_rejects_stale_generation_even_for_same_owner_label() {
1017        let authority = test_authority(0x77);
1018        let effects =
1019            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
1020                .expect("test effect system");
1021        let roles = vec![ChoreographicRole::for_authority(
1022            authority,
1023            RoleIndex::new(0).expect("role index"),
1024        )];
1025        let session_uuid = Uuid::from_bytes([0x88; 16]);
1026        let original = effects
1027            .start_owned_choreography_session("owner-a", session_uuid, roles)
1028            .await
1029            .expect("start owned session");
1030        let stale_full_capability = original.clone();
1031
1032        let attenuated = effects
1033            .transfer_owned_choreography_session_owner(
1034                original,
1035                "owner-a",
1036                SessionOwnerCapabilityScope::Fragments(BTreeSet::from([
1037                    "bundle:bundle-a".to_string()
1038                ])),
1039            )
1040            .expect("attenuate owner capability");
1041        let attenuated_boundary =
1042            AuraLinkBoundary::for_scope(SessionOwnerCapabilityScope::Fragments(BTreeSet::from([
1043                "bundle:bundle-a".to_string(),
1044            ])));
1045
1046        effects
1047            .assert_owned_choreography_boundary(&attenuated, &attenuated_boundary)
1048            .expect("attenuated capability should authorize its delegated boundary");
1049        assert!(
1050            matches!(
1051                effects.assert_owned_choreography_boundary(
1052                    &stale_full_capability,
1053                    &attenuated_boundary,
1054                ),
1055                Err(SessionIngressError::InvalidIngressRouting { .. })
1056                    | Err(SessionIngressError::StaleOwner { .. })
1057            ),
1058            "stale capability generation must be rejected even if the owner label is unchanged"
1059        );
1060    }
1061
1062    #[tokio::test]
1063    async fn forged_cross_session_owner_capability_is_rejected() {
1064        let authority = test_authority(0x79);
1065        let effects =
1066            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
1067                .expect("test effect system");
1068        let roles = vec![ChoreographicRole::for_authority(
1069            authority,
1070            RoleIndex::new(0).expect("role index"),
1071        )];
1072        let session_a = Uuid::from_bytes([0x8a; 16]);
1073        let session_b = Uuid::from_bytes([0x8b; 16]);
1074        let owner_a = effects
1075            .start_owned_choreography_session("owner-a", session_a, roles.clone())
1076            .await
1077            .expect("start session a");
1078        effects
1079            .end_owned_choreography_session(&owner_a)
1080            .await
1081            .expect("close session a");
1082
1083        let owner_b = effects
1084            .start_owned_choreography_session("owner-a", session_b, roles)
1085            .await
1086            .expect("start session b");
1087        let forged = RuntimeSessionOwner {
1088            session_id: owner_b.session_id,
1089            owner_label: owner_b.owner_label.clone(),
1090            capability: owner_a.capability.clone(),
1091        };
1092
1093        assert!(
1094            matches!(
1095                effects.assert_owned_choreography_session(&forged),
1096                Err(SessionIngressError::InvalidIngressRouting { .. })
1097            ),
1098            "cross-session forged authority artifacts must fail closed"
1099        );
1100        effects
1101            .assert_owned_choreography_session(&owner_b)
1102            .expect("canonical capability should still be accepted");
1103
1104        effects
1105            .end_owned_choreography_session(&owner_b)
1106            .await
1107            .expect("close session b");
1108    }
1109
1110    #[tokio::test]
1111    async fn duplicate_session_start_reports_typed_already_exists_reason() {
1112        let authority = test_authority(0x99);
1113        let effects = Arc::new(
1114            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
1115                .expect("test effect system"),
1116        );
1117        let roles = vec![ChoreographicRole::for_authority(
1118            authority,
1119            RoleIndex::new(0).expect("role index"),
1120        )];
1121        let session_uuid = Uuid::from_bytes([0xaa; 16]);
1122        let (first, second) = futures::join!(
1123            effects.start_owned_choreography_session("owner-a", session_uuid, roles.clone()),
1124            effects.start_owned_choreography_session("owner-b", session_uuid, roles)
1125        );
1126
1127        let (original, duplicate) = match (first, second) {
1128            (Ok(original), Err(duplicate)) => (original, duplicate),
1129            (Err(duplicate), Ok(original)) => (original, duplicate),
1130            (Ok(_), Ok(_)) => panic!("duplicate session start unexpectedly succeeded twice"),
1131            (Err(first_error), Err(second_error)) => {
1132                panic!(
1133                    "duplicate session start unexpectedly failed twice: {first_error:?} / {second_error:?}"
1134                )
1135            }
1136        };
1137
1138        assert!(matches!(
1139            duplicate,
1140            SessionIngressError::SessionStart {
1141                reason: SessionStartFailureReason::AlreadyExists
1142                    | SessionStartFailureReason::TaskAlreadyBound
1143                    | SessionStartFailureReason::OwnerClaimRejected,
1144                ..
1145            }
1146        ));
1147
1148        effects
1149            .end_owned_choreography_session(&original)
1150            .await
1151            .expect("close original session");
1152    }
1153}