Skip to main content

telltale_machine/session/
store.rs

1// Session store serialization and index rebuilding.
2impl<'de> Deserialize<'de> for SessionState {
3    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
4    where
5        D: serde::Deserializer<'de>,
6    {
7        let raw = SessionStateSerde::deserialize(deserializer)?;
8        let mut session = Self {
9            sid: raw.sid,
10            roles: raw.roles,
11            role_ids: BTreeMap::new(),
12            local_types: raw.local_types,
13            buffers: raw.buffers,
14            edge_lookup: BTreeMap::new(),
15            handler_ids: BTreeMap::new(),
16            handlers_by_id: Vec::new(),
17            edge_handler_lookup: BTreeMap::new(),
18            default_handler_id: None,
19            label_ids: BTreeMap::new(),
20            labels_by_id: Vec::new(),
21            branch_lookup: BTreeMap::new(),
22            auth_leaves: raw.auth_leaves,
23            auth_trees: raw.auth_trees,
24            auth_roots: raw.auth_roots,
25            edge_handlers: raw.edge_handlers,
26            default_handler: raw.default_handler,
27            edge_traces: raw.edge_traces,
28            status: raw.status,
29            epoch: raw.epoch,
30            ownership: raw.ownership,
31        };
32        session.rebuild_derived_indexes();
33        Ok(session)
34    }
35}
36
/// Store of all sessions managed by the ProtocolMachine.
///
/// Provides type lookup/update methods that match the Lean
/// `SessionStore.lookupType` / `SessionStore.updateType` pattern.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SessionStore {
    /// Session states keyed by session id.
    sessions: BTreeMap<SessionId, SessionState>,
    /// Closed-session summaries. `#[serde(default)]` makes this fall back to an
    /// empty list when the field is absent from the serialized input.
    // NOTE(review): nothing in this part of the file populates the list — confirm where
    // sessions get archived.
    #[serde(default)]
    archived_closed: Vec<ClosedSessionSummary>,
    /// Id that `open` assigns to the next session; reported by `next_session_id`.
    next_id: SessionId,
}
48
49impl SessionStore {
50    fn session_mut_or_error(
51        &mut self,
52        sid: SessionId,
53    ) -> Result<&mut SessionState, OwnershipError> {
54        self.sessions
55            .get_mut(&sid)
56            .ok_or(OwnershipError::SessionNotFound { session_id: sid })
57    }
58
59    fn terminal_error(session: &SessionState) -> Option<OwnershipError> {
60        session
61            .ownership
62            .terminal_reason
63            .clone()
64            .map(|reason| OwnershipError::Terminal {
65                session_id: session.sid,
66                reason,
67            })
68    }
69
70    fn ensure_mutable_ownership(session: &SessionState) -> Result<(), OwnershipError> {
71        if let Some(err) = Self::terminal_error(session) {
72            return Err(err);
73        }
74        Ok(())
75    }
76
77    fn validate_current_owner(
78        session: &SessionState,
79        capability: &OwnershipCapability,
80    ) -> Result<(), OwnershipError> {
81        Self::ensure_mutable_ownership(session)?;
82        let Some(current) = session.ownership.current.as_ref() else {
83            return Err(OwnershipError::Unclaimed {
84                session_id: session.sid,
85            });
86        };
87        if current.owner_id != capability.owner_id || current.generation != capability.generation {
88            return Err(OwnershipError::StaleCapability {
89                session_id: session.sid,
90                owner_id: capability.owner_id.clone(),
91                expected_generation: capability.generation,
92                actual_generation: current.generation,
93            });
94        }
95        Ok(())
96    }
97
98    fn require_session_scope(
99        session: &SessionState,
100        capability: &OwnershipCapability,
101    ) -> Result<(), OwnershipError> {
102        Self::validate_current_owner(session, capability)?;
103        let Some(current) = session.ownership.current.as_ref() else {
104            return Err(OwnershipError::Unclaimed {
105                session_id: session.sid,
106            });
107        };
108        if !current.scope.allows_session_mutation() {
109            return Err(OwnershipError::ScopeViolation {
110                session_id: session.sid,
111                owner_id: current.owner_id.clone(),
112                required: OwnershipScope::Session,
113                actual: current.scope.clone(),
114            });
115        }
116        Ok(())
117    }
118
119    fn next_witness_id(session: &mut SessionState) -> AuthorityWitnessId {
120        let witness_id = session.ownership.next_witness_id;
121        session.ownership.next_witness_id = session.ownership.next_witness_id.saturating_add(1);
122        witness_id
123    }
124
125    fn push_authority_audit(
126        session: &mut SessionState,
127        artifact: AuthorityArtifact,
128        event: AuthorityAuditEvent,
129        reason: Option<String>,
130    ) {
131        session.ownership.audit_log.push(AuthorityAuditRecord {
132            tick: None,
133            artifact,
134            event,
135            reason,
136        });
137    }
138
139    /// Create an empty session store.
140    #[must_use]
141    pub fn new() -> Self {
142        Self::default()
143    }
144
145    /// Open a new session with an externally supplied session id.
146    ///
147    /// Callers should source ids from `SessionStore::next_session_id()`.
148    #[allow(clippy::needless_pass_by_value)]
149    pub fn open_with_sid(
150        &mut self,
151        sid: SessionId,
152        roles: Vec<String>,
153        buffer_config: &BufferConfig,
154        initial_types: &BTreeMap<String, LocalTypeR>,
155    ) -> SessionId {
156        let plan = SessionOpenPlan::new(&roles, initial_types);
157        self.open_with_sid_from_plan(sid, &plan, buffer_config)
158    }
159
160    /// Open a new session from a reusable precomputed open plan.
161    pub fn open_with_sid_from_plan(
162        &mut self,
163        sid: SessionId,
164        plan: &SessionOpenPlan,
165        buffer_config: &BufferConfig,
166    ) -> SessionId {
167        let state = SessionState::from_open_plan(sid, plan, buffer_config);
168        self.sessions.insert(sid, state);
169        self.next_id = self.next_id.max(sid.saturating_add(1));
170        sid
171    }
172
173    /// Open a new session with the given roles, buffer config, and initial local types.
174    ///
175    /// Returns the session ID. Endpoints are constructed as `Endpoint { sid, role }`.
176    #[allow(clippy::needless_pass_by_value)]
177    pub fn open(
178        &mut self,
179        roles: Vec<String>,
180        buffer_config: &BufferConfig,
181        initial_types: &BTreeMap<String, LocalTypeR>,
182    ) -> SessionId {
183        let sid = self.next_id;
184        self.open_with_sid(sid, roles, buffer_config, initial_types)
185    }
186
187    /// Next session identifier that will be allocated by `open`.
188    #[must_use]
189    pub fn next_session_id(&self) -> SessionId {
190        self.next_id
191    }
192
193    // ---- Type state methods (match Lean SessionStore.lookupType / updateType) ----
194
195    /// Lookup the current local type for an endpoint.
196    ///
197    /// Matches Lean `SessionStore.lookupType`.
198    #[must_use]
199    pub fn lookup_type(&self, ep: &Endpoint) -> Option<&LocalTypeR> {
200        self.sessions
201            .get(&ep.sid)?
202            .local_types
203            .get(ep)
204            .map(|e| &e.current)
205    }
206
207    /// Update the local type for an endpoint (type advancement on commit).
208    ///
209    /// Matches Lean `SessionStore.updateType`.
210    pub fn update_type(&mut self, ep: &Endpoint, new_type: LocalTypeR) {
211        if let Some(session) = self.sessions.get_mut(&ep.sid) {
212            if let Some(entry) = session.local_types.get_mut(ep) {
213                entry.current = new_type;
214            }
215            session.refresh_endpoint_branch_lookup(ep);
216        }
217    }
218
219    /// Update the original type (when entering a new Mu scope).
220    pub fn update_original(&mut self, ep: &Endpoint, new_original: LocalTypeR) {
221        if let Some(session) = self.sessions.get_mut(&ep.sid) {
222            if let Some(entry) = session.local_types.get_mut(ep) {
223                entry.original = new_original;
224            }
225        }
226    }
227
228    /// Get the original type for recursive unfolding.
229    #[must_use]
230    pub fn original_type(&self, ep: &Endpoint) -> Option<&LocalTypeR> {
231        self.sessions
232            .get(&ep.sid)?
233            .local_types
234            .get(ep)
235            .map(|e| &e.original)
236    }
237
238    /// Remove type entry (on Halt/End — session endpoint completed).
239    pub fn remove_type(&mut self, ep: &Endpoint) {
240        if let Some(session) = self.sessions.get_mut(&ep.sid) {
241            session.local_types.remove(ep);
242            session.branch_lookup.remove(ep);
243        }
244    }
245
246    // ---- Session access methods ----
247
248    /// Get a reference to a session.
249    #[must_use]
250    pub fn get(&self, sid: SessionId) -> Option<&SessionState> {
251        self.sessions.get(&sid)
252    }
253
254    /// Get a mutable reference to a session.
255    pub fn get_mut(&mut self, sid: SessionId) -> Option<&mut SessionState> {
256        self.sessions.get_mut(&sid)
257    }
258
259    /// Get the current live ownership capability for a session, if any.
260    #[must_use]
261    pub fn current_ownership(&self, sid: SessionId) -> Option<&OwnershipCapability> {
262        self.sessions.get(&sid)?.ownership.current.as_ref()
263    }
264
265    /// Validate that a capability still matches the live owner state.
266    ///
267    /// # Errors
268    ///
269    /// Returns an `OwnershipError` if the session is missing, terminal, or stale.
270    pub fn validate_ownership_capability(
271        &self,
272        capability: &OwnershipCapability,
273    ) -> Result<(), OwnershipError> {
274        let session = self
275            .sessions
276            .get(&capability.session_id)
277            .ok_or(OwnershipError::SessionNotFound {
278                session_id: capability.session_id,
279            })?;
280        Self::validate_current_owner(session, capability)
281    }
282
283    /// Read deterministic authority audit records for one session.
284    #[must_use]
285    pub fn authority_audit_log(&self, sid: SessionId) -> Option<&[AuthorityAuditRecord]> {
286        Some(self.sessions.get(&sid)?.ownership.audit_log.as_slice())
287    }
288
289    /// Claim ownership for an unclaimed session.
290    ///
291    /// # Errors
292    ///
293    /// Returns an `OwnershipError` if the session is missing, terminal, or already claimed.
294    pub fn claim_ownership(
295        &mut self,
296        sid: SessionId,
297        owner_id: impl Into<FragmentOwnerId>,
298        scope: OwnershipScope,
299    ) -> Result<OwnershipCapability, OwnershipError> {
300        let session = self.session_mut_or_error(sid)?;
301        Self::ensure_mutable_ownership(session)?;
302        if let Some(current) = session.ownership.current.as_ref() {
303            return Err(OwnershipError::AlreadyClaimed {
304                session_id: sid,
305                current_owner_id: current.owner_id.clone(),
306            });
307        }
308        if let Some(pending) = session.ownership.pending_transfer.as_ref() {
309            return Err(OwnershipError::TransferPending {
310                session_id: sid,
311                claim_id: pending.receipt.claim_id,
312            });
313        }
314        let capability = OwnershipCapability {
315            session_id: sid,
316            owner_id: owner_id.into(),
317            generation: 0,
318            scope,
319        };
320        session.ownership.current = Some(capability.clone());
321        Self::push_authority_audit(
322            session,
323            AuthorityArtifact::OwnershipCapability(capability.clone()),
324            AuthorityAuditEvent::Issued,
325            None,
326        );
327        Ok(capability)
328    }
329
330    /// Release the current owner capability for a session.
331    ///
332    /// # Errors
333    ///
334    /// Returns an `OwnershipError` if the capability is stale or a transfer is still pending.
335    pub fn release_ownership(
336        &mut self,
337        capability: &OwnershipCapability,
338    ) -> Result<(), OwnershipError> {
339        let session = self.session_mut_or_error(capability.session_id)?;
340        Self::validate_current_owner(session, capability)?;
341        if let Some(pending) = session.ownership.pending_transfer.as_ref() {
342            return Err(OwnershipError::TransferPending {
343                session_id: capability.session_id,
344                claim_id: pending.receipt.claim_id,
345            });
346        }
347        Self::push_authority_audit(
348            session,
349            AuthorityArtifact::OwnershipCapability(capability.clone()),
350            AuthorityAuditEvent::Invalidated,
351            Some("ownership released".to_string()),
352        );
353        session.ownership.current = None;
354        Ok(())
355    }
356
357    /// Begin an explicit ownership transfer and return a typed receipt.
358    ///
359    /// # Errors
360    ///
361    /// Returns an `OwnershipError` if the capability is stale or another transfer is pending.
362    pub fn begin_ownership_transfer(
363        &mut self,
364        capability: &OwnershipCapability,
365        new_owner_id: impl Into<FragmentOwnerId>,
366        new_scope: OwnershipScope,
367    ) -> Result<OwnershipReceipt, OwnershipError> {
368        let session = self.session_mut_or_error(capability.session_id)?;
369        Self::validate_current_owner(session, capability)?;
370        if let Some(pending) = session.ownership.pending_transfer.as_ref() {
371            return Err(OwnershipError::TransferPending {
372                session_id: capability.session_id,
373                claim_id: pending.receipt.claim_id,
374            });
375        }
376        let claim_id = session.ownership.next_claim_id;
377        session.ownership.next_claim_id = session.ownership.next_claim_id.saturating_add(1);
378        let receipt = OwnershipReceipt {
379            session_id: capability.session_id,
380            claim_id,
381            from_owner_id: capability.owner_id.clone(),
382            from_generation: capability.generation,
383            to_owner_id: new_owner_id.into(),
384            to_generation: capability.generation.saturating_add(1),
385            scope: new_scope,
386        };
387        session.ownership.pending_transfer = Some(PendingOwnershipTransfer {
388            receipt: receipt.clone(),
389        });
390        Self::push_authority_audit(
391            session,
392            AuthorityArtifact::OwnershipReceipt(receipt.clone()),
393            AuthorityAuditEvent::Issued,
394            None,
395        );
396        Ok(receipt)
397    }
398
399    /// Commit a previously staged ownership transfer.
400    ///
401    /// # Errors
402    ///
403    /// Returns an `OwnershipError` if the receipt is stale or mismatched.
404    pub fn commit_ownership_transfer(
405        &mut self,
406        receipt: &OwnershipReceipt,
407    ) -> Result<OwnershipCapability, OwnershipError> {
408        let session = self.session_mut_or_error(receipt.session_id)?;
409        Self::ensure_mutable_ownership(session)?;
410        let Some(current) = session.ownership.current.as_ref() else {
411            return Err(OwnershipError::Unclaimed {
412                session_id: receipt.session_id,
413            });
414        };
415        let Some(pending) = session.ownership.pending_transfer.as_ref() else {
416            Self::push_authority_audit(
417                session,
418                AuthorityArtifact::OwnershipReceipt(receipt.clone()),
419                AuthorityAuditEvent::Rejected,
420                Some("receipt is no longer pending".to_string()),
421            );
422            return Err(OwnershipError::TransferNotPending {
423                session_id: receipt.session_id,
424            });
425        };
426        if pending.receipt != *receipt {
427            Self::push_authority_audit(
428                session,
429                AuthorityArtifact::OwnershipReceipt(receipt.clone()),
430                AuthorityAuditEvent::Rejected,
431                Some("receipt payload mismatch".to_string()),
432            );
433            return Err(OwnershipError::ReceiptMismatch {
434                session_id: receipt.session_id,
435                claim_id: receipt.claim_id,
436            });
437        }
438        if current.owner_id != receipt.from_owner_id || current.generation != receipt.from_generation
439        {
440            return Err(OwnershipError::StaleCapability {
441                session_id: receipt.session_id,
442                owner_id: receipt.from_owner_id.clone(),
443                expected_generation: receipt.from_generation,
444                actual_generation: current.generation,
445            });
446        }
447        let capability = OwnershipCapability {
448            session_id: receipt.session_id,
449            owner_id: receipt.to_owner_id.clone(),
450            generation: receipt.to_generation,
451            scope: receipt.scope.clone(),
452        };
453        let old_capability = current.clone();
454        session.ownership.current = Some(capability.clone());
455        session.ownership.pending_transfer = None;
456        Self::push_authority_audit(
457            session,
458            AuthorityArtifact::OwnershipCapability(old_capability),
459            AuthorityAuditEvent::Invalidated,
460            Some("ownership transferred".to_string()),
461        );
462        Self::push_authority_audit(
463            session,
464            AuthorityArtifact::OwnershipReceipt(receipt.clone()),
465            AuthorityAuditEvent::Committed,
466            None,
467        );
468        Self::push_authority_audit(
469            session,
470            AuthorityArtifact::OwnershipCapability(capability.clone()),
471            AuthorityAuditEvent::Issued,
472            None,
473        );
474        Ok(capability)
475    }
476
477    /// Roll back only the staged transfer identified by this receipt.
478    ///
479    /// # Errors
480    ///
481    /// Returns an `OwnershipError` if the receipt does not match the current staged transfer.
482    pub fn rollback_ownership_transfer(
483        &mut self,
484        receipt: &OwnershipReceipt,
485    ) -> Result<(), OwnershipError> {
486        let session = self.session_mut_or_error(receipt.session_id)?;
487        Self::ensure_mutable_ownership(session)?;
488        let Some(pending) = session.ownership.pending_transfer.as_ref() else {
489            Self::push_authority_audit(
490                session,
491                AuthorityArtifact::OwnershipReceipt(receipt.clone()),
492                AuthorityAuditEvent::Rejected,
493                Some("receipt is no longer pending".to_string()),
494            );
495            return Err(OwnershipError::TransferNotPending {
496                session_id: receipt.session_id,
497            });
498        };
499        if pending.receipt != *receipt {
500            Self::push_authority_audit(
501                session,
502                AuthorityArtifact::OwnershipReceipt(receipt.clone()),
503                AuthorityAuditEvent::Rejected,
504                Some("receipt payload mismatch".to_string()),
505            );
506            return Err(OwnershipError::ReceiptMismatch {
507                session_id: receipt.session_id,
508                claim_id: receipt.claim_id,
509            });
510        }
511        session.ownership.pending_transfer = None;
512        Self::push_authority_audit(
513            session,
514            AuthorityArtifact::OwnershipReceipt(receipt.clone()),
515            AuthorityAuditEvent::RolledBack,
516            None,
517        );
518        Ok(())
519    }
520
521    /// Attenuate or otherwise change scope for the same owner.
522    ///
523    /// # Errors
524    ///
525    /// Returns an `OwnershipError` if the capability is stale or a transfer is pending.
526    pub fn attenuate_ownership_scope(
527        &mut self,
528        capability: &OwnershipCapability,
529        new_scope: OwnershipScope,
530    ) -> Result<OwnershipCapability, OwnershipError> {
531        let session = self.session_mut_or_error(capability.session_id)?;
532        Self::validate_current_owner(session, capability)?;
533        if let Some(pending) = session.ownership.pending_transfer.as_ref() {
534            return Err(OwnershipError::TransferPending {
535                session_id: capability.session_id,
536                claim_id: pending.receipt.claim_id,
537            });
538        }
539        let next = OwnershipCapability {
540            session_id: capability.session_id,
541            owner_id: capability.owner_id.clone(),
542            generation: capability.generation.saturating_add(1),
543            scope: new_scope,
544        };
545        Self::push_authority_audit(
546            session,
547            AuthorityArtifact::OwnershipCapability(capability.clone()),
548            AuthorityAuditEvent::Invalidated,
549            Some("ownership scope attenuated".to_string()),
550        );
551        session.ownership.current = Some(next.clone());
552        Self::push_authority_audit(
553            session,
554            AuthorityArtifact::OwnershipCapability(next.clone()),
555            AuthorityAuditEvent::Issued,
556            None,
557        );
558        Ok(next)
559    }
560
561    /// Apply session-local host mutation through the ownership gate.
562    ///
563    /// # Errors
564    ///
565    /// Returns an `OwnershipError` if the capability is stale or lacks full-session scope.
566    pub fn apply_owned_session_mutation(
567        &mut self,
568        capability: &OwnershipCapability,
569        mutation: SessionHostMutation,
570    ) -> Result<(), OwnershipError> {
571        let session = self.session_mut_or_error(capability.session_id)?;
572        Self::require_session_scope(session, capability)?;
573        match mutation {
574            SessionHostMutation::SetDefaultHandler { handler } => {
575                let handler_id = session.intern_handler_binding(&handler);
576                session.default_handler = handler;
577                session.default_handler_id = Some(handler_id);
578            }
579            SessionHostMutation::UpdateEdgeHandler { edge, handler } => {
580                let handler_id = session.intern_handler_binding(&handler);
581                if let Some(edge_key) = session.edge_key_for_roles(&edge.sender, &edge.receiver) {
582                    session.edge_handler_lookup.insert(edge_key, handler_id);
583                }
584                session.edge_handlers.insert(edge, handler);
585            }
586            SessionHostMutation::UpdateTrace { edge, trace } => {
587                session.edge_traces.insert(edge, trace);
588            }
589        }
590        Ok(())
591    }
592
593    /// Issue a single-use readiness witness under the current owner capability.
594    ///
595    /// # Errors
596    ///
597    /// Returns an `OwnershipError` if the capability is stale or lacks full-session scope.
598    pub fn issue_readiness_witness(
599        &mut self,
600        capability: &OwnershipCapability,
601        predicate_ref: impl Into<String>,
602    ) -> Result<ReadinessWitness, OwnershipError> {
603        let session = self.session_mut_or_error(capability.session_id)?;
604        Self::require_session_scope(session, capability)?;
605        let witness = ReadinessWitness {
606            witness_id: Self::next_witness_id(session),
607            session_id: capability.session_id,
608            owner_id: capability.owner_id.clone(),
609            generation: capability.generation,
610            scope: capability.scope.clone(),
611            predicate_ref: predicate_ref.into(),
612        };
613        session
614            .ownership
615            .issued_readiness
616            .insert(witness.witness_id, witness.clone());
617        Self::push_authority_audit(
618            session,
619            AuthorityArtifact::Readiness(witness.clone()),
620            AuthorityAuditEvent::Issued,
621            None,
622        );
623        Ok(witness)
624    }
625
626    /// Consume a readiness witness exactly once under the same live owner capability.
627    ///
628    /// # Errors
629    ///
630    /// Returns an `OwnershipError` if the witness is stale, forged, mismatched, or already used.
631    pub fn consume_readiness_witness(
632        &mut self,
633        capability: &OwnershipCapability,
634        witness: &ReadinessWitness,
635    ) -> Result<(), OwnershipError> {
636        let session = self.session_mut_or_error(capability.session_id)?;
637        Self::require_session_scope(session, capability)?;
638        if session.ownership.consumed_witnesses.contains(&witness.witness_id) {
639            Self::push_authority_audit(
640                session,
641                AuthorityArtifact::Readiness(witness.clone()),
642                AuthorityAuditEvent::Rejected,
643                Some("witness already consumed".to_string()),
644            );
645            return Err(OwnershipError::WitnessConsumed {
646                session_id: witness.session_id,
647                witness_id: witness.witness_id,
648            });
649        }
650        let Some(issued) = session.ownership.issued_readiness.get(&witness.witness_id) else {
651            Self::push_authority_audit(
652                session,
653                AuthorityArtifact::Readiness(witness.clone()),
654                AuthorityAuditEvent::Rejected,
655                Some("witness was never issued".to_string()),
656            );
657            return Err(OwnershipError::InvalidWitness {
658                session_id: capability.session_id,
659                witness_id: witness.witness_id,
660                reason: "witness was never issued".to_string(),
661            });
662        };
663        if issued != witness {
664            Self::push_authority_audit(
665                session,
666                AuthorityArtifact::Readiness(witness.clone()),
667                AuthorityAuditEvent::Rejected,
668                Some("witness payload mismatch".to_string()),
669            );
670            return Err(OwnershipError::InvalidWitness {
671                session_id: capability.session_id,
672                witness_id: witness.witness_id,
673                reason: "witness payload mismatch".to_string(),
674            });
675        }
676        if witness.session_id != capability.session_id
677            || witness.owner_id != capability.owner_id
678            || witness.generation != capability.generation
679            || witness.scope != capability.scope
680        {
681            Self::push_authority_audit(
682                session,
683                AuthorityArtifact::Readiness(witness.clone()),
684                AuthorityAuditEvent::Rejected,
685                Some("live ownership no longer matches witness".to_string()),
686            );
687            return Err(OwnershipError::InvalidWitness {
688                session_id: capability.session_id,
689                witness_id: witness.witness_id,
690                reason: "live ownership no longer matches witness".to_string(),
691            });
692        }
693        session.ownership.issued_readiness.remove(&witness.witness_id);
694        session
695            .ownership
696            .consumed_witnesses
697            .insert(witness.witness_id);
698        Self::push_authority_audit(
699            session,
700            AuthorityArtifact::Readiness(witness.clone()),
701            AuthorityAuditEvent::Consumed,
702            None,
703        );
704        Ok(())
705    }
706
707    /// Mark the current owner as dead and fault the session.
708    ///
709    /// # Errors
710    ///
711    /// Returns an `OwnershipError` if the session is missing or owner mismatch occurs.
712    pub fn mark_owner_died(
713        &mut self,
714        sid: SessionId,
715        owner_id: &str,
716    ) -> Result<CancellationWitness, OwnershipError> {
717        let session = self.session_mut_or_error(sid)?;
718        Self::ensure_mutable_ownership(session)?;
719        let Some(current) = session.ownership.current.clone() else {
720            return Err(OwnershipError::Unclaimed { session_id: sid });
721        };
722        if current.owner_id != owner_id {
723            return Err(OwnershipError::StaleCapability {
724                session_id: sid,
725                owner_id: owner_id.to_string(),
726                expected_generation: current.generation,
727                actual_generation: current.generation,
728            });
729        }
730        let generation = current.generation;
731        let reason = OwnershipTerminalReason::OwnerDied {
732            owner_id: owner_id.to_string(),
733        };
734        session.status = SessionStatus::Faulted {
735            reason: format!("ownership owner `{owner_id}` died"),
736        };
737        let witness = CancellationWitness {
738            witness_id: Self::next_witness_id(session),
739            session_id: sid,
740            owner_id: owner_id.to_string(),
741            generation,
742            reason: reason.clone(),
743        };
744        session.ownership.current = None;
745        session.ownership.pending_transfer = None;
746        session.ownership.terminal_reason = Some(reason);
747        Self::push_authority_audit(
748            session,
749            AuthorityArtifact::OwnershipCapability(current),
750            AuthorityAuditEvent::Invalidated,
751            Some("owner died".to_string()),
752        );
753        Self::push_authority_audit(
754            session,
755            AuthorityArtifact::Cancellation(witness.clone()),
756            AuthorityAuditEvent::Issued,
757            None,
758        );
759        Ok(witness)
760    }
761
762    /// Cancel a session because a staged transfer was abandoned.
763    ///
764    /// # Errors
765    ///
766    /// Returns an `OwnershipError` if the receipt does not match the staged transfer.
767    pub fn cancel_abandoned_transfer(
768        &mut self,
769        receipt: &OwnershipReceipt,
770    ) -> Result<CancellationWitness, OwnershipError> {
771        let session = self.session_mut_or_error(receipt.session_id)?;
772        Self::ensure_mutable_ownership(session)?;
773        let Some(pending) = session.ownership.pending_transfer.as_ref() else {
774            return Err(OwnershipError::TransferNotPending {
775                session_id: receipt.session_id,
776            });
777        };
778        if pending.receipt != *receipt {
779            return Err(OwnershipError::ReceiptMismatch {
780                session_id: receipt.session_id,
781                claim_id: receipt.claim_id,
782            });
783        }
784        let reason = OwnershipTerminalReason::TransferAbandoned {
785            owner_id: receipt.from_owner_id.clone(),
786            claim_id: receipt.claim_id,
787        };
788        session.status = SessionStatus::Cancelled;
789        let witness = CancellationWitness {
790            witness_id: Self::next_witness_id(session),
791            session_id: receipt.session_id,
792            owner_id: receipt.from_owner_id.clone(),
793            generation: receipt.from_generation,
794            reason: reason.clone(),
795        };
796        if let Some(current) = session.ownership.current.clone() {
797            Self::push_authority_audit(
798                session,
799                AuthorityArtifact::OwnershipCapability(current),
800                AuthorityAuditEvent::Invalidated,
801                Some("transfer abandoned".to_string()),
802            );
803        }
804        session.ownership.current = None;
805        session.ownership.pending_transfer = None;
806        session.ownership.terminal_reason = Some(reason);
807        Self::push_authority_audit(
808            session,
809            AuthorityArtifact::OwnershipReceipt(receipt.clone()),
810            AuthorityAuditEvent::RolledBack,
811            Some("transfer abandoned".to_string()),
812        );
813        Self::push_authority_audit(
814            session,
815            AuthorityArtifact::Cancellation(witness.clone()),
816            AuthorityAuditEvent::Issued,
817            None,
818        );
819        Ok(witness)
820    }
821
822    /// Fault a session because a staged transfer could not commit.
823    ///
824    /// # Errors
825    ///
826    /// Returns an `OwnershipError` if the receipt does not match the staged transfer.
827    pub fn fault_failed_transfer_commit(
828        &mut self,
829        receipt: &OwnershipReceipt,
830        reason: impl Into<String>,
831    ) -> Result<(), OwnershipError> {
832        let session = self.session_mut_or_error(receipt.session_id)?;
833        Self::ensure_mutable_ownership(session)?;
834        let Some(pending) = session.ownership.pending_transfer.as_ref() else {
835            return Err(OwnershipError::TransferNotPending {
836                session_id: receipt.session_id,
837            });
838        };
839        if pending.receipt != *receipt {
840            return Err(OwnershipError::ReceiptMismatch {
841                session_id: receipt.session_id,
842                claim_id: receipt.claim_id,
843            });
844        }
845        let reason = reason.into();
846        let terminal = OwnershipTerminalReason::TransferCommitFailed {
847            owner_id: receipt.from_owner_id.clone(),
848            claim_id: receipt.claim_id,
849            reason: reason.clone(),
850        };
851        session.status = SessionStatus::Faulted {
852            reason: format!("ownership transfer commit failed: {reason}"),
853        };
854        if let Some(current) = session.ownership.current.clone() {
855            Self::push_authority_audit(
856                session,
857                AuthorityArtifact::OwnershipCapability(current),
858                AuthorityAuditEvent::Invalidated,
859                Some("transfer commit failed".to_string()),
860            );
861        }
862        session.ownership.current = None;
863        session.ownership.pending_transfer = None;
864        session.ownership.terminal_reason = Some(terminal);
865        Self::push_authority_audit(
866            session,
867            AuthorityArtifact::OwnershipReceipt(receipt.clone()),
868            AuthorityAuditEvent::RolledBack,
869            Some(reason),
870        );
871        Ok(())
872    }
873
874    /// Iterate over all sessions.
875    pub fn iter(&self) -> impl Iterator<Item = &SessionState> {
876        self.sessions.values()
877    }
878
879    /// Close a session.
880    ///
881    /// # Errors
882    ///
883    /// Returns an error if the session is not found.
884    pub fn close(&mut self, sid: SessionId) -> Result<(), String> {
885        let session = self
886            .sessions
887            .get_mut(&sid)
888            .ok_or_else(|| format!("session {sid} not found"))?;
889
890        session.status = SessionStatus::Closed;
891        session.buffers.clear();
892        session.edge_traces.clear();
893        session.epoch = session.epoch.saturating_add(1);
894        Ok(())
895    }
896
897    /// Closed/cancelled/faulted session identifiers still resident in the store.
898    #[must_use]
899    pub fn closed_session_ids(&self) -> Vec<SessionId> {
900        self.sessions
901            .iter()
902            .filter_map(|(sid, session)| {
903                matches!(
904                    session.status,
905                    SessionStatus::Closed
906                        | SessionStatus::Cancelled
907                        | SessionStatus::Faulted { .. }
908                )
909                .then_some(*sid)
910            })
911            .collect()
912    }
913
914    /// Reap specific session ids from live storage and archive compact summaries.
915    ///
916    /// # Panics
917    ///
918    /// Panics if a session disappears between the initial residency/status check
919    /// and the subsequent removal from the store.
920    pub fn reap_sessions(&mut self, session_ids: &[SessionId]) -> Vec<ClosedSessionSummary> {
921        let mut reaped = Vec::new();
922        for sid in session_ids {
923            let Some(session) = self.sessions.get(sid) else {
924                continue;
925            };
926            if !matches!(
927                session.status,
928                SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. }
929            ) {
930                continue;
931            }
932
933            let session = self
934                .sessions
935                .remove(sid)
936                .expect("session existence checked before removal");
937            let summary = ClosedSessionSummary::from_session(&session);
938            self.archived_closed.push(summary.clone());
939            reaped.push(summary);
940        }
941        reaped
942    }
943
944    /// Reap all closed/cancelled/faulted sessions from live storage.
945    pub fn reap_closed(&mut self) -> Vec<ClosedSessionSummary> {
946        let sids = self.closed_session_ids();
947        self.reap_sessions(&sids)
948    }
949
950    /// Number of active sessions.
951    #[must_use]
952    pub fn active_count(&self) -> usize {
953        self.sessions
954            .values()
955            .filter(|s| s.status == SessionStatus::Active)
956            .count()
957    }
958
959    /// Number of sessions still resident in the store.
960    #[must_use]
961    pub fn live_count(&self) -> usize {
962        self.sessions.len()
963    }
964
965    /// All session IDs.
966    #[must_use]
967    pub fn session_ids(&self) -> Vec<SessionId> {
968        self.sessions.keys().copied().collect()
969    }
970
971    /// Archived closed-session summaries retained after reaping.
972    #[must_use]
973    pub fn archived_closed(&self) -> &[ClosedSessionSummary] {
974        &self.archived_closed
975    }
976
977    /// Approximate retained state for the session store.
978    #[must_use]
979    pub fn memory_usage(&self) -> SessionStoreMemoryUsage {
980        let mut usage = SessionStoreMemoryUsage {
981            live_sessions: self.sessions.len(),
982            archived_closed_sessions: self.archived_closed.len(),
983            ..SessionStoreMemoryUsage::default()
984        };
985        usage.retained_bytes.archived_closed = self
986            .archived_closed
987            .iter()
988            .map(ClosedSessionSummary::retained_bytes_estimate)
989            .sum();
990
991        for session in self.sessions.values() {
992            if matches!(
993                session.status,
994                SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. }
995            ) {
996                usage.live_closed_sessions += 1;
997            }
998            usage.live_local_type_entries += session.local_types.len();
999            usage.live_buffer_count += session.buffers.len();
1000            usage.live_buffered_messages += session
1001                .buffers
1002                .values()
1003                .map(BoundedBuffer::len)
1004                .sum::<usize>();
1005            usage.live_edge_handler_count += session.edge_handlers.len();
1006            usage.live_auth_leaf_count += session.auth_leaves.values().map(Vec::len).sum::<usize>();
1007            usage.live_auth_tree_count += session.auth_trees.len();
1008            usage.live_auth_root_count += session.auth_roots.len();
1009            usage.retained_bytes.live_sessions += session.retained_session_core_bytes();
1010            usage.retained_bytes.local_types += session.retained_local_type_bytes();
1011            usage.retained_bytes.buffers += session.retained_buffer_bytes();
1012            usage.retained_bytes.traces += session.retained_trace_bytes();
1013            usage.retained_bytes.auth += session.retained_auth_bytes();
1014            usage.retained_bytes.handlers += session.retained_handler_bytes();
1015        }
1016        usage.retained_bytes.total = usage
1017            .retained_bytes
1018            .live_sessions
1019            .saturating_add(usage.retained_bytes.archived_closed)
1020            .saturating_add(usage.retained_bytes.local_types)
1021            .saturating_add(usage.retained_bytes.buffers)
1022            .saturating_add(usage.retained_bytes.traces)
1023            .saturating_add(usage.retained_bytes.auth)
1024            .saturating_add(usage.retained_bytes.handlers);
1025
1026        usage
1027    }
1028
1029    /// Lookup edge-bound handler id.
1030    #[must_use]
1031    pub fn lookup_handler(&self, edge: &Edge) -> Option<&HandlerId> {
1032        self.sessions
1033            .get(&edge.sid)?
1034            .lookup_handler_for_roles(&edge.sender, &edge.receiver)
1035    }
1036
1037    /// Lookup a default handler id for a session.
1038    #[must_use]
1039    pub fn default_handler_for_session(&self, sid: SessionId) -> Option<&HandlerId> {
1040        self.sessions.get(&sid)?.default_handler_binding()
1041    }
1042
1043    /// Set the default handler id for a session.
1044    pub(crate) fn set_default_handler_for_session(&mut self, sid: SessionId, handler: HandlerId) {
1045        if let Some(session) = self.sessions.get_mut(&sid) {
1046            let handler_id = session.intern_handler_binding(&handler);
1047            session.default_handler = handler;
1048            session.default_handler_id = Some(handler_id);
1049        }
1050    }
1051
1052    /// Update edge-bound handler id.
1053    pub(crate) fn update_handler(&mut self, edge: &Edge, handler: HandlerId) {
1054        if let Some(session) = self.sessions.get_mut(&edge.sid) {
1055            let handler_id = session.intern_handler_binding(&handler);
1056            if let Some(edge_key) = session.edge_key_for_roles(&edge.sender, &edge.receiver) {
1057                session.edge_handler_lookup.insert(edge_key, handler_id);
1058            }
1059            session.edge_handlers.insert(edge.clone(), handler);
1060        }
1061    }
1062
1063    /// Lookup coherence trace for an edge.
1064    #[must_use]
1065    pub fn lookup_trace(&self, edge: &Edge) -> Option<&[ValType]> {
1066        self.sessions
1067            .get(&edge.sid)?
1068            .edge_traces
1069            .get(edge)
1070            .map(Vec::as_slice)
1071    }
1072
1073    /// Update coherence trace for an edge.
1074    #[cfg_attr(not(test), allow(dead_code))]
1075    pub(crate) fn update_trace(&mut self, edge: &Edge, trace: Vec<ValType>) {
1076        if let Some(session) = self.sessions.get_mut(&edge.sid) {
1077            session.edge_traces.insert(edge.clone(), trace);
1078        }
1079    }
1080}
1081
1082// ---- Type unfolding utilities ----
1083
1084/// Unfold top-level `Mu` to its body.
1085///
1086/// Recursively strips `Mu` constructors to reach the first action.
1087#[must_use]
1088// RECURSION_SAFE: each step unwraps one Mu node from a finite local type tree.
1089pub fn unfold_mu(lt: &LocalTypeR) -> LocalTypeR {
1090    match lt {
1091        LocalTypeR::Mu { body, .. } => unfold_mu(body),
1092        other => other.clone(),
1093    }
1094}
1095
1096/// Resolve a continuation that may be a `Var` (recursive reference).
1097///
1098/// If `cont` is `Var`, unfolds back to the original type's mu body.
1099/// If `cont` is `Mu`, unfolds it. Otherwise returns as-is.
1100#[must_use]
1101pub fn unfold_if_var(cont: &LocalTypeR, original: &LocalTypeR) -> LocalTypeR {
1102    match cont {
1103        LocalTypeR::Var(_) => unfold_mu(original),
1104        LocalTypeR::Mu { .. } => unfold_mu(cont),
1105        other => other.clone(),
1106    }
1107}
1108
1109/// Like `unfold_if_var`, but also returns the new Mu scope (original) if one was entered.
1110///
1111/// When the continuation is a `Mu`, the Mu itself becomes the new original
1112/// for subsequent `Var` resolution. Returns `(resolved_type, Some(mu))` when
1113/// entering a new Mu scope, `(resolved_type, None)` otherwise.
1114#[must_use]
1115pub(crate) fn unfold_if_var_with_scope(
1116    cont: &LocalTypeR,
1117    original: &LocalTypeR,
1118) -> (LocalTypeR, Option<LocalTypeR>) {
1119    match cont {
1120        LocalTypeR::Var(_) => (unfold_mu(original), None),
1121        LocalTypeR::Mu { .. } => (unfold_mu(cont), Some(cont.clone())),
1122        other => (other.clone(), None),
1123    }
1124}