1use aura_core::effects::transport::TransportEnvelope;
6use aura_core::{AuthorityId, ContextId, DeviceId, SessionId};
7use aura_protocol::effects::{ChoreographicRole, ChoreographyMetrics, RoleIndex};
8use std::collections::{BTreeSet, HashMap};
9use std::fmt;
10use std::sync::Arc;
11use std::thread::ThreadId;
12use tokio::sync::Notify;
13use tokio::task::Id as TaskId;
14use uuid::Uuid;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
17enum ExecutionBindingKey {
18 Task(TaskId),
19 Thread(ThreadId),
20}
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
24pub struct RuntimeChoreographySessionId(Uuid);
25
26impl RuntimeChoreographySessionId {
27 pub fn from_uuid(session_id: Uuid) -> Self {
29 Self(session_id)
30 }
31
32 pub fn as_uuid(self) -> Uuid {
34 self.0
35 }
36
37 pub fn from_aura_session_id(session_id: SessionId) -> Self {
39 Self::from_uuid(session_id.uuid())
40 }
41
42 pub fn into_aura_session_id(self) -> SessionId {
44 SessionId::from_uuid(self.as_uuid())
45 }
46}
47
48impl fmt::Display for RuntimeChoreographySessionId {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 self.0.fmt(f)
51 }
52}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct SessionOwnerRecord {
57 pub owner_label: String,
59 pub capability: SessionOwnerCapability,
61}
62
/// Extent of what a [`SessionOwnerCapability`] covers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionOwnerCapabilityScope {
    /// The capability covers the whole session.
    Session,
    /// The capability is restricted to the named fragments.
    Fragments(BTreeSet<String>),
}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct SessionOwnerCapability {
75 pub session_id: RuntimeChoreographySessionId,
76 pub owner_label: String,
77 pub generation: u64,
78 pub scope: SessionOwnerCapabilityScope,
79}
80
81impl SessionOwnerCapability {
82 pub fn full_session(
83 session_id: RuntimeChoreographySessionId,
84 owner_label: impl Into<String>,
85 generation: u64,
86 ) -> Self {
87 Self {
88 session_id,
89 owner_label: owner_label.into(),
90 generation,
91 scope: SessionOwnerCapabilityScope::Session,
92 }
93 }
94
95 pub fn allows_full_session(&self) -> bool {
96 matches!(self.scope, SessionOwnerCapabilityScope::Session)
97 }
98
99 pub fn with_scope(mut self, scope: SessionOwnerCapabilityScope) -> Self {
100 self.scope = scope;
101 self
102 }
103}
104
105#[derive(Debug, thiserror::Error, PartialEq, Eq)]
107pub enum SessionOwnershipError {
108 #[error(
110 "runtime session {session_id} is already owned by {existing_owner}; requested owner {requested_owner}"
111 )]
112 OwnerConflict {
113 session_id: RuntimeChoreographySessionId,
114 existing_owner: String,
115 requested_owner: String,
116 },
117 #[error("runtime session {session_id} has no owner record")]
119 MissingOwner {
120 session_id: RuntimeChoreographySessionId,
121 },
122 #[error("runtime session {session_id} is not owned by expected owner {expected_owner}")]
124 OwnerMismatch {
125 session_id: RuntimeChoreographySessionId,
126 expected_owner: String,
127 },
128 #[error(
130 "runtime session {session_id} rejected capability for owner {expected_owner}; current generation is {current_generation}"
131 )]
132 CapabilityMismatch {
133 session_id: RuntimeChoreographySessionId,
134 expected_owner: String,
135 current_generation: u64,
136 },
137 #[error(
139 "runtime session {session_id} rejected capability for owner {expected_owner}; capability was issued for session {capability_session_id}"
140 )]
141 CapabilitySessionMismatch {
142 session_id: RuntimeChoreographySessionId,
143 expected_owner: String,
144 capability_session_id: RuntimeChoreographySessionId,
145 },
146}
147
148#[derive(Debug, Clone, Copy, PartialEq, Eq)]
150pub enum SessionStartError {
151 TaskAlreadyBound {
153 session_id: RuntimeChoreographySessionId,
154 },
155 SessionAlreadyExists {
157 session_id: RuntimeChoreographySessionId,
158 },
159}
160
161impl fmt::Display for SessionStartError {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 match self {
164 Self::TaskAlreadyBound { session_id } => {
165 write!(
166 f,
167 "task already bound to active choreography session {session_id}"
168 )
169 }
170 Self::SessionAlreadyExists { session_id } => {
171 write!(f, "choreography session already exists: {session_id}")
172 }
173 }
174 }
175}
176
177#[derive(Debug, Clone)]
179pub struct ChoreographySessionState {
180 pub session_id: RuntimeChoreographySessionId,
182 pub protocol_id: Option<String>,
184 pub context_id: ContextId,
186 pub roles: Vec<ChoreographicRole>,
188 pub current_role: ChoreographicRole,
190 pub timeout_ms: Option<u64>,
192 pub started_at_ms: Option<u64>,
194 pub metrics: ChoreographyMetrics,
196}
197
198impl ChoreographySessionState {
199 fn new(
200 session_id: RuntimeChoreographySessionId,
201 protocol_id: Option<String>,
202 context_id: ContextId,
203 roles: Vec<ChoreographicRole>,
204 current_role: ChoreographicRole,
205 timeout_ms: Option<u64>,
206 now_ms: u64,
207 ) -> Self {
208 Self {
209 session_id,
210 protocol_id,
211 context_id,
212 roles,
213 current_role,
214 timeout_ms,
215 started_at_ms: Some(now_ms),
216 metrics: default_metrics(),
217 }
218 }
219}
220
221#[derive(Debug, Clone, Default)]
223pub struct ChoreographyState {
224 sessions: HashMap<RuntimeChoreographySessionId, ChoreographySessionState>,
225 session_owners: HashMap<RuntimeChoreographySessionId, SessionOwnerRecord>,
226 session_owner_generations: HashMap<RuntimeChoreographySessionId, u64>,
227 task_bindings: HashMap<ExecutionBindingKey, RuntimeChoreographySessionId>,
228 session_inbox_notifiers: HashMap<RuntimeChoreographySessionId, Arc<Notify>>,
229 session_inboxes: HashMap<RuntimeChoreographySessionId, Vec<TransportEnvelope>>,
230}
231
232fn default_metrics() -> ChoreographyMetrics {
233 ChoreographyMetrics {
234 messages_sent: 0,
235 messages_received: 0,
236 avg_latency_ms: 0.0,
237 timeout_count: 0,
238 retry_count: 0,
239 total_duration_ms: 0,
240 }
241}
242
243impl Default for ChoreographySessionState {
244 fn default() -> Self {
245 Self {
246 session_id: RuntimeChoreographySessionId::from_uuid(Uuid::from_bytes([0xA5; 16])),
247 protocol_id: None,
248 context_id: ContextId::new_from_entropy([0; 32]),
249 roles: Vec::new(),
250 current_role: ChoreographicRole::new(
251 DeviceId::from_uuid(Uuid::from_bytes([0x5A; 16])),
252 AuthorityId::from_uuid(Uuid::from_bytes([0x5B; 16])),
253 RoleIndex::new(0).expect("role index"),
254 ),
255 timeout_ms: None,
256 started_at_ms: None,
257 metrics: default_metrics(),
258 }
259 }
260}
261
262#[allow(dead_code)]
263impl ChoreographyState {
264 #[allow(clippy::disallowed_methods)] fn current_binding_key() -> ExecutionBindingKey {
266 tokio::task::try_id()
267 .map(ExecutionBindingKey::Task)
268 .unwrap_or_else(|| ExecutionBindingKey::Thread(std::thread::current().id()))
269 }
270
271 pub fn new() -> Self {
273 Self::default()
274 }
275
276 pub fn current_session_id(&self) -> Option<RuntimeChoreographySessionId> {
278 self.task_bindings
279 .get(&Self::current_binding_key())
280 .copied()
281 }
282
283 pub fn current_session(&self) -> Option<ChoreographySessionState> {
285 let session_id = self.current_session_id()?;
286 self.sessions.get(&session_id).cloned()
287 }
288
289 pub fn active_session_count(&self) -> usize {
291 self.sessions.len()
292 }
293
294 pub fn start_session(
296 &mut self,
297 session_id: RuntimeChoreographySessionId,
298 protocol_id: Option<String>,
299 context_id: ContextId,
300 roles: Vec<ChoreographicRole>,
301 current_role: ChoreographicRole,
302 timeout_ms: Option<u64>,
303 now_ms: u64,
304 ) -> Result<(), SessionStartError> {
305 let task_id = Self::current_binding_key();
306 if let Some(existing_session_id) = self.task_bindings.get(&task_id).copied() {
307 if self.sessions.contains_key(&existing_session_id) {
308 return Err(SessionStartError::TaskAlreadyBound {
309 session_id: existing_session_id,
310 });
311 }
312 self.task_bindings.remove(&task_id);
313 }
314 if self.sessions.contains_key(&session_id) {
315 return Err(SessionStartError::SessionAlreadyExists { session_id });
316 }
317
318 self.sessions.insert(
319 session_id,
320 ChoreographySessionState::new(
321 session_id,
322 protocol_id,
323 context_id,
324 roles,
325 current_role,
326 timeout_ms,
327 now_ms,
328 ),
329 );
330 self.session_inbox_notifiers
331 .insert(session_id, Arc::new(Notify::new()));
332 self.session_inboxes.entry(session_id).or_default();
333 self.task_bindings.insert(task_id, session_id);
334 Ok(())
335 }
336
337 pub fn set_session_protocol_id(
339 &mut self,
340 session_id: RuntimeChoreographySessionId,
341 protocol_id: impl Into<String>,
342 ) -> Result<(), String> {
343 let session = self
344 .sessions
345 .get_mut(&session_id)
346 .ok_or_else(|| format!("missing choreography session state for {session_id}"))?;
347 session.protocol_id = Some(protocol_id.into());
348 Ok(())
349 }
350
351 pub fn end_session(&mut self, now_ms: u64) -> Result<RuntimeChoreographySessionId, String> {
353 let task_id = Self::current_binding_key();
354 let session_id = self
355 .task_bindings
356 .remove(&task_id)
357 .ok_or_else(|| "no choreography session bound to current task".to_string())?;
358
359 let Some(mut session) = self.sessions.remove(&session_id) else {
360 return Err(format!(
361 "missing choreography session state for bound session {session_id}"
362 ));
363 };
364
365 if let Some(started) = session.started_at_ms {
366 session.metrics.total_duration_ms = now_ms.saturating_sub(started);
367 }
368 if let Some(notify) = self.session_inbox_notifiers.remove(&session_id) {
369 notify.notify_waiters();
370 }
371 self.session_owners.remove(&session_id);
372 self.session_inboxes.remove(&session_id);
373 self.task_bindings.retain(|_, sid| *sid != session_id);
374 Ok(session_id)
375 }
376
377 pub fn cancel_session(&mut self, session_id: RuntimeChoreographySessionId) -> bool {
379 let removed = self.sessions.remove(&session_id).is_some();
380 if let Some(notify) = self.session_inbox_notifiers.remove(&session_id) {
381 notify.notify_waiters();
382 }
383 self.session_owners.remove(&session_id);
384 self.session_inboxes.remove(&session_id);
385 self.task_bindings.retain(|_, sid| *sid != session_id);
386 removed
387 }
388
389 pub fn with_current_session_mut<T>(
391 &mut self,
392 f: impl FnOnce(&mut ChoreographySessionState) -> T,
393 ) -> Result<T, String> {
394 let task_id = Self::current_binding_key();
395 let session_id = self
396 .task_bindings
397 .get(&task_id)
398 .copied()
399 .ok_or_else(|| "no choreography session bound to current task".to_string())?;
400 let Some(session) = self.sessions.get_mut(&session_id) else {
401 self.task_bindings.remove(&task_id);
402 return Err(format!(
403 "missing choreography session state for bound session {session_id}"
404 ));
405 };
406 Ok(f(session))
407 }
408
409 pub fn is_active(&self) -> bool {
411 self.current_session_id()
412 .and_then(|session_id| self.sessions.get(&session_id))
413 .is_some()
414 }
415
416 pub fn is_timed_out(&self, now_ms: u64) -> bool {
418 let Some(session) = self.current_session() else {
419 return false;
420 };
421 match (session.started_at_ms, session.timeout_ms) {
422 (Some(started), Some(timeout)) => now_ms.saturating_sub(started) > timeout,
423 _ => false,
424 }
425 }
426
427 pub fn session_inbox_notify(
429 &self,
430 session_id: RuntimeChoreographySessionId,
431 ) -> Option<Arc<Notify>> {
432 self.session_inbox_notifiers.get(&session_id).cloned()
433 }
434
435 pub fn notify_session_inbox(&self, session_id: RuntimeChoreographySessionId) {
437 if let Some(notify) = self.session_inbox_notifiers.get(&session_id) {
438 notify.notify_waiters();
439 }
440 }
441
442 pub fn queue_session_envelope(
444 &mut self,
445 session_id: RuntimeChoreographySessionId,
446 envelope: TransportEnvelope,
447 ) {
448 self.session_inboxes
449 .entry(session_id)
450 .or_default()
451 .push(envelope);
452 self.notify_session_inbox(session_id);
453 }
454
455 pub fn take_matching_session_envelope(
457 &mut self,
458 session_id: RuntimeChoreographySessionId,
459 source: AuthorityId,
460 context_id: ContextId,
461 self_authority: AuthorityId,
462 self_device_id: &str,
463 ) -> Option<TransportEnvelope> {
464 let inbox = self.session_inboxes.get_mut(&session_id)?;
465 inbox
466 .iter()
467 .position(|env| {
468 let device_match = env
469 .metadata
470 .get("aura-destination-device-id")
471 .is_some_and(|dst| dst == self_device_id);
472
473 if env.destination == self_authority {
474 env.source == source
475 && env.context == context_id
476 && match env.metadata.get("aura-destination-device-id") {
477 Some(dst) => dst == self_device_id,
478 None => true,
479 }
480 } else {
481 env.source == source && env.context == context_id && device_match
482 }
483 })
484 .map(|pos| inbox.remove(pos))
485 }
486
487 pub fn session_inbox_len(&self, session_id: RuntimeChoreographySessionId) -> usize {
489 self.session_inboxes
490 .get(&session_id)
491 .map_or(0, std::vec::Vec::len)
492 }
493
494 pub fn session_inbox_snapshot(
496 &self,
497 session_id: RuntimeChoreographySessionId,
498 ) -> Vec<TransportEnvelope> {
499 self.session_inboxes
500 .get(&session_id)
501 .cloned()
502 .unwrap_or_default()
503 }
504
505 pub fn claim_session_owner(
507 &mut self,
508 session_id: RuntimeChoreographySessionId,
509 owner_label: impl Into<String>,
510 ) -> Result<SessionOwnerCapability, SessionOwnershipError> {
511 let owner_label = owner_label.into();
512 if !self.sessions.contains_key(&session_id) {
513 return Err(SessionOwnershipError::MissingOwner { session_id });
514 }
515
516 if let Some(existing) = self.session_owners.get(&session_id) {
517 if existing.owner_label != owner_label {
518 return Err(SessionOwnershipError::OwnerConflict {
519 session_id,
520 existing_owner: existing.owner_label.clone(),
521 requested_owner: owner_label,
522 });
523 }
524 return Ok(existing.capability.clone());
525 }
526
527 let generation = self
528 .session_owner_generations
529 .entry(session_id)
530 .and_modify(|generation| *generation = generation.saturating_add(1))
531 .or_insert(1);
532 let capability =
533 SessionOwnerCapability::full_session(session_id, owner_label.clone(), *generation);
534 self.session_owners.insert(
535 session_id,
536 SessionOwnerRecord {
537 owner_label,
538 capability: capability.clone(),
539 },
540 );
541 Ok(capability)
542 }
543
544 pub fn ensure_session_owner(
546 &self,
547 session_id: RuntimeChoreographySessionId,
548 expected_capability: &SessionOwnerCapability,
549 ) -> Result<(), SessionOwnershipError> {
550 let Some(owner) = self.session_owners.get(&session_id) else {
551 return Err(SessionOwnershipError::MissingOwner { session_id });
552 };
553
554 if owner.owner_label != expected_capability.owner_label {
555 return Err(SessionOwnershipError::OwnerMismatch {
556 session_id,
557 expected_owner: expected_capability.owner_label.clone(),
558 });
559 }
560
561 if expected_capability.session_id != session_id {
562 return Err(SessionOwnershipError::CapabilitySessionMismatch {
563 session_id,
564 expected_owner: expected_capability.owner_label.clone(),
565 capability_session_id: expected_capability.session_id,
566 });
567 }
568
569 if owner.capability.generation != expected_capability.generation
570 || owner.capability.scope != expected_capability.scope
571 {
572 return Err(SessionOwnershipError::CapabilityMismatch {
573 session_id,
574 expected_owner: expected_capability.owner_label.clone(),
575 current_generation: owner.capability.generation,
576 });
577 }
578
579 Ok(())
580 }
581
582 pub fn release_session_owner(
584 &mut self,
585 session_id: RuntimeChoreographySessionId,
586 expected_capability: &SessionOwnerCapability,
587 ) -> Result<(), SessionOwnershipError> {
588 self.ensure_session_owner(session_id, expected_capability)?;
589 self.session_owners.remove(&session_id);
590 Ok(())
591 }
592
593 pub fn transfer_session_owner(
595 &mut self,
596 session_id: RuntimeChoreographySessionId,
597 expected_capability: &SessionOwnerCapability,
598 next_owner_label: impl Into<String>,
599 next_scope: SessionOwnerCapabilityScope,
600 ) -> Result<SessionOwnerCapability, SessionOwnershipError> {
601 self.ensure_session_owner(session_id, expected_capability)?;
602
603 let next_owner_label = next_owner_label.into();
604 let generation = self
605 .session_owner_generations
606 .entry(session_id)
607 .and_modify(|generation| *generation = generation.saturating_add(1))
608 .or_insert(1);
609 let next_capability =
610 SessionOwnerCapability::full_session(session_id, next_owner_label.clone(), *generation)
611 .with_scope(next_scope);
612 self.session_owners.insert(
613 session_id,
614 SessionOwnerRecord {
615 owner_label: next_owner_label,
616 capability: next_capability.clone(),
617 },
618 );
619 Ok(next_capability)
620 }
621
622 pub fn session_owner(
624 &self,
625 session_id: RuntimeChoreographySessionId,
626 ) -> Option<&SessionOwnerRecord> {
627 self.session_owners.get(&session_id)
628 }
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634
/// Converting an Aura session id to a runtime id and back yields the original.
#[test]
fn runtime_choreography_session_id_bridges_aura_session_id_explicitly() {
    let original = SessionId::new_from_entropy([9; 32]);
    let bridged = RuntimeChoreographySessionId::from_aura_session_id(original);

    assert_eq!(bridged.into_aura_session_id(), original);
}
643
/// The inbox notifier exists while the session is active and is gone after it ends.
#[test]
fn session_notifier_tracks_session_lifecycle() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([4; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([0u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(44));
    let context_id = ContextId::new_from_entropy([7; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 0)
        .expect("session starts");
    let notifier_while_active = state.session_inbox_notify(session_id);
    assert!(
        notifier_while_active.is_some(),
        "active session should expose an inbox notifier"
    );

    state.end_session(10).expect("session ends");
    let notifier_after_end = state.session_inbox_notify(session_id);
    assert!(
        notifier_after_end.is_none(),
        "ended session should release its inbox notifier"
    );
}
678
/// Cancelling a session drops its binding, notifier and queued envelopes.
#[test]
fn cancel_session_releases_bindings_and_inbox_state() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([5; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([0u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(45));
    let context_id = ContextId::new_from_entropy([8; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 0)
        .expect("session starts");

    let queued = TransportEnvelope {
        destination: AuthorityId::from_uuid(Uuid::from_bytes([5; 16])),
        source: AuthorityId::from_uuid(Uuid::from_bytes([6; 16])),
        context: context_id,
        payload: vec![1],
        metadata: std::collections::HashMap::new(),
        receipt: None,
    };
    state.queue_session_envelope(session_id, queued);

    assert!(state.cancel_session(session_id));
    assert!(state.current_session_id().is_none());
    assert!(state.session_inbox_notify(session_id).is_none());
    assert_eq!(state.session_inbox_len(session_id), 0);
}
719
/// A second claim under a different label fails with `OwnerConflict`.
#[test]
fn session_owner_conflict_is_rejected() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([7; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([1u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(46));
    let context_id = ContextId::new_from_entropy([9; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 0)
        .expect("session starts");
    state
        .claim_session_owner(session_id, "owner-a")
        .expect("owner a claims session");

    let second_claim = state.claim_session_owner(session_id, "owner-b");
    assert!(matches!(
        second_claim,
        Err(SessionOwnershipError::OwnerConflict { .. })
    ));
}
752
/// Ending a session removes its owner record, so old capabilities report `MissingOwner`.
#[test]
fn ending_session_releases_owner_record() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([8; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([2u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(47));
    let context_id = ContextId::new_from_entropy([10; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 0)
        .expect("session starts");
    let capability = state
        .claim_session_owner(session_id, "owner-a")
        .expect("owner claims session");
    state.end_session(10).expect("session ends");

    let check_after_end = state.ensure_session_owner(session_id, &capability);
    assert!(matches!(
        check_after_end,
        Err(SessionOwnershipError::MissingOwner { .. })
    ));
}
786
/// Releasing and re-claiming bumps the generation, invalidating the earlier capability.
#[test]
fn reclaiming_owner_invalidates_stale_capability_generation() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([9; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([3u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(48));
    let context_id = ContextId::new_from_entropy([11; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 0)
        .expect("session starts");
    let first = state
        .claim_session_owner(session_id, "owner-a")
        .expect("owner claims session");
    state
        .release_session_owner(session_id, &first)
        .expect("owner releases session");
    let second = state
        .claim_session_owner(session_id, "owner-a")
        .expect("owner reclaims session");

    assert!(second.generation > first.generation);

    let stale_check = state.ensure_session_owner(session_id, &first);
    assert!(matches!(
        stale_check,
        Err(SessionOwnershipError::CapabilityMismatch { .. })
    ));
    assert!(state.ensure_session_owner(session_id, &second).is_ok());
}
827
/// Transferring ownership issues a scoped capability to the new owner and
/// rejects the previous owner's capability.
#[test]
fn transfer_session_owner_is_atomic_and_invalidates_old_capability() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([10; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([4u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(49));
    let context_id = ContextId::new_from_entropy([12; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 0)
        .expect("session starts");
    let original = state
        .claim_session_owner(session_id, "owner-a")
        .expect("owner claims session");

    let fragment_scope =
        SessionOwnerCapabilityScope::Fragments(BTreeSet::from(["fragment.alpha".to_string()]));
    let transferred = state
        .transfer_session_owner(session_id, &original, "owner-b", fragment_scope)
        .expect("ownership transfers");

    assert_eq!(transferred.owner_label, "owner-b");
    assert!(matches!(
        transferred.scope,
        SessionOwnerCapabilityScope::Fragments(_)
    ));

    let old_check = state.ensure_session_owner(session_id, &original);
    assert!(matches!(
        old_check,
        Err(SessionOwnershipError::OwnerMismatch { .. }
            | SessionOwnershipError::CapabilityMismatch { .. })
    ));
    assert!(state.ensure_session_owner(session_id, &transferred).is_ok());
}
877
/// A capability naming session A is rejected for session B even when its
/// owner label, generation and scope match B's record.
#[test]
fn cross_session_forged_capability_is_rejected() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([12; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([6u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_a = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(51));
    let session_b = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(52));
    let context_a = ContextId::new_from_entropy([14; 32]);
    let context_b = ContextId::new_from_entropy([15; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_a, None, context_a, vec![role], role, Some(1000), 0)
        .expect("session a starts");
    // Drop the binding so the same test thread can start a second session.
    state.task_bindings.clear();
    state
        .start_session(session_b, None, context_b, vec![role], role, Some(1000), 1)
        .expect("session b starts");

    let capability_a = state
        .claim_session_owner(session_a, "owner-a")
        .expect("owner claims session a");
    let capability_b = state
        .claim_session_owner(session_b, "owner-a")
        .expect("owner claims session b");

    assert_eq!(capability_a.generation, capability_b.generation);

    // Everything copied from B's capability except the session id.
    let forged = SessionOwnerCapability {
        session_id: session_a,
        owner_label: capability_b.owner_label.clone(),
        generation: capability_b.generation,
        scope: capability_b.scope.clone(),
    };

    let forged_check = state.ensure_session_owner(session_b, &forged);
    assert!(matches!(
        forged_check,
        Err(SessionOwnershipError::CapabilitySessionMismatch { .. })
    ));
    assert!(state.ensure_session_owner(session_b, &capability_b).is_ok());
}
922
/// Starting an already-active session id yields the typed `SessionAlreadyExists` error.
#[test]
fn duplicate_session_start_is_typed() {
    let device_id = DeviceId::from_uuid(Uuid::from_bytes([11; 16]));
    let role = ChoreographicRole::new(
        device_id,
        AuthorityId::new_from_entropy([5u8; 32]),
        RoleIndex::new(0).expect("role index"),
    );
    let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(50));
    let context_id = ContextId::new_from_entropy([13; 32]);
    let mut state = ChoreographyState::new();

    state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 0)
        .expect("session starts");
    // Drop the binding so the duplicate-id check is reached rather than the
    // task-already-bound check.
    state.task_bindings.clear();

    let duplicate_error = state
        .start_session(session_id, None, context_id, vec![role], role, Some(1000), 1)
        .expect_err("duplicate session should be rejected");
    assert_eq!(
        duplicate_error,
        SessionStartError::SessionAlreadyExists { session_id }
    );
}
963}