Skip to main content

aura_agent/runtime/subsystems/
choreography.rs

1//! Choreography State
2//!
3//! In-memory choreography session state for runtime coordination.
4
5use aura_core::effects::transport::TransportEnvelope;
6use aura_core::{AuthorityId, ContextId, DeviceId, SessionId};
7use aura_protocol::effects::{ChoreographicRole, ChoreographyMetrics, RoleIndex};
8use std::collections::{BTreeSet, HashMap};
9use std::fmt;
10use std::sync::Arc;
11use std::thread::ThreadId;
12use tokio::sync::Notify;
13use tokio::task::Id as TaskId;
14use uuid::Uuid;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
17enum ExecutionBindingKey {
18    Task(TaskId),
19    Thread(ThreadId),
20}
21
22/// Runtime choreography session identity bound to one active protocol execution.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
24pub struct RuntimeChoreographySessionId(Uuid);
25
26impl RuntimeChoreographySessionId {
27    /// Wrap one raw runtime session UUID.
28    pub fn from_uuid(session_id: Uuid) -> Self {
29        Self(session_id)
30    }
31
32    /// Borrow the raw runtime session UUID.
33    pub fn as_uuid(self) -> Uuid {
34        self.0
35    }
36
37    /// Explicitly bridge one durable Aura session identifier into runtime choreography scope.
38    pub fn from_aura_session_id(session_id: SessionId) -> Self {
39        Self::from_uuid(session_id.uuid())
40    }
41
42    /// Explicitly bridge one runtime choreography session into durable Aura session scope.
43    pub fn into_aura_session_id(self) -> SessionId {
44        SessionId::from_uuid(self.as_uuid())
45    }
46}
47
48impl fmt::Display for RuntimeChoreographySessionId {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        self.0.fmt(f)
51    }
52}
53
54/// Authoritative local owner record for one active runtime session.
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct SessionOwnerRecord {
57    /// Stable local owner label.
58    pub owner_label: String,
59    /// Current capability proving authority to act for this owner.
60    pub capability: SessionOwnerCapability,
61}
62
63/// Scope granted to one current runtime session owner capability.
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub enum SessionOwnerCapabilityScope {
66    /// Grants authority across the full runtime session.
67    Session,
68    /// Grants authority only to the listed fragments.
69    Fragments(BTreeSet<String>),
70}
71
72/// Capability proving current authority to act on one runtime session.
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct SessionOwnerCapability {
75    pub session_id: RuntimeChoreographySessionId,
76    pub owner_label: String,
77    pub generation: u64,
78    pub scope: SessionOwnerCapabilityScope,
79}
80
81impl SessionOwnerCapability {
82    pub fn full_session(
83        session_id: RuntimeChoreographySessionId,
84        owner_label: impl Into<String>,
85        generation: u64,
86    ) -> Self {
87        Self {
88            session_id,
89            owner_label: owner_label.into(),
90            generation,
91            scope: SessionOwnerCapabilityScope::Session,
92        }
93    }
94
95    pub fn allows_full_session(&self) -> bool {
96        matches!(self.scope, SessionOwnerCapabilityScope::Session)
97    }
98
99    pub fn with_scope(mut self, scope: SessionOwnerCapabilityScope) -> Self {
100        self.scope = scope;
101        self
102    }
103}
104
105/// Errors raised while managing runtime session ownership.
106#[derive(Debug, thiserror::Error, PartialEq, Eq)]
107pub enum SessionOwnershipError {
108    /// Another local owner already claimed the active session.
109    #[error(
110        "runtime session {session_id} is already owned by {existing_owner}; requested owner {requested_owner}"
111    )]
112    OwnerConflict {
113        session_id: RuntimeChoreographySessionId,
114        existing_owner: String,
115        requested_owner: String,
116    },
117    /// No authoritative owner exists for the requested session.
118    #[error("runtime session {session_id} has no owner record")]
119    MissingOwner {
120        session_id: RuntimeChoreographySessionId,
121    },
122    /// The caller does not match the authoritative owner for the requested session.
123    #[error("runtime session {session_id} is not owned by expected owner {expected_owner}")]
124    OwnerMismatch {
125        session_id: RuntimeChoreographySessionId,
126        expected_owner: String,
127    },
128    /// The caller's capability is stale or insufficient for the current owner record.
129    #[error(
130        "runtime session {session_id} rejected capability for owner {expected_owner}; current generation is {current_generation}"
131    )]
132    CapabilityMismatch {
133        session_id: RuntimeChoreographySessionId,
134        expected_owner: String,
135        current_generation: u64,
136    },
137    /// The caller's capability was issued for a different session.
138    #[error(
139        "runtime session {session_id} rejected capability for owner {expected_owner}; capability was issued for session {capability_session_id}"
140    )]
141    CapabilitySessionMismatch {
142        session_id: RuntimeChoreographySessionId,
143        expected_owner: String,
144        capability_session_id: RuntimeChoreographySessionId,
145    },
146}
147
148/// Typed reasons why a runtime choreography session start was rejected.
149#[derive(Debug, Clone, Copy, PartialEq, Eq)]
150pub enum SessionStartError {
151    /// The current task or thread is still bound to an active session.
152    TaskAlreadyBound {
153        session_id: RuntimeChoreographySessionId,
154    },
155    /// Another active session already exists for the requested session id.
156    SessionAlreadyExists {
157        session_id: RuntimeChoreographySessionId,
158    },
159}
160
161impl fmt::Display for SessionStartError {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        match self {
164            Self::TaskAlreadyBound { session_id } => {
165                write!(
166                    f,
167                    "task already bound to active choreography session {session_id}"
168                )
169            }
170            Self::SessionAlreadyExists { session_id } => {
171                write!(f, "choreography session already exists: {session_id}")
172            }
173        }
174    }
175}
176
177/// In-memory choreography session state for one runtime session.
178#[derive(Debug, Clone)]
179pub struct ChoreographySessionState {
180    /// Current session ID.
181    pub session_id: RuntimeChoreographySessionId,
182    /// Stable protocol identifier when the session was admitted from a manifest.
183    pub protocol_id: Option<String>,
184    /// Context ID for this session
185    pub context_id: ContextId,
186    /// Roles participating in this choreography
187    pub roles: Vec<ChoreographicRole>,
188    /// This node's current role
189    pub current_role: ChoreographicRole,
190    /// Session timeout in milliseconds
191    pub timeout_ms: Option<u64>,
192    /// Session start time in milliseconds since epoch
193    pub started_at_ms: Option<u64>,
194    /// Session metrics
195    pub metrics: ChoreographyMetrics,
196}
197
198impl ChoreographySessionState {
199    fn new(
200        session_id: RuntimeChoreographySessionId,
201        protocol_id: Option<String>,
202        context_id: ContextId,
203        roles: Vec<ChoreographicRole>,
204        current_role: ChoreographicRole,
205        timeout_ms: Option<u64>,
206        now_ms: u64,
207    ) -> Self {
208        Self {
209            session_id,
210            protocol_id,
211            context_id,
212            roles,
213            current_role,
214            timeout_ms,
215            started_at_ms: Some(now_ms),
216            metrics: default_metrics(),
217        }
218    }
219}
220
221/// In-memory choreography session registry keyed by runtime session id.
222#[derive(Debug, Clone, Default)]
223pub struct ChoreographyState {
224    sessions: HashMap<RuntimeChoreographySessionId, ChoreographySessionState>,
225    session_owners: HashMap<RuntimeChoreographySessionId, SessionOwnerRecord>,
226    session_owner_generations: HashMap<RuntimeChoreographySessionId, u64>,
227    task_bindings: HashMap<ExecutionBindingKey, RuntimeChoreographySessionId>,
228    session_inbox_notifiers: HashMap<RuntimeChoreographySessionId, Arc<Notify>>,
229    session_inboxes: HashMap<RuntimeChoreographySessionId, Vec<TransportEnvelope>>,
230}
231
232fn default_metrics() -> ChoreographyMetrics {
233    ChoreographyMetrics {
234        messages_sent: 0,
235        messages_received: 0,
236        avg_latency_ms: 0.0,
237        timeout_count: 0,
238        retry_count: 0,
239        total_duration_ms: 0,
240    }
241}
242
243impl Default for ChoreographySessionState {
244    fn default() -> Self {
245        Self {
246            session_id: RuntimeChoreographySessionId::from_uuid(Uuid::from_bytes([0xA5; 16])),
247            protocol_id: None,
248            context_id: ContextId::new_from_entropy([0; 32]),
249            roles: Vec::new(),
250            current_role: ChoreographicRole::new(
251                DeviceId::from_uuid(Uuid::from_bytes([0x5A; 16])),
252                AuthorityId::from_uuid(Uuid::from_bytes([0x5B; 16])),
253                RoleIndex::new(0).expect("role index"),
254            ),
255            timeout_ms: None,
256            started_at_ms: None,
257            metrics: default_metrics(),
258        }
259    }
260}
261
262#[allow(dead_code)]
263impl ChoreographyState {
264    #[allow(clippy::disallowed_methods)] // Fallback for tests/sync callers outside a Tokio task.
265    fn current_binding_key() -> ExecutionBindingKey {
266        tokio::task::try_id()
267            .map(ExecutionBindingKey::Task)
268            .unwrap_or_else(|| ExecutionBindingKey::Thread(std::thread::current().id()))
269    }
270
271    /// Create a new empty state
272    pub fn new() -> Self {
273        Self::default()
274    }
275
276    /// Return the active session id bound to the current Tokio task, if any.
277    pub fn current_session_id(&self) -> Option<RuntimeChoreographySessionId> {
278        self.task_bindings
279            .get(&Self::current_binding_key())
280            .copied()
281    }
282
283    /// Return a clone of the session state bound to the current Tokio task.
284    pub fn current_session(&self) -> Option<ChoreographySessionState> {
285        let session_id = self.current_session_id()?;
286        self.sessions.get(&session_id).cloned()
287    }
288
289    /// Number of active runtime choreography sessions.
290    pub fn active_session_count(&self) -> usize {
291        self.sessions.len()
292    }
293
294    /// Start a new session and bind it to the current Tokio task.
295    pub fn start_session(
296        &mut self,
297        session_id: RuntimeChoreographySessionId,
298        protocol_id: Option<String>,
299        context_id: ContextId,
300        roles: Vec<ChoreographicRole>,
301        current_role: ChoreographicRole,
302        timeout_ms: Option<u64>,
303        now_ms: u64,
304    ) -> Result<(), SessionStartError> {
305        let task_id = Self::current_binding_key();
306        if let Some(existing_session_id) = self.task_bindings.get(&task_id).copied() {
307            if self.sessions.contains_key(&existing_session_id) {
308                return Err(SessionStartError::TaskAlreadyBound {
309                    session_id: existing_session_id,
310                });
311            }
312            self.task_bindings.remove(&task_id);
313        }
314        if self.sessions.contains_key(&session_id) {
315            return Err(SessionStartError::SessionAlreadyExists { session_id });
316        }
317
318        self.sessions.insert(
319            session_id,
320            ChoreographySessionState::new(
321                session_id,
322                protocol_id,
323                context_id,
324                roles,
325                current_role,
326                timeout_ms,
327                now_ms,
328            ),
329        );
330        self.session_inbox_notifiers
331            .insert(session_id, Arc::new(Notify::new()));
332        self.session_inboxes.entry(session_id).or_default();
333        self.task_bindings.insert(task_id, session_id);
334        Ok(())
335    }
336
337    /// Attach or replace the stable protocol identifier for one active session.
338    pub fn set_session_protocol_id(
339        &mut self,
340        session_id: RuntimeChoreographySessionId,
341        protocol_id: impl Into<String>,
342    ) -> Result<(), String> {
343        let session = self
344            .sessions
345            .get_mut(&session_id)
346            .ok_or_else(|| format!("missing choreography session state for {session_id}"))?;
347        session.protocol_id = Some(protocol_id.into());
348        Ok(())
349    }
350
351    /// End the current task-bound session and clean up all session bindings.
352    pub fn end_session(&mut self, now_ms: u64) -> Result<RuntimeChoreographySessionId, String> {
353        let task_id = Self::current_binding_key();
354        let session_id = self
355            .task_bindings
356            .remove(&task_id)
357            .ok_or_else(|| "no choreography session bound to current task".to_string())?;
358
359        let Some(mut session) = self.sessions.remove(&session_id) else {
360            return Err(format!(
361                "missing choreography session state for bound session {session_id}"
362            ));
363        };
364
365        if let Some(started) = session.started_at_ms {
366            session.metrics.total_duration_ms = now_ms.saturating_sub(started);
367        }
368        if let Some(notify) = self.session_inbox_notifiers.remove(&session_id) {
369            notify.notify_waiters();
370        }
371        self.session_owners.remove(&session_id);
372        self.session_inboxes.remove(&session_id);
373        self.task_bindings.retain(|_, sid| *sid != session_id);
374        Ok(session_id)
375    }
376
377    /// Cancel one session by explicit runtime session id and wake any blocked waiters.
378    pub fn cancel_session(&mut self, session_id: RuntimeChoreographySessionId) -> bool {
379        let removed = self.sessions.remove(&session_id).is_some();
380        if let Some(notify) = self.session_inbox_notifiers.remove(&session_id) {
381            notify.notify_waiters();
382        }
383        self.session_owners.remove(&session_id);
384        self.session_inboxes.remove(&session_id);
385        self.task_bindings.retain(|_, sid| *sid != session_id);
386        removed
387    }
388
389    /// Run a mutable update against the current task-bound session.
390    pub fn with_current_session_mut<T>(
391        &mut self,
392        f: impl FnOnce(&mut ChoreographySessionState) -> T,
393    ) -> Result<T, String> {
394        let task_id = Self::current_binding_key();
395        let session_id = self
396            .task_bindings
397            .get(&task_id)
398            .copied()
399            .ok_or_else(|| "no choreography session bound to current task".to_string())?;
400        let Some(session) = self.sessions.get_mut(&session_id) else {
401            self.task_bindings.remove(&task_id);
402            return Err(format!(
403                "missing choreography session state for bound session {session_id}"
404            ));
405        };
406        Ok(f(session))
407    }
408
409    /// Check if a session is active
410    pub fn is_active(&self) -> bool {
411        self.current_session_id()
412            .and_then(|session_id| self.sessions.get(&session_id))
413            .is_some()
414    }
415
416    /// Check if the session has timed out
417    pub fn is_timed_out(&self, now_ms: u64) -> bool {
418        let Some(session) = self.current_session() else {
419            return false;
420        };
421        match (session.started_at_ms, session.timeout_ms) {
422            (Some(started), Some(timeout)) => now_ms.saturating_sub(started) > timeout,
423            _ => false,
424        }
425    }
426
427    /// Snapshot the inbox notifier for one active session.
428    pub fn session_inbox_notify(
429        &self,
430        session_id: RuntimeChoreographySessionId,
431    ) -> Option<Arc<Notify>> {
432        self.session_inbox_notifiers.get(&session_id).cloned()
433    }
434
435    /// Wake any waiters blocked on one session-local inbox.
436    pub fn notify_session_inbox(&self, session_id: RuntimeChoreographySessionId) {
437        if let Some(notify) = self.session_inbox_notifiers.get(&session_id) {
438            notify.notify_waiters();
439        }
440    }
441
442    /// Queue one choreography envelope for one session-local inbox and wake waiters.
443    pub fn queue_session_envelope(
444        &mut self,
445        session_id: RuntimeChoreographySessionId,
446        envelope: TransportEnvelope,
447    ) {
448        self.session_inboxes
449            .entry(session_id)
450            .or_default()
451            .push(envelope);
452        self.notify_session_inbox(session_id);
453    }
454
455    /// Remove one matching choreography envelope from one session-local inbox.
456    pub fn take_matching_session_envelope(
457        &mut self,
458        session_id: RuntimeChoreographySessionId,
459        source: AuthorityId,
460        context_id: ContextId,
461        self_authority: AuthorityId,
462        self_device_id: &str,
463    ) -> Option<TransportEnvelope> {
464        let inbox = self.session_inboxes.get_mut(&session_id)?;
465        inbox
466            .iter()
467            .position(|env| {
468                let device_match = env
469                    .metadata
470                    .get("aura-destination-device-id")
471                    .is_some_and(|dst| dst == self_device_id);
472
473                if env.destination == self_authority {
474                    env.source == source
475                        && env.context == context_id
476                        && match env.metadata.get("aura-destination-device-id") {
477                            Some(dst) => dst == self_device_id,
478                            None => true,
479                        }
480                } else {
481                    env.source == source && env.context == context_id && device_match
482                }
483            })
484            .map(|pos| inbox.remove(pos))
485    }
486
487    /// Snapshot the current queued envelope count for one session-local inbox.
488    pub fn session_inbox_len(&self, session_id: RuntimeChoreographySessionId) -> usize {
489        self.session_inboxes
490            .get(&session_id)
491            .map_or(0, std::vec::Vec::len)
492    }
493
494    /// Clone the current queued envelopes for one session-local inbox.
495    pub fn session_inbox_snapshot(
496        &self,
497        session_id: RuntimeChoreographySessionId,
498    ) -> Vec<TransportEnvelope> {
499        self.session_inboxes
500            .get(&session_id)
501            .cloned()
502            .unwrap_or_default()
503    }
504
505    /// Claim authoritative ownership for one active session.
506    pub fn claim_session_owner(
507        &mut self,
508        session_id: RuntimeChoreographySessionId,
509        owner_label: impl Into<String>,
510    ) -> Result<SessionOwnerCapability, SessionOwnershipError> {
511        let owner_label = owner_label.into();
512        if !self.sessions.contains_key(&session_id) {
513            return Err(SessionOwnershipError::MissingOwner { session_id });
514        }
515
516        if let Some(existing) = self.session_owners.get(&session_id) {
517            if existing.owner_label != owner_label {
518                return Err(SessionOwnershipError::OwnerConflict {
519                    session_id,
520                    existing_owner: existing.owner_label.clone(),
521                    requested_owner: owner_label,
522                });
523            }
524            return Ok(existing.capability.clone());
525        }
526
527        let generation = self
528            .session_owner_generations
529            .entry(session_id)
530            .and_modify(|generation| *generation = generation.saturating_add(1))
531            .or_insert(1);
532        let capability =
533            SessionOwnerCapability::full_session(session_id, owner_label.clone(), *generation);
534        self.session_owners.insert(
535            session_id,
536            SessionOwnerRecord {
537                owner_label,
538                capability: capability.clone(),
539            },
540        );
541        Ok(capability)
542    }
543
544    /// Ensure the requested owner still holds the active session.
545    pub fn ensure_session_owner(
546        &self,
547        session_id: RuntimeChoreographySessionId,
548        expected_capability: &SessionOwnerCapability,
549    ) -> Result<(), SessionOwnershipError> {
550        let Some(owner) = self.session_owners.get(&session_id) else {
551            return Err(SessionOwnershipError::MissingOwner { session_id });
552        };
553
554        if owner.owner_label != expected_capability.owner_label {
555            return Err(SessionOwnershipError::OwnerMismatch {
556                session_id,
557                expected_owner: expected_capability.owner_label.clone(),
558            });
559        }
560
561        if expected_capability.session_id != session_id {
562            return Err(SessionOwnershipError::CapabilitySessionMismatch {
563                session_id,
564                expected_owner: expected_capability.owner_label.clone(),
565                capability_session_id: expected_capability.session_id,
566            });
567        }
568
569        if owner.capability.generation != expected_capability.generation
570            || owner.capability.scope != expected_capability.scope
571        {
572            return Err(SessionOwnershipError::CapabilityMismatch {
573                session_id,
574                expected_owner: expected_capability.owner_label.clone(),
575                current_generation: owner.capability.generation,
576            });
577        }
578
579        Ok(())
580    }
581
582    /// Release authoritative ownership for one active session.
583    pub fn release_session_owner(
584        &mut self,
585        session_id: RuntimeChoreographySessionId,
586        expected_capability: &SessionOwnerCapability,
587    ) -> Result<(), SessionOwnershipError> {
588        self.ensure_session_owner(session_id, expected_capability)?;
589        self.session_owners.remove(&session_id);
590        Ok(())
591    }
592
593    /// Atomically transfer authoritative ownership for one active session.
594    pub fn transfer_session_owner(
595        &mut self,
596        session_id: RuntimeChoreographySessionId,
597        expected_capability: &SessionOwnerCapability,
598        next_owner_label: impl Into<String>,
599        next_scope: SessionOwnerCapabilityScope,
600    ) -> Result<SessionOwnerCapability, SessionOwnershipError> {
601        self.ensure_session_owner(session_id, expected_capability)?;
602
603        let next_owner_label = next_owner_label.into();
604        let generation = self
605            .session_owner_generations
606            .entry(session_id)
607            .and_modify(|generation| *generation = generation.saturating_add(1))
608            .or_insert(1);
609        let next_capability =
610            SessionOwnerCapability::full_session(session_id, next_owner_label.clone(), *generation)
611                .with_scope(next_scope);
612        self.session_owners.insert(
613            session_id,
614            SessionOwnerRecord {
615                owner_label: next_owner_label,
616                capability: next_capability.clone(),
617            },
618        );
619        Ok(next_capability)
620    }
621
622    /// Snapshot the current owner for one active session.
623    pub fn session_owner(
624        &self,
625        session_id: RuntimeChoreographySessionId,
626    ) -> Option<&SessionOwnerRecord> {
627        self.session_owners.get(&session_id)
628    }
629}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634
635    #[test]
636    fn runtime_choreography_session_id_bridges_aura_session_id_explicitly() {
637        let aura_session_id = SessionId::new_from_entropy([9; 32]);
638        let runtime_session_id =
639            RuntimeChoreographySessionId::from_aura_session_id(aura_session_id);
640
641        assert_eq!(runtime_session_id.into_aura_session_id(), aura_session_id);
642    }
643
644    #[test]
645    fn session_notifier_tracks_session_lifecycle() {
646        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([4; 16]));
647        let role = ChoreographicRole::new(
648            authority_id,
649            AuthorityId::new_from_entropy([0u8; 32]),
650            RoleIndex::new(0).expect("role index"),
651        );
652        let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(44));
653        let context_id = ContextId::new_from_entropy([7; 32]);
654        let mut state = ChoreographyState::new();
655
656        state
657            .start_session(
658                session_id,
659                None,
660                context_id,
661                vec![role],
662                role,
663                Some(1000),
664                0,
665            )
666            .expect("session starts");
667        assert!(
668            state.session_inbox_notify(session_id).is_some(),
669            "active session should expose an inbox notifier"
670        );
671
672        state.end_session(10).expect("session ends");
673        assert!(
674            state.session_inbox_notify(session_id).is_none(),
675            "ended session should release its inbox notifier"
676        );
677    }
678
679    #[test]
680    fn cancel_session_releases_bindings_and_inbox_state() {
681        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([5; 16]));
682        let role = ChoreographicRole::new(
683            authority_id,
684            AuthorityId::new_from_entropy([0u8; 32]),
685            RoleIndex::new(0).expect("role index"),
686        );
687        let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(45));
688        let context_id = ContextId::new_from_entropy([8; 32]);
689        let mut state = ChoreographyState::new();
690
691        state
692            .start_session(
693                session_id,
694                None,
695                context_id,
696                vec![role],
697                role,
698                Some(1000),
699                0,
700            )
701            .expect("session starts");
702        state.queue_session_envelope(
703            session_id,
704            TransportEnvelope {
705                destination: AuthorityId::from_uuid(Uuid::from_bytes([5; 16])),
706                source: AuthorityId::from_uuid(Uuid::from_bytes([6; 16])),
707                context: context_id,
708                payload: vec![1],
709                metadata: std::collections::HashMap::new(),
710                receipt: None,
711            },
712        );
713
714        assert!(state.cancel_session(session_id));
715        assert!(state.current_session_id().is_none());
716        assert!(state.session_inbox_notify(session_id).is_none());
717        assert_eq!(state.session_inbox_len(session_id), 0);
718    }
719
720    #[test]
721    fn session_owner_conflict_is_rejected() {
722        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([7; 16]));
723        let role = ChoreographicRole::new(
724            authority_id,
725            AuthorityId::new_from_entropy([1u8; 32]),
726            RoleIndex::new(0).expect("role index"),
727        );
728        let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(46));
729        let context_id = ContextId::new_from_entropy([9; 32]);
730        let mut state = ChoreographyState::new();
731
732        state
733            .start_session(
734                session_id,
735                None,
736                context_id,
737                vec![role],
738                role,
739                Some(1000),
740                0,
741            )
742            .expect("session starts");
743        state
744            .claim_session_owner(session_id, "owner-a")
745            .expect("owner a claims session");
746
747        assert!(matches!(
748            state.claim_session_owner(session_id, "owner-b"),
749            Err(SessionOwnershipError::OwnerConflict { .. })
750        ));
751    }
752
753    #[test]
754    fn ending_session_releases_owner_record() {
755        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([8; 16]));
756        let role = ChoreographicRole::new(
757            authority_id,
758            AuthorityId::new_from_entropy([2u8; 32]),
759            RoleIndex::new(0).expect("role index"),
760        );
761        let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(47));
762        let context_id = ContextId::new_from_entropy([10; 32]);
763        let mut state = ChoreographyState::new();
764
765        state
766            .start_session(
767                session_id,
768                None,
769                context_id,
770                vec![role],
771                role,
772                Some(1000),
773                0,
774            )
775            .expect("session starts");
776        let capability = state
777            .claim_session_owner(session_id, "owner-a")
778            .expect("owner claims session");
779        state.end_session(10).expect("session ends");
780
781        assert!(matches!(
782            state.ensure_session_owner(session_id, &capability),
783            Err(SessionOwnershipError::MissingOwner { .. })
784        ));
785    }
786
787    #[test]
788    fn reclaiming_owner_invalidates_stale_capability_generation() {
789        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([9; 16]));
790        let role = ChoreographicRole::new(
791            authority_id,
792            AuthorityId::new_from_entropy([3u8; 32]),
793            RoleIndex::new(0).expect("role index"),
794        );
795        let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(48));
796        let context_id = ContextId::new_from_entropy([11; 32]);
797        let mut state = ChoreographyState::new();
798
799        state
800            .start_session(
801                session_id,
802                None,
803                context_id,
804                vec![role],
805                role,
806                Some(1000),
807                0,
808            )
809            .expect("session starts");
810        let first = state
811            .claim_session_owner(session_id, "owner-a")
812            .expect("owner claims session");
813        state
814            .release_session_owner(session_id, &first)
815            .expect("owner releases session");
816        let second = state
817            .claim_session_owner(session_id, "owner-a")
818            .expect("owner reclaims session");
819
820        assert!(second.generation > first.generation);
821        assert!(matches!(
822            state.ensure_session_owner(session_id, &first),
823            Err(SessionOwnershipError::CapabilityMismatch { .. })
824        ));
825        assert!(state.ensure_session_owner(session_id, &second).is_ok());
826    }
827
828    #[test]
829    fn transfer_session_owner_is_atomic_and_invalidates_old_capability() {
830        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([10; 16]));
831        let role = ChoreographicRole::new(
832            authority_id,
833            AuthorityId::new_from_entropy([4u8; 32]),
834            RoleIndex::new(0).expect("role index"),
835        );
836        let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(49));
837        let context_id = ContextId::new_from_entropy([12; 32]);
838        let mut state = ChoreographyState::new();
839
840        state
841            .start_session(
842                session_id,
843                None,
844                context_id,
845                vec![role],
846                role,
847                Some(1000),
848                0,
849            )
850            .expect("session starts");
851        let original = state
852            .claim_session_owner(session_id, "owner-a")
853            .expect("owner claims session");
854        let transferred = state
855            .transfer_session_owner(
856                session_id,
857                &original,
858                "owner-b",
859                SessionOwnerCapabilityScope::Fragments(BTreeSet::from([
860                    "fragment.alpha".to_string()
861                ])),
862            )
863            .expect("ownership transfers");
864
865        assert_eq!(transferred.owner_label, "owner-b");
866        assert!(matches!(
867            transferred.scope,
868            SessionOwnerCapabilityScope::Fragments(_)
869        ));
870        assert!(matches!(
871            state.ensure_session_owner(session_id, &original),
872            Err(SessionOwnershipError::OwnerMismatch { .. })
873                | Err(SessionOwnershipError::CapabilityMismatch { .. })
874        ));
875        assert!(state.ensure_session_owner(session_id, &transferred).is_ok());
876    }
877
878    #[test]
879    fn cross_session_forged_capability_is_rejected() {
880        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([12; 16]));
881        let role = ChoreographicRole::new(
882            authority_id,
883            AuthorityId::new_from_entropy([6u8; 32]),
884            RoleIndex::new(0).expect("role index"),
885        );
886        let session_a = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(51));
887        let session_b = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(52));
888        let context_a = ContextId::new_from_entropy([14; 32]);
889        let context_b = ContextId::new_from_entropy([15; 32]);
890        let mut state = ChoreographyState::new();
891
892        state
893            .start_session(session_a, None, context_a, vec![role], role, Some(1000), 0)
894            .expect("session a starts");
895        state.task_bindings.clear();
896        state
897            .start_session(session_b, None, context_b, vec![role], role, Some(1000), 1)
898            .expect("session b starts");
899
900        let capability_a = state
901            .claim_session_owner(session_a, "owner-a")
902            .expect("owner claims session a");
903        let capability_b = state
904            .claim_session_owner(session_b, "owner-a")
905            .expect("owner claims session b");
906
907        assert_eq!(capability_a.generation, capability_b.generation);
908
909        let forged = SessionOwnerCapability {
910            session_id: session_a,
911            owner_label: capability_b.owner_label.clone(),
912            generation: capability_b.generation,
913            scope: capability_b.scope.clone(),
914        };
915
916        assert!(matches!(
917            state.ensure_session_owner(session_b, &forged),
918            Err(SessionOwnershipError::CapabilitySessionMismatch { .. })
919        ));
920        assert!(state.ensure_session_owner(session_b, &capability_b).is_ok());
921    }
922
923    #[test]
924    fn duplicate_session_start_is_typed() {
925        let authority_id = DeviceId::from_uuid(Uuid::from_bytes([11; 16]));
926        let role = ChoreographicRole::new(
927            authority_id,
928            AuthorityId::new_from_entropy([5u8; 32]),
929            RoleIndex::new(0).expect("role index"),
930        );
931        let session_id = RuntimeChoreographySessionId::from_uuid(Uuid::from_u128(50));
932        let context_id = ContextId::new_from_entropy([13; 32]);
933        let mut state = ChoreographyState::new();
934
935        state
936            .start_session(
937                session_id,
938                None,
939                context_id,
940                vec![role],
941                role,
942                Some(1000),
943                0,
944            )
945            .expect("session starts");
946        state.task_bindings.clear();
947
948        assert_eq!(
949            state
950                .start_session(
951                    session_id,
952                    None,
953                    context_id,
954                    vec![role],
955                    role,
956                    Some(1000),
957                    1
958                )
959                .expect_err("duplicate session should be rejected"),
960            SessionStartError::SessionAlreadyExists { session_id }
961        );
962    }
963}