1#![allow(clippy::result_large_err, clippy::incompatible_msrv)]
4
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use aura_mpst::{
9 upstream::types::{GlobalType, LocalTypeR},
10 CompositionManifest,
11};
12use aura_protocol::effects::{ChoreographicEffects, ChoreographicRole};
13use thiserror::Error;
14use uuid::Uuid;
15
16use super::subsystems::choreography::{
17 RuntimeChoreographySessionId, SessionOwnerCapability, SessionOwnerCapabilityScope,
18 SessionOwnershipError,
19};
20use super::vm_host_bridge::{
21 advance_host_bridged_vm_round, advance_host_bridged_vm_round_until_receive,
22 close_and_reap_vm_session, handle_standard_vm_round, open_manifest_vm_session_admitted,
23 AuraQueuedVmBridgeHandler, AuraVmBridgeRound, AuraVmHostWaitStatus, AuraVmRoundDisposition,
24 BlockedVmReceive,
25};
26use super::{AuraChoreoEngine, AuraEffectSystem, AuraVmSchedulerSignals};
27use super::{AuraLinkBoundary, RuntimeBoundaryError, RuntimeSessionEvent};
28use aura_core::OwnershipCategory;
29use telltale_machine::SessionId;
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct RuntimeSessionOwner {
33 pub session_id: RuntimeChoreographySessionId,
34 pub owner_label: String,
35 pub capability: SessionOwnerCapability,
36}
37
/// Coarse classification of why starting an owned runtime session failed.
///
/// Carried by [`SessionIngressError::SessionStart`] so callers can branch on
/// the cause without parsing the error message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionStartFailureReason {
    /// The choreography layer reported that the session already exists.
    AlreadyExists,
    /// The current task is already bound to an active choreography session.
    TaskAlreadyBound,
    /// The session was started but claiming ownership of it was rejected.
    OwnerClaimRejected,
    /// Preparing or opening the VM session for the manifest failed.
    VmSessionOpenFailed,
    /// Any start failure not covered by the other variants.
    Other,
}
46
47#[derive(Debug, Error)]
48pub enum SessionIngressError {
49 #[error("runtime session {session_id} has no owner record")]
50 MissingOwner {
51 session_id: RuntimeChoreographySessionId,
52 },
53 #[error("runtime session {session_id} is not owned by expected owner {expected_owner}")]
54 StaleOwner {
55 session_id: RuntimeChoreographySessionId,
56 expected_owner: String,
57 },
58 #[error("runtime session {session_id} rejected ingress for owner {owner_label}: {details}")]
59 InvalidIngressRouting {
60 session_id: RuntimeChoreographySessionId,
61 owner_label: String,
62 details: RuntimeBoundaryError,
63 },
64 #[error(
65 "failed to start owned runtime session {session_id} for {owner_label} ({reason:?}): {message}"
66 )]
67 SessionStart {
68 session_id: RuntimeChoreographySessionId,
69 owner_label: String,
70 reason: SessionStartFailureReason,
71 message: String,
72 },
73 #[error("failed to advance owned runtime session {session_id} for {owner_label}: {message}")]
74 Round {
75 session_id: RuntimeChoreographySessionId,
76 owner_label: String,
77 message: String,
78 },
79 #[error("failed to close owned runtime session {session_id} for {owner_label}: {message}")]
80 SessionClose {
81 session_id: RuntimeChoreographySessionId,
82 owner_label: String,
83 message: String,
84 },
85 #[error(
86 "failed to transfer owned runtime session {session_id} from {from_owner_label} to {to_owner_label}: {message}"
87 )]
88 OwnerTransfer {
89 session_id: RuntimeChoreographySessionId,
90 from_owner_label: String,
91 to_owner_label: String,
92 message: String,
93 },
94}
95
96impl SessionIngressError {
97 fn error_kind(&self) -> &'static str {
98 match self {
99 Self::MissingOwner { .. } => "missing_owner",
100 Self::StaleOwner { .. } => "stale_owner",
101 Self::InvalidIngressRouting { .. } => "invalid_ingress_routing",
102 Self::SessionStart { .. } => "session_start",
103 Self::Round { .. } => "round",
104 Self::SessionClose { .. } => "session_close",
105 Self::OwnerTransfer { .. } => "owner_transfer",
106 }
107 }
108}
109
110fn classify_session_start_error(
111 error: &aura_protocol::effects::ChoreographyError,
112) -> SessionStartFailureReason {
113 match error {
114 aura_protocol::effects::ChoreographyError::SessionAlreadyExists { .. } => {
115 SessionStartFailureReason::AlreadyExists
116 }
117 aura_protocol::effects::ChoreographyError::InternalError { message }
118 if message.starts_with("task already bound to active choreography session") =>
119 {
120 SessionStartFailureReason::TaskAlreadyBound
121 }
122 _ => SessionStartFailureReason::Other,
123 }
124}
125
126impl RuntimeSessionOwner {
127 pub const OWNERSHIP_CATEGORY: OwnershipCategory = OwnershipCategory::MoveOwned;
128}
129
130impl From<SessionOwnershipError> for SessionIngressError {
131 fn from(value: SessionOwnershipError) -> Self {
132 match value {
133 SessionOwnershipError::MissingOwner { session_id } => {
134 SessionIngressError::MissingOwner { session_id }
135 }
136 SessionOwnershipError::OwnerMismatch {
137 session_id,
138 expected_owner,
139 } => SessionIngressError::StaleOwner {
140 session_id,
141 expected_owner,
142 },
143 SessionOwnershipError::CapabilityMismatch {
144 session_id,
145 expected_owner,
146 current_generation,
147 } => SessionIngressError::InvalidIngressRouting {
148 session_id,
149 owner_label: expected_owner,
150 details: RuntimeBoundaryError::CapabilityRejected {
151 details: format!(
152 "session capability no longer valid; current generation is {current_generation}"
153 ),
154 },
155 },
156 SessionOwnershipError::CapabilitySessionMismatch {
157 session_id,
158 expected_owner,
159 capability_session_id,
160 } => SessionIngressError::InvalidIngressRouting {
161 session_id,
162 owner_label: expected_owner,
163 details: RuntimeBoundaryError::CapabilityRejected {
164 details: format!(
165 "session capability was issued for {capability_session_id}, not {session_id}"
166 ),
167 },
168 },
169 SessionOwnershipError::OwnerConflict {
170 session_id,
171 existing_owner,
172 requested_owner,
173 } => SessionIngressError::InvalidIngressRouting {
174 session_id,
175 owner_label: requested_owner,
176 details: RuntimeBoundaryError::CapabilityRejected {
177 details: format!("session already owned by {existing_owner}"),
178 },
179 },
180 }
181 }
182}
183
184pub struct OwnedVmSession {
185 effects: Arc<AuraEffectSystem>,
186 owner: RuntimeSessionOwner,
187 routing_boundary: AuraLinkBoundary,
188 engine: AuraChoreoEngine<AuraQueuedVmBridgeHandler>,
189 handler: Arc<AuraQueuedVmBridgeHandler>,
190 vm_session_id: SessionId,
191}
192
193#[derive(Debug)]
194struct OwnedVmSessionOwnerTransfer {
195 next_owner: RuntimeSessionOwner,
196 next_boundary: AuraLinkBoundary,
197}
198
199fn log_session_owner_assigned(
200 owner: &RuntimeSessionOwner,
201 protocol_id: Option<&str>,
202 context: &'static str,
203) {
204 tracing::debug!(
205 event = RuntimeSessionEvent::OwnerAssigned.as_event_name(),
206 session_id = %owner.session_id,
207 owner_label = %owner.owner_label,
208 capability_generation = owner.capability.generation,
209 protocol_id,
210 context,
211 "Assigned runtime session owner"
212 );
213}
214
215fn log_session_owner_rejected(
216 session_id: RuntimeChoreographySessionId,
217 owner_label: &str,
218 protocol_id: Option<&str>,
219 reason: &'static str,
220 error: &str,
221) {
222 tracing::warn!(
223 event = RuntimeSessionEvent::OwnerRejected.as_event_name(),
224 session_id = %session_id,
225 owner_label,
226 protocol_id,
227 reason,
228 error,
229 "Rejected runtime session owner"
230 );
231}
232
233fn log_session_owner_transferred(
234 previous_owner: &RuntimeSessionOwner,
235 next_owner: &RuntimeSessionOwner,
236 protocol_id: Option<&str>,
237 context: &'static str,
238) {
239 tracing::info!(
240 event = RuntimeSessionEvent::OwnerTransferred.as_event_name(),
241 session_id = %previous_owner.session_id,
242 from_owner_label = %previous_owner.owner_label,
243 from_generation = previous_owner.capability.generation,
244 to_owner_label = %next_owner.owner_label,
245 to_generation = next_owner.capability.generation,
246 protocol_id,
247 context,
248 "Transferred runtime session owner"
249 );
250}
251
252fn log_session_owner_transfer_rejected(
253 previous_owner: &RuntimeSessionOwner,
254 next_owner_label: &str,
255 protocol_id: Option<&str>,
256 error: &SessionIngressError,
257 context: &'static str,
258) {
259 tracing::warn!(
260 event = RuntimeSessionEvent::OwnerTransferRejected.as_event_name(),
261 session_id = %previous_owner.session_id,
262 from_owner_label = %previous_owner.owner_label,
263 from_generation = previous_owner.capability.generation,
264 to_owner_label = next_owner_label,
265 protocol_id,
266 context,
267 error_kind = error.error_kind(),
268 error = %error,
269 "Rejected runtime session owner transfer"
270 );
271}
272
273fn log_session_ingress_received(
274 owner: &RuntimeSessionOwner,
275 ingress_kind: &'static str,
276 active_role: Option<&str>,
277 from_role: Option<&str>,
278 to_role: Option<&str>,
279 payload_bytes: usize,
280) {
281 tracing::debug!(
282 event = RuntimeSessionEvent::IngressReceived.as_event_name(),
283 session_id = %owner.session_id,
284 owner_label = %owner.owner_label,
285 capability_generation = owner.capability.generation,
286 ingress_kind,
287 active_role,
288 from_role,
289 to_role,
290 payload_bytes,
291 "Accepted owned session ingress"
292 );
293}
294
295fn log_session_ingress_dropped(
296 owner: &RuntimeSessionOwner,
297 ingress_kind: &'static str,
298 error: &SessionIngressError,
299 active_role: Option<&str>,
300) {
301 tracing::warn!(
302 event = RuntimeSessionEvent::IngressDropped.as_event_name(),
303 session_id = %owner.session_id,
304 owner_label = %owner.owner_label,
305 capability_generation = owner.capability.generation,
306 ingress_kind,
307 active_role,
308 error_kind = error.error_kind(),
309 error = %error,
310 "Dropped owned session ingress"
311 );
312}
313
314impl OwnedVmSession {
315 fn prepare_owner_transfer(
316 &self,
317 next_owner_label: impl Into<String>,
318 next_boundary: AuraLinkBoundary,
319 ) -> Result<OwnedVmSessionOwnerTransfer, SessionIngressError> {
320 let next_scope = next_boundary.capability_scope.clone();
321 let next_owner = self.effects.transfer_owned_choreography_session_owner(
322 self.owner.clone(),
323 next_owner_label,
324 next_scope,
325 )?;
326 Ok(OwnedVmSessionOwnerTransfer {
327 next_owner,
328 next_boundary,
329 })
330 }
331
332 pub fn owner(&self) -> &RuntimeSessionOwner {
333 &self.owner
334 }
335
336 pub fn routing_boundary(&self) -> &AuraLinkBoundary {
337 &self.routing_boundary
338 }
339
340 pub fn queue_send_bytes(&self, payload: Vec<u8>) {
341 self.handler.push_send_bytes(payload);
342 }
343
344 pub fn queue_choice_label(&self, label: impl Into<String>) {
345 self.handler.push_choice_label(label.into());
346 }
347
348 pub fn engine_mut(&mut self) -> &mut AuraChoreoEngine<AuraQueuedVmBridgeHandler> {
349 &mut self.engine
350 }
351
352 pub fn vm_session_id(&self) -> SessionId {
353 self.vm_session_id
354 }
355
356 pub async fn advance_round(
357 &mut self,
358 active_role: &str,
359 peer_roles: &BTreeMap<String, ChoreographicRole>,
360 ) -> Result<AuraVmBridgeRound, SessionIngressError> {
361 log_session_ingress_received(
362 &self.owner,
363 "advance_round",
364 Some(active_role),
365 None,
366 None,
367 0,
368 );
369 if let Err(error) = self
370 .effects
371 .assert_owned_choreography_boundary(&self.owner, &self.routing_boundary)
372 {
373 log_session_ingress_dropped(&self.owner, "advance_round", &error, Some(active_role));
374 return Err(error);
375 }
376 let result = advance_host_bridged_vm_round(
377 self.effects.as_ref(),
378 &mut self.engine,
379 self.handler.as_ref(),
380 self.vm_session_id,
381 active_role,
382 peer_roles,
383 )
384 .await
385 .map_err(|message| SessionIngressError::Round {
386 session_id: self.owner.session_id,
387 owner_label: self.owner.owner_label.clone(),
388 message,
389 });
390 if let Err(error) = &result {
391 log_session_ingress_dropped(&self.owner, "advance_round", error, Some(active_role));
392 }
393 result
394 }
395
396 pub async fn advance_round_until_receive<F>(
397 &mut self,
398 active_role: &str,
399 peer_roles: &BTreeMap<String, ChoreographicRole>,
400 stop_on_receive_error: F,
401 ) -> Result<AuraVmBridgeRound, SessionIngressError>
402 where
403 F: Fn(&aura_protocol::effects::ChoreographyError) -> bool,
404 {
405 log_session_ingress_received(
406 &self.owner,
407 "advance_round_until_receive",
408 Some(active_role),
409 None,
410 None,
411 0,
412 );
413 if let Err(error) = self
414 .effects
415 .assert_owned_choreography_boundary(&self.owner, &self.routing_boundary)
416 {
417 log_session_ingress_dropped(
418 &self.owner,
419 "advance_round_until_receive",
420 &error,
421 Some(active_role),
422 );
423 return Err(error);
424 }
425 let result = advance_host_bridged_vm_round_until_receive(
426 self.effects.as_ref(),
427 &mut self.engine,
428 self.handler.as_ref(),
429 self.vm_session_id,
430 active_role,
431 peer_roles,
432 stop_on_receive_error,
433 )
434 .await
435 .map_err(|message| SessionIngressError::Round {
436 session_id: self.owner.session_id,
437 owner_label: self.owner.owner_label.clone(),
438 message,
439 });
440 if let Err(error) = &result {
441 log_session_ingress_dropped(
442 &self.owner,
443 "advance_round_until_receive",
444 error,
445 Some(active_role),
446 );
447 }
448 result
449 }
450
451 pub fn inject_blocked_receive(
452 &mut self,
453 receive: &BlockedVmReceive,
454 ) -> Result<(), SessionIngressError> {
455 log_session_ingress_received(
456 &self.owner,
457 "blocked_receive",
458 Some(receive.to_role.as_str()),
459 Some(receive.from_role.as_str()),
460 Some(receive.to_role.as_str()),
461 receive.payload.len(),
462 );
463 if let Err(error) = self
464 .effects
465 .assert_owned_choreography_boundary(&self.owner, &self.routing_boundary)
466 {
467 log_session_ingress_dropped(&self.owner, "blocked_receive", &error, None);
468 return Err(error);
469 }
470 let result =
471 super::vm_host_bridge::inject_vm_receive(&mut self.engine, self.vm_session_id, receive)
472 .map_err(|message| SessionIngressError::Round {
473 session_id: self.owner.session_id,
474 owner_label: self.owner.owner_label.clone(),
475 message,
476 });
477 if let Err(error) = &result {
478 log_session_ingress_dropped(&self.owner, "blocked_receive", error, None);
479 }
480 result
481 }
482
483 pub async fn close(mut self) -> Result<(), SessionIngressError> {
484 self.effects
485 .assert_owned_choreography_session(&self.owner)?;
486 close_and_reap_vm_session(&mut self.engine, self.vm_session_id).map_err(|message| {
487 SessionIngressError::SessionClose {
488 session_id: self.owner.session_id,
489 owner_label: self.owner.owner_label.clone(),
490 message,
491 }
492 })?;
493 self.effects
494 .end_owned_choreography_session(&self.owner)
495 .await
496 }
497
498 pub fn transfer_owner_in_place(
499 &mut self,
500 next_owner_label: impl Into<String>,
501 next_boundary: AuraLinkBoundary,
502 ) -> Result<(), SessionIngressError> {
503 let transfer = self.prepare_owner_transfer(next_owner_label, next_boundary)?;
504 self.routing_boundary = transfer.next_boundary;
505 let _previous = std::mem::replace(&mut self.owner, transfer.next_owner);
506 Ok(())
507 }
508
509 pub fn transfer_owner(
510 mut self,
511 next_owner_label: impl Into<String>,
512 next_scope: SessionOwnerCapabilityScope,
513 ) -> Result<Self, SessionIngressError> {
514 self.transfer_owner_in_place(next_owner_label, AuraLinkBoundary::for_scope(next_scope))?;
515 Ok(self)
516 }
517}
518
/// Builds an owner label of the form `file:line:column` from the location of
/// the caller of this function.
#[track_caller]
pub fn caller_session_owner_label() -> String {
    let location = std::panic::Location::caller();
    let (file, line, column) = (location.file(), location.line(), location.column());
    format!("{file}:{line}:{column}")
}
524
525pub async fn open_owned_manifest_vm_session_admitted(
526 effects: Arc<AuraEffectSystem>,
527 session_uuid: Uuid,
528 roles: Vec<ChoreographicRole>,
529 manifest: &CompositionManifest,
530 active_role: &str,
531 global_type: &GlobalType,
532 local_types: &BTreeMap<String, LocalTypeR>,
533 scheduler_signals: AuraVmSchedulerSignals,
534) -> Result<OwnedVmSession, SessionIngressError> {
535 let owner_label = caller_session_owner_label();
536 let owner = effects
537 .start_owned_choreography_session(owner_label, session_uuid, roles)
538 .await?;
539 effects
540 .set_current_runtime_choreography_protocol_id(manifest.protocol_id.clone())
541 .map_err(|message| SessionIngressError::SessionStart {
542 session_id: owner.session_id,
543 owner_label: owner.owner_label.clone(),
544 reason: SessionStartFailureReason::VmSessionOpenFailed,
545 message,
546 })?;
547 let routing_boundary = AuraLinkBoundary::for_manifest(manifest);
548
549 match open_manifest_vm_session_admitted(
550 effects.as_ref(),
551 manifest,
552 active_role,
553 global_type,
554 local_types,
555 scheduler_signals,
556 )
557 .await
558 {
559 Ok((engine, handler, vm_session_id)) => Ok(OwnedVmSession {
560 effects,
561 owner,
562 routing_boundary,
563 engine,
564 handler,
565 vm_session_id,
566 }),
567 Err(error) => {
568 log_session_owner_rejected(
569 owner.session_id,
570 &owner.owner_label,
571 Some(manifest.protocol_id.as_str()),
572 "vm_session_open_failed",
573 &error.to_string(),
574 );
575 let _ = effects.end_owned_choreography_session(&owner).await;
576 Err(SessionIngressError::SessionStart {
577 session_id: owner.session_id,
578 owner_label: owner.owner_label,
579 reason: SessionStartFailureReason::VmSessionOpenFailed,
580 message: error.to_string(),
581 })
582 }
583 }
584}
585
586pub fn handle_owned_vm_round(
587 session: &mut OwnedVmSession,
588 round: AuraVmBridgeRound,
589 context_label: &str,
590) -> Result<AuraVmRoundDisposition, SessionIngressError> {
591 if let Some(blocked) = round.blocked_receive {
592 session.inject_blocked_receive(&blocked)?;
593 return Ok(AuraVmRoundDisposition::Continue);
594 }
595
596 match round.host_wait_status {
597 AuraVmHostWaitStatus::Idle | AuraVmHostWaitStatus::Delivered => {}
598 AuraVmHostWaitStatus::TimedOut => {
599 return Err(SessionIngressError::Round {
600 session_id: session.owner.session_id,
601 owner_label: session.owner.owner_label.clone(),
602 message: format!("{context_label} timed out while waiting for receive"),
603 });
604 }
605 AuraVmHostWaitStatus::Cancelled => {
606 return Err(SessionIngressError::Round {
607 session_id: session.owner.session_id,
608 owner_label: session.owner.owner_label.clone(),
609 message: format!("{context_label} cancelled while waiting for receive"),
610 });
611 }
612 AuraVmHostWaitStatus::Deferred => {}
613 }
614
615 let vm_session_id = session.vm_session_id();
616 handle_standard_vm_round(
617 session.engine_mut(),
618 vm_session_id,
619 AuraVmBridgeRound {
620 step: round.step,
621 blocked_receive: None,
622 host_wait_status: round.host_wait_status,
623 },
624 context_label,
625 )
626 .map_err(|message| SessionIngressError::Round {
627 session_id: session.owner.session_id,
628 owner_label: session.owner.owner_label.clone(),
629 message,
630 })
631}
632
633impl AuraEffectSystem {
634 fn assert_runtime_choreography_session_binding(
635 &self,
636 session_id: RuntimeChoreographySessionId,
637 owner_label: &str,
638 ) -> Result<(), SessionIngressError> {
639 let current_session_id =
640 self.current_runtime_choreography_session_id()
641 .ok_or_else(|| SessionIngressError::InvalidIngressRouting {
642 session_id,
643 owner_label: owner_label.to_string(),
644 details: RuntimeBoundaryError::MissingTaskBinding,
645 })?;
646
647 if current_session_id != session_id {
648 return Err(SessionIngressError::InvalidIngressRouting {
649 session_id,
650 owner_label: owner_label.to_string(),
651 details: RuntimeBoundaryError::SessionBindingMismatch {
652 expected_session_id: session_id,
653 bound_session_id: current_session_id,
654 },
655 });
656 }
657
658 Ok(())
659 }
660
661 pub async fn start_owned_choreography_session(
662 &self,
663 owner_label: impl Into<String>,
664 session_uuid: Uuid,
665 roles: Vec<ChoreographicRole>,
666 ) -> Result<RuntimeSessionOwner, SessionIngressError> {
667 let owner_label = owner_label.into();
668 ChoreographicEffects::start_session(self, session_uuid, roles)
669 .await
670 .map_err(|error| SessionIngressError::SessionStart {
671 session_id: RuntimeChoreographySessionId::from_uuid(session_uuid),
672 owner_label: owner_label.clone(),
673 reason: classify_session_start_error(&error),
674 message: error.to_string(),
675 })
676 .inspect_err(|error| {
677 log_session_owner_rejected(
678 RuntimeChoreographySessionId::from_uuid(session_uuid),
679 &owner_label,
680 None,
681 "runtime_session_start_failed",
682 &error.to_string(),
683 );
684 })?;
685
686 let session_id = RuntimeChoreographySessionId::from_uuid(session_uuid);
687 let capability =
688 match self.claim_runtime_choreography_session_owner(session_id, owner_label.clone()) {
689 Ok(capability) => capability,
690 Err(error) => {
691 let _ = ChoreographicEffects::end_session(self).await;
692 log_session_owner_rejected(
693 session_id,
694 &owner_label,
695 None,
696 "owner_claim_rejected",
697 &error,
698 );
699 return Err(SessionIngressError::SessionStart {
700 session_id,
701 owner_label,
702 reason: SessionStartFailureReason::OwnerClaimRejected,
703 message: error,
704 });
705 }
706 };
707
708 let owner = RuntimeSessionOwner {
709 session_id,
710 owner_label,
711 capability,
712 };
713 log_session_owner_assigned(&owner, None, "start_owned_choreography_session");
714 Ok(owner)
715 }
716
717 pub fn assert_owned_choreography_session(
718 &self,
719 owner: &RuntimeSessionOwner,
720 ) -> Result<(), SessionIngressError> {
721 self.assert_owned_choreography_boundary(
722 owner,
723 &AuraLinkBoundary::for_scope(SessionOwnerCapabilityScope::Session),
724 )?;
725
726 if !owner.capability.allows_full_session() {
727 return Err(SessionIngressError::InvalidIngressRouting {
728 session_id: owner.session_id,
729 owner_label: owner.owner_label.clone(),
730 details: RuntimeBoundaryError::FullSessionCapabilityRequired,
731 });
732 }
733
734 Ok(())
735 }
736
737 pub fn assert_owned_choreography_boundary(
738 &self,
739 owner: &RuntimeSessionOwner,
740 boundary: &AuraLinkBoundary,
741 ) -> Result<(), SessionIngressError> {
742 self.assert_runtime_choreography_session_binding(owner.session_id, &owner.owner_label)?;
743 self.ensure_runtime_choreography_session_owner_capability(
744 owner.session_id,
745 &owner.capability,
746 )
747 .map_err(|error| SessionIngressError::InvalidIngressRouting {
748 session_id: owner.session_id,
749 owner_label: owner.owner_label.clone(),
750 details: RuntimeBoundaryError::CapabilityRejected { details: error },
751 })?;
752
753 if !boundary.is_allowed_by(&owner.capability.scope) {
754 return Err(SessionIngressError::InvalidIngressRouting {
755 session_id: owner.session_id,
756 owner_label: owner.owner_label.clone(),
757 details: RuntimeBoundaryError::BoundaryScopeRejected {
758 boundary: boundary.clone(),
759 capability_scope: owner.capability.scope.clone(),
760 },
761 });
762 }
763
764 Ok(())
765 }
766
767 pub fn transfer_owned_choreography_session_owner(
768 &self,
769 owner: RuntimeSessionOwner,
770 next_owner_label: impl Into<String>,
771 next_scope: SessionOwnerCapabilityScope,
772 ) -> Result<RuntimeSessionOwner, SessionIngressError> {
773 self.assert_runtime_choreography_session_binding(owner.session_id, &owner.owner_label)?;
774 self.ensure_runtime_choreography_session_owner_capability(
775 owner.session_id,
776 &owner.capability,
777 )
778 .map_err(|error| SessionIngressError::InvalidIngressRouting {
779 session_id: owner.session_id,
780 owner_label: owner.owner_label.clone(),
781 details: RuntimeBoundaryError::CapabilityRejected { details: error },
782 })?;
783
784 let next_owner_label = next_owner_label.into();
785 let next_capability = self
786 .transfer_runtime_choreography_session_owner(
787 owner.session_id,
788 &owner.capability,
789 next_owner_label.clone(),
790 next_scope,
791 )
792 .map_err(|message| SessionIngressError::OwnerTransfer {
793 session_id: owner.session_id,
794 from_owner_label: owner.owner_label.clone(),
795 to_owner_label: next_owner_label.clone(),
796 message,
797 });
798
799 match next_capability {
800 Ok(next_capability) => {
801 let next_owner = RuntimeSessionOwner {
802 session_id: owner.session_id,
803 owner_label: next_owner_label,
804 capability: next_capability,
805 };
806 log_session_owner_transferred(
807 &owner,
808 &next_owner,
809 None,
810 "transfer_owned_choreography_session_owner",
811 );
812 Ok(next_owner)
813 }
814 Err(error) => {
815 log_session_owner_transfer_rejected(
816 &owner,
817 &next_owner_label,
818 None,
819 &error,
820 "transfer_owned_choreography_session_owner",
821 );
822 Err(error)
823 }
824 }
825 }
826
827 pub async fn end_owned_choreography_session(
828 &self,
829 owner: &RuntimeSessionOwner,
830 ) -> Result<(), SessionIngressError> {
831 self.assert_owned_choreography_session(owner)?;
832 ChoreographicEffects::end_session(self)
833 .await
834 .map_err(|error| SessionIngressError::SessionClose {
835 session_id: owner.session_id,
836 owner_label: owner.owner_label.clone(),
837 message: error.to_string(),
838 })
839 }
840}
841
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::AgentConfig;
    use aura_core::AuthorityId;
    use aura_mpst::{CompositionLinkSpec, CompositionManifest};
    use aura_protocol::effects::{ChoreographicRole, RoleIndex};
    use std::collections::BTreeSet;
    use std::sync::Arc;

    /// Authority whose UUID consists of `byte` repeated sixteen times.
    fn test_authority(byte: u8) -> AuthorityId {
        AuthorityId::from_uuid(Uuid::from_bytes([byte; 16]))
    }

    /// Role list containing a single role (index 0) for `authority`.
    fn single_role(authority: AuthorityId) -> Vec<ChoreographicRole> {
        vec![ChoreographicRole::for_authority(
            authority,
            RoleIndex::new(0).expect("role index"),
        )]
    }

    /// Fragment scope covering exactly the fragment key `bundle:<bundle_id>`.
    fn bundle_scope(bundle_id: &str) -> SessionOwnerCapabilityScope {
        SessionOwnerCapabilityScope::Fragments(BTreeSet::from([format!("bundle:{bundle_id}")]))
    }

    /// Manifest with one role linked to `bundle_id` and no other requirements.
    fn linked_manifest(protocol_id: &str, bundle_id: &str) -> CompositionManifest {
        let role = "Role".to_string();
        let link = CompositionLinkSpec {
            role: role.clone(),
            bundle_id: bundle_id.to_string(),
            imports: Vec::new(),
            exports: Vec::new(),
        };
        CompositionManifest {
            protocol_name: protocol_id.to_string(),
            protocol_namespace: None,
            protocol_qualified_name: protocol_id.to_string(),
            protocol_id: protocol_id.to_string(),
            role_names: vec![role],
            required_capabilities: Vec::new(),
            theorem_packs: Vec::new(),
            required_theorem_packs: Vec::new(),
            required_theorem_pack_capabilities: Vec::new(),
            guard_capabilities: Vec::new(),
            determinism_policy_ref: None,
            delegation_constraints: Vec::new(),
            link_specs: vec![link],
        }
    }

    #[tokio::test]
    async fn transferring_runtime_session_owner_invalidates_stale_capability() {
        let authority = test_authority(0x11);
        let effects =
            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
                .expect("test effect system");
        let original = effects
            .start_owned_choreography_session(
                "owner-a",
                Uuid::from_bytes([0x22; 16]),
                single_role(authority),
            )
            .await
            .expect("start owned session");
        let stale = original.clone();

        let transferred = effects
            .transfer_owned_choreography_session_owner(
                original,
                "owner-b",
                SessionOwnerCapabilityScope::Session,
            )
            .expect("transfer owner");

        assert_eq!(transferred.owner_label, "owner-b");
        assert!(
            effects
                .assert_owned_choreography_session(&transferred)
                .is_ok(),
            "new owner must be accepted"
        );
        let stale_check = effects.assert_owned_choreography_session(&stale);
        assert!(
            matches!(
                stale_check,
                Err(SessionIngressError::StaleOwner { .. })
                    | Err(SessionIngressError::InvalidIngressRouting { .. })
            ),
            "stale owner handle must be rejected after transfer"
        );

        effects
            .end_owned_choreography_session(&transferred)
            .await
            .expect("close transferred session");
    }

    #[tokio::test]
    async fn transferring_runtime_session_owner_moves_fragment_ownership_together() {
        let authority = test_authority(0x33);
        let effects =
            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
                .expect("test effect system");
        let original = effects
            .start_owned_choreography_session(
                "owner-a",
                Uuid::from_bytes([0x44; 16]),
                single_role(authority),
            )
            .await
            .expect("start owned session");
        let manifest = linked_manifest("aura.test.protocol", "bundle-a");
        effects
            .claim_vm_fragments_for_manifest("owner-a", &manifest)
            .expect("claim fragment ownership");

        let transferred = effects
            .transfer_owned_choreography_session_owner(
                original,
                "owner-b",
                SessionOwnerCapabilityScope::Session,
            )
            .expect("transfer owner");

        // The single claimed fragment must now be attributed to the new owner.
        let snapshot = effects.vm_fragment_snapshot();
        assert_eq!(snapshot.len(), 1);
        let fragment = &snapshot[0].1;
        assert_eq!(fragment.owner_label, "owner-b");
        assert!(matches!(fragment.bundle_id.as_deref(), Some("bundle-a")));

        effects
            .end_owned_choreography_session(&transferred)
            .await
            .expect("close transferred session");
    }

    #[tokio::test]
    async fn fragment_scoped_owner_accepts_matching_boundary_and_rejects_wrong_boundary() {
        let authority = test_authority(0x55);
        let effects =
            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
                .expect("test effect system");
        let original = effects
            .start_owned_choreography_session(
                "owner-a",
                Uuid::from_bytes([0x66; 16]),
                single_role(authority),
            )
            .await
            .expect("start owned session");

        let transferred = effects
            .transfer_owned_choreography_session_owner(
                original,
                "owner-b",
                bundle_scope("bundle-a"),
            )
            .expect("transfer owner");

        let allowed_boundary = AuraLinkBoundary::for_scope(bundle_scope("bundle-a"));
        effects
            .assert_owned_choreography_boundary(&transferred, &allowed_boundary)
            .expect("matching boundary should be accepted");

        let rejected_boundary = AuraLinkBoundary::for_scope(bundle_scope("bundle-b"));
        let rejected = effects
            .assert_owned_choreography_boundary(&transferred, &rejected_boundary)
            .expect_err("wrong boundary should be rejected");
        assert!(matches!(
            rejected,
            SessionIngressError::InvalidIngressRouting { .. }
        ));
    }

    #[tokio::test]
    async fn attenuating_owner_capability_rejects_stale_generation_even_for_same_owner_label() {
        let authority = test_authority(0x77);
        let effects =
            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
                .expect("test effect system");
        let original = effects
            .start_owned_choreography_session(
                "owner-a",
                Uuid::from_bytes([0x88; 16]),
                single_role(authority),
            )
            .await
            .expect("start owned session");
        let stale_full_capability = original.clone();

        // Same owner label, narrower scope.
        let attenuated = effects
            .transfer_owned_choreography_session_owner(
                original,
                "owner-a",
                bundle_scope("bundle-a"),
            )
            .expect("attenuate owner capability");
        let attenuated_boundary = AuraLinkBoundary::for_scope(bundle_scope("bundle-a"));

        effects
            .assert_owned_choreography_boundary(&attenuated, &attenuated_boundary)
            .expect("attenuated capability should authorize its delegated boundary");
        let stale_check =
            effects.assert_owned_choreography_boundary(&stale_full_capability, &attenuated_boundary);
        assert!(
            matches!(
                stale_check,
                Err(SessionIngressError::InvalidIngressRouting { .. })
                    | Err(SessionIngressError::StaleOwner { .. })
            ),
            "stale capability generation must be rejected even if the owner label is unchanged"
        );
    }

    #[tokio::test]
    async fn forged_cross_session_owner_capability_is_rejected() {
        let authority = test_authority(0x79);
        let effects =
            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
                .expect("test effect system");
        let roles = single_role(authority);
        let session_a = Uuid::from_bytes([0x8a; 16]);
        let session_b = Uuid::from_bytes([0x8b; 16]);

        let owner_a = effects
            .start_owned_choreography_session("owner-a", session_a, roles.clone())
            .await
            .expect("start session a");
        effects
            .end_owned_choreography_session(&owner_a)
            .await
            .expect("close session a");

        let owner_b = effects
            .start_owned_choreography_session("owner-a", session_b, roles)
            .await
            .expect("start session b");

        // Session B's identity paired with the capability issued for session A.
        let forged = RuntimeSessionOwner {
            session_id: owner_b.session_id,
            owner_label: owner_b.owner_label.clone(),
            capability: owner_a.capability.clone(),
        };

        let forged_check = effects.assert_owned_choreography_session(&forged);
        assert!(
            matches!(
                forged_check,
                Err(SessionIngressError::InvalidIngressRouting { .. })
            ),
            "cross-session forged authority artifacts must fail closed"
        );
        effects
            .assert_owned_choreography_session(&owner_b)
            .expect("canonical capability should still be accepted");

        effects
            .end_owned_choreography_session(&owner_b)
            .await
            .expect("close session b");
    }

    #[tokio::test]
    async fn duplicate_session_start_reports_typed_already_exists_reason() {
        let authority = test_authority(0x99);
        let effects = Arc::new(
            AuraEffectSystem::simulation_for_test_for_authority(&AgentConfig::default(), authority)
                .expect("test effect system"),
        );
        let roles = single_role(authority);
        let session_uuid = Uuid::from_bytes([0xaa; 16]);

        let (first, second) = futures::join!(
            effects.start_owned_choreography_session("owner-a", session_uuid, roles.clone()),
            effects.start_owned_choreography_session("owner-b", session_uuid, roles)
        );

        // Exactly one of the two concurrent starts may succeed.
        let (original, duplicate) = match (first, second) {
            (Ok(original), Err(duplicate)) | (Err(duplicate), Ok(original)) => {
                (original, duplicate)
            }
            (Ok(_), Ok(_)) => panic!("duplicate session start unexpectedly succeeded twice"),
            (Err(first_error), Err(second_error)) => {
                panic!(
                    "duplicate session start unexpectedly failed twice: {first_error:?} / {second_error:?}"
                )
            }
        };

        assert!(matches!(
            duplicate,
            SessionIngressError::SessionStart {
                reason: SessionStartFailureReason::AlreadyExists
                    | SessionStartFailureReason::TaskAlreadyBound
                    | SessionStartFailureReason::OwnerClaimRejected,
                ..
            }
        ));

        effects
            .end_owned_choreography_session(&original)
            .await
            .expect("close original session");
    }
}