1impl<'de> Deserialize<'de> for SessionState {
3 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
4 where
5 D: serde::Deserializer<'de>,
6 {
7 let raw = SessionStateSerde::deserialize(deserializer)?;
8 let mut session = Self {
9 sid: raw.sid,
10 roles: raw.roles,
11 role_ids: BTreeMap::new(),
12 local_types: raw.local_types,
13 buffers: raw.buffers,
14 edge_lookup: BTreeMap::new(),
15 handler_ids: BTreeMap::new(),
16 handlers_by_id: Vec::new(),
17 edge_handler_lookup: BTreeMap::new(),
18 default_handler_id: None,
19 label_ids: BTreeMap::new(),
20 labels_by_id: Vec::new(),
21 branch_lookup: BTreeMap::new(),
22 auth_leaves: raw.auth_leaves,
23 auth_trees: raw.auth_trees,
24 auth_roots: raw.auth_roots,
25 edge_handlers: raw.edge_handlers,
26 default_handler: raw.default_handler,
27 edge_traces: raw.edge_traces,
28 status: raw.status,
29 epoch: raw.epoch,
30 ownership: raw.ownership,
31 };
32 session.rebuild_derived_indexes();
33 Ok(session)
34 }
35}
36
37#[derive(Debug, Default, Serialize, Deserialize)]
42pub struct SessionStore {
43 sessions: BTreeMap<SessionId, SessionState>,
44 #[serde(default)]
45 archived_closed: Vec<ClosedSessionSummary>,
46 next_id: SessionId,
47}
48
49impl SessionStore {
50 fn session_mut_or_error(
51 &mut self,
52 sid: SessionId,
53 ) -> Result<&mut SessionState, OwnershipError> {
54 self.sessions
55 .get_mut(&sid)
56 .ok_or(OwnershipError::SessionNotFound { session_id: sid })
57 }
58
59 fn terminal_error(session: &SessionState) -> Option<OwnershipError> {
60 session
61 .ownership
62 .terminal_reason
63 .clone()
64 .map(|reason| OwnershipError::Terminal {
65 session_id: session.sid,
66 reason,
67 })
68 }
69
70 fn ensure_mutable_ownership(session: &SessionState) -> Result<(), OwnershipError> {
71 if let Some(err) = Self::terminal_error(session) {
72 return Err(err);
73 }
74 Ok(())
75 }
76
77 fn validate_current_owner(
78 session: &SessionState,
79 capability: &OwnershipCapability,
80 ) -> Result<(), OwnershipError> {
81 Self::ensure_mutable_ownership(session)?;
82 let Some(current) = session.ownership.current.as_ref() else {
83 return Err(OwnershipError::Unclaimed {
84 session_id: session.sid,
85 });
86 };
87 if current.owner_id != capability.owner_id || current.generation != capability.generation {
88 return Err(OwnershipError::StaleCapability {
89 session_id: session.sid,
90 owner_id: capability.owner_id.clone(),
91 expected_generation: capability.generation,
92 actual_generation: current.generation,
93 });
94 }
95 Ok(())
96 }
97
98 fn require_session_scope(
99 session: &SessionState,
100 capability: &OwnershipCapability,
101 ) -> Result<(), OwnershipError> {
102 Self::validate_current_owner(session, capability)?;
103 let Some(current) = session.ownership.current.as_ref() else {
104 return Err(OwnershipError::Unclaimed {
105 session_id: session.sid,
106 });
107 };
108 if !current.scope.allows_session_mutation() {
109 return Err(OwnershipError::ScopeViolation {
110 session_id: session.sid,
111 owner_id: current.owner_id.clone(),
112 required: OwnershipScope::Session,
113 actual: current.scope.clone(),
114 });
115 }
116 Ok(())
117 }
118
119 fn next_witness_id(session: &mut SessionState) -> AuthorityWitnessId {
120 let witness_id = session.ownership.next_witness_id;
121 session.ownership.next_witness_id = session.ownership.next_witness_id.saturating_add(1);
122 witness_id
123 }
124
125 fn push_authority_audit(
126 session: &mut SessionState,
127 artifact: AuthorityArtifact,
128 event: AuthorityAuditEvent,
129 reason: Option<String>,
130 ) {
131 session.ownership.audit_log.push(AuthorityAuditRecord {
132 tick: None,
133 artifact,
134 event,
135 reason,
136 });
137 }
138
139 #[must_use]
141 pub fn new() -> Self {
142 Self::default()
143 }
144
145 #[allow(clippy::needless_pass_by_value)]
149 pub fn open_with_sid(
150 &mut self,
151 sid: SessionId,
152 roles: Vec<String>,
153 buffer_config: &BufferConfig,
154 initial_types: &BTreeMap<String, LocalTypeR>,
155 ) -> SessionId {
156 let plan = SessionOpenPlan::new(&roles, initial_types);
157 self.open_with_sid_from_plan(sid, &plan, buffer_config)
158 }
159
160 pub fn open_with_sid_from_plan(
162 &mut self,
163 sid: SessionId,
164 plan: &SessionOpenPlan,
165 buffer_config: &BufferConfig,
166 ) -> SessionId {
167 let state = SessionState::from_open_plan(sid, plan, buffer_config);
168 self.sessions.insert(sid, state);
169 self.next_id = self.next_id.max(sid.saturating_add(1));
170 sid
171 }
172
173 #[allow(clippy::needless_pass_by_value)]
177 pub fn open(
178 &mut self,
179 roles: Vec<String>,
180 buffer_config: &BufferConfig,
181 initial_types: &BTreeMap<String, LocalTypeR>,
182 ) -> SessionId {
183 let sid = self.next_id;
184 self.open_with_sid(sid, roles, buffer_config, initial_types)
185 }
186
187 #[must_use]
189 pub fn next_session_id(&self) -> SessionId {
190 self.next_id
191 }
192
193 #[must_use]
199 pub fn lookup_type(&self, ep: &Endpoint) -> Option<&LocalTypeR> {
200 self.sessions
201 .get(&ep.sid)?
202 .local_types
203 .get(ep)
204 .map(|e| &e.current)
205 }
206
207 pub fn update_type(&mut self, ep: &Endpoint, new_type: LocalTypeR) {
211 if let Some(session) = self.sessions.get_mut(&ep.sid) {
212 if let Some(entry) = session.local_types.get_mut(ep) {
213 entry.current = new_type;
214 }
215 session.refresh_endpoint_branch_lookup(ep);
216 }
217 }
218
219 pub fn update_original(&mut self, ep: &Endpoint, new_original: LocalTypeR) {
221 if let Some(session) = self.sessions.get_mut(&ep.sid) {
222 if let Some(entry) = session.local_types.get_mut(ep) {
223 entry.original = new_original;
224 }
225 }
226 }
227
228 #[must_use]
230 pub fn original_type(&self, ep: &Endpoint) -> Option<&LocalTypeR> {
231 self.sessions
232 .get(&ep.sid)?
233 .local_types
234 .get(ep)
235 .map(|e| &e.original)
236 }
237
238 pub fn remove_type(&mut self, ep: &Endpoint) {
240 if let Some(session) = self.sessions.get_mut(&ep.sid) {
241 session.local_types.remove(ep);
242 session.branch_lookup.remove(ep);
243 }
244 }
245
246 #[must_use]
250 pub fn get(&self, sid: SessionId) -> Option<&SessionState> {
251 self.sessions.get(&sid)
252 }
253
254 pub fn get_mut(&mut self, sid: SessionId) -> Option<&mut SessionState> {
256 self.sessions.get_mut(&sid)
257 }
258
259 #[must_use]
261 pub fn current_ownership(&self, sid: SessionId) -> Option<&OwnershipCapability> {
262 self.sessions.get(&sid)?.ownership.current.as_ref()
263 }
264
265 pub fn validate_ownership_capability(
271 &self,
272 capability: &OwnershipCapability,
273 ) -> Result<(), OwnershipError> {
274 let session = self
275 .sessions
276 .get(&capability.session_id)
277 .ok_or(OwnershipError::SessionNotFound {
278 session_id: capability.session_id,
279 })?;
280 Self::validate_current_owner(session, capability)
281 }
282
283 #[must_use]
285 pub fn authority_audit_log(&self, sid: SessionId) -> Option<&[AuthorityAuditRecord]> {
286 Some(self.sessions.get(&sid)?.ownership.audit_log.as_slice())
287 }
288
289 pub fn claim_ownership(
295 &mut self,
296 sid: SessionId,
297 owner_id: impl Into<FragmentOwnerId>,
298 scope: OwnershipScope,
299 ) -> Result<OwnershipCapability, OwnershipError> {
300 let session = self.session_mut_or_error(sid)?;
301 Self::ensure_mutable_ownership(session)?;
302 if let Some(current) = session.ownership.current.as_ref() {
303 return Err(OwnershipError::AlreadyClaimed {
304 session_id: sid,
305 current_owner_id: current.owner_id.clone(),
306 });
307 }
308 if let Some(pending) = session.ownership.pending_transfer.as_ref() {
309 return Err(OwnershipError::TransferPending {
310 session_id: sid,
311 claim_id: pending.receipt.claim_id,
312 });
313 }
314 let capability = OwnershipCapability {
315 session_id: sid,
316 owner_id: owner_id.into(),
317 generation: 0,
318 scope,
319 };
320 session.ownership.current = Some(capability.clone());
321 Self::push_authority_audit(
322 session,
323 AuthorityArtifact::OwnershipCapability(capability.clone()),
324 AuthorityAuditEvent::Issued,
325 None,
326 );
327 Ok(capability)
328 }
329
330 pub fn release_ownership(
336 &mut self,
337 capability: &OwnershipCapability,
338 ) -> Result<(), OwnershipError> {
339 let session = self.session_mut_or_error(capability.session_id)?;
340 Self::validate_current_owner(session, capability)?;
341 if let Some(pending) = session.ownership.pending_transfer.as_ref() {
342 return Err(OwnershipError::TransferPending {
343 session_id: capability.session_id,
344 claim_id: pending.receipt.claim_id,
345 });
346 }
347 Self::push_authority_audit(
348 session,
349 AuthorityArtifact::OwnershipCapability(capability.clone()),
350 AuthorityAuditEvent::Invalidated,
351 Some("ownership released".to_string()),
352 );
353 session.ownership.current = None;
354 Ok(())
355 }
356
357 pub fn begin_ownership_transfer(
363 &mut self,
364 capability: &OwnershipCapability,
365 new_owner_id: impl Into<FragmentOwnerId>,
366 new_scope: OwnershipScope,
367 ) -> Result<OwnershipReceipt, OwnershipError> {
368 let session = self.session_mut_or_error(capability.session_id)?;
369 Self::validate_current_owner(session, capability)?;
370 if let Some(pending) = session.ownership.pending_transfer.as_ref() {
371 return Err(OwnershipError::TransferPending {
372 session_id: capability.session_id,
373 claim_id: pending.receipt.claim_id,
374 });
375 }
376 let claim_id = session.ownership.next_claim_id;
377 session.ownership.next_claim_id = session.ownership.next_claim_id.saturating_add(1);
378 let receipt = OwnershipReceipt {
379 session_id: capability.session_id,
380 claim_id,
381 from_owner_id: capability.owner_id.clone(),
382 from_generation: capability.generation,
383 to_owner_id: new_owner_id.into(),
384 to_generation: capability.generation.saturating_add(1),
385 scope: new_scope,
386 };
387 session.ownership.pending_transfer = Some(PendingOwnershipTransfer {
388 receipt: receipt.clone(),
389 });
390 Self::push_authority_audit(
391 session,
392 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
393 AuthorityAuditEvent::Issued,
394 None,
395 );
396 Ok(receipt)
397 }
398
399 pub fn commit_ownership_transfer(
405 &mut self,
406 receipt: &OwnershipReceipt,
407 ) -> Result<OwnershipCapability, OwnershipError> {
408 let session = self.session_mut_or_error(receipt.session_id)?;
409 Self::ensure_mutable_ownership(session)?;
410 let Some(current) = session.ownership.current.as_ref() else {
411 return Err(OwnershipError::Unclaimed {
412 session_id: receipt.session_id,
413 });
414 };
415 let Some(pending) = session.ownership.pending_transfer.as_ref() else {
416 Self::push_authority_audit(
417 session,
418 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
419 AuthorityAuditEvent::Rejected,
420 Some("receipt is no longer pending".to_string()),
421 );
422 return Err(OwnershipError::TransferNotPending {
423 session_id: receipt.session_id,
424 });
425 };
426 if pending.receipt != *receipt {
427 Self::push_authority_audit(
428 session,
429 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
430 AuthorityAuditEvent::Rejected,
431 Some("receipt payload mismatch".to_string()),
432 );
433 return Err(OwnershipError::ReceiptMismatch {
434 session_id: receipt.session_id,
435 claim_id: receipt.claim_id,
436 });
437 }
438 if current.owner_id != receipt.from_owner_id || current.generation != receipt.from_generation
439 {
440 return Err(OwnershipError::StaleCapability {
441 session_id: receipt.session_id,
442 owner_id: receipt.from_owner_id.clone(),
443 expected_generation: receipt.from_generation,
444 actual_generation: current.generation,
445 });
446 }
447 let capability = OwnershipCapability {
448 session_id: receipt.session_id,
449 owner_id: receipt.to_owner_id.clone(),
450 generation: receipt.to_generation,
451 scope: receipt.scope.clone(),
452 };
453 let old_capability = current.clone();
454 session.ownership.current = Some(capability.clone());
455 session.ownership.pending_transfer = None;
456 Self::push_authority_audit(
457 session,
458 AuthorityArtifact::OwnershipCapability(old_capability),
459 AuthorityAuditEvent::Invalidated,
460 Some("ownership transferred".to_string()),
461 );
462 Self::push_authority_audit(
463 session,
464 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
465 AuthorityAuditEvent::Committed,
466 None,
467 );
468 Self::push_authority_audit(
469 session,
470 AuthorityArtifact::OwnershipCapability(capability.clone()),
471 AuthorityAuditEvent::Issued,
472 None,
473 );
474 Ok(capability)
475 }
476
477 pub fn rollback_ownership_transfer(
483 &mut self,
484 receipt: &OwnershipReceipt,
485 ) -> Result<(), OwnershipError> {
486 let session = self.session_mut_or_error(receipt.session_id)?;
487 Self::ensure_mutable_ownership(session)?;
488 let Some(pending) = session.ownership.pending_transfer.as_ref() else {
489 Self::push_authority_audit(
490 session,
491 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
492 AuthorityAuditEvent::Rejected,
493 Some("receipt is no longer pending".to_string()),
494 );
495 return Err(OwnershipError::TransferNotPending {
496 session_id: receipt.session_id,
497 });
498 };
499 if pending.receipt != *receipt {
500 Self::push_authority_audit(
501 session,
502 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
503 AuthorityAuditEvent::Rejected,
504 Some("receipt payload mismatch".to_string()),
505 );
506 return Err(OwnershipError::ReceiptMismatch {
507 session_id: receipt.session_id,
508 claim_id: receipt.claim_id,
509 });
510 }
511 session.ownership.pending_transfer = None;
512 Self::push_authority_audit(
513 session,
514 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
515 AuthorityAuditEvent::RolledBack,
516 None,
517 );
518 Ok(())
519 }
520
521 pub fn attenuate_ownership_scope(
527 &mut self,
528 capability: &OwnershipCapability,
529 new_scope: OwnershipScope,
530 ) -> Result<OwnershipCapability, OwnershipError> {
531 let session = self.session_mut_or_error(capability.session_id)?;
532 Self::validate_current_owner(session, capability)?;
533 if let Some(pending) = session.ownership.pending_transfer.as_ref() {
534 return Err(OwnershipError::TransferPending {
535 session_id: capability.session_id,
536 claim_id: pending.receipt.claim_id,
537 });
538 }
539 let next = OwnershipCapability {
540 session_id: capability.session_id,
541 owner_id: capability.owner_id.clone(),
542 generation: capability.generation.saturating_add(1),
543 scope: new_scope,
544 };
545 Self::push_authority_audit(
546 session,
547 AuthorityArtifact::OwnershipCapability(capability.clone()),
548 AuthorityAuditEvent::Invalidated,
549 Some("ownership scope attenuated".to_string()),
550 );
551 session.ownership.current = Some(next.clone());
552 Self::push_authority_audit(
553 session,
554 AuthorityArtifact::OwnershipCapability(next.clone()),
555 AuthorityAuditEvent::Issued,
556 None,
557 );
558 Ok(next)
559 }
560
561 pub fn apply_owned_session_mutation(
567 &mut self,
568 capability: &OwnershipCapability,
569 mutation: SessionHostMutation,
570 ) -> Result<(), OwnershipError> {
571 let session = self.session_mut_or_error(capability.session_id)?;
572 Self::require_session_scope(session, capability)?;
573 match mutation {
574 SessionHostMutation::SetDefaultHandler { handler } => {
575 let handler_id = session.intern_handler_binding(&handler);
576 session.default_handler = handler;
577 session.default_handler_id = Some(handler_id);
578 }
579 SessionHostMutation::UpdateEdgeHandler { edge, handler } => {
580 let handler_id = session.intern_handler_binding(&handler);
581 if let Some(edge_key) = session.edge_key_for_roles(&edge.sender, &edge.receiver) {
582 session.edge_handler_lookup.insert(edge_key, handler_id);
583 }
584 session.edge_handlers.insert(edge, handler);
585 }
586 SessionHostMutation::UpdateTrace { edge, trace } => {
587 session.edge_traces.insert(edge, trace);
588 }
589 }
590 Ok(())
591 }
592
593 pub fn issue_readiness_witness(
599 &mut self,
600 capability: &OwnershipCapability,
601 predicate_ref: impl Into<String>,
602 ) -> Result<ReadinessWitness, OwnershipError> {
603 let session = self.session_mut_or_error(capability.session_id)?;
604 Self::require_session_scope(session, capability)?;
605 let witness = ReadinessWitness {
606 witness_id: Self::next_witness_id(session),
607 session_id: capability.session_id,
608 owner_id: capability.owner_id.clone(),
609 generation: capability.generation,
610 scope: capability.scope.clone(),
611 predicate_ref: predicate_ref.into(),
612 };
613 session
614 .ownership
615 .issued_readiness
616 .insert(witness.witness_id, witness.clone());
617 Self::push_authority_audit(
618 session,
619 AuthorityArtifact::Readiness(witness.clone()),
620 AuthorityAuditEvent::Issued,
621 None,
622 );
623 Ok(witness)
624 }
625
626 pub fn consume_readiness_witness(
632 &mut self,
633 capability: &OwnershipCapability,
634 witness: &ReadinessWitness,
635 ) -> Result<(), OwnershipError> {
636 let session = self.session_mut_or_error(capability.session_id)?;
637 Self::require_session_scope(session, capability)?;
638 if session.ownership.consumed_witnesses.contains(&witness.witness_id) {
639 Self::push_authority_audit(
640 session,
641 AuthorityArtifact::Readiness(witness.clone()),
642 AuthorityAuditEvent::Rejected,
643 Some("witness already consumed".to_string()),
644 );
645 return Err(OwnershipError::WitnessConsumed {
646 session_id: witness.session_id,
647 witness_id: witness.witness_id,
648 });
649 }
650 let Some(issued) = session.ownership.issued_readiness.get(&witness.witness_id) else {
651 Self::push_authority_audit(
652 session,
653 AuthorityArtifact::Readiness(witness.clone()),
654 AuthorityAuditEvent::Rejected,
655 Some("witness was never issued".to_string()),
656 );
657 return Err(OwnershipError::InvalidWitness {
658 session_id: capability.session_id,
659 witness_id: witness.witness_id,
660 reason: "witness was never issued".to_string(),
661 });
662 };
663 if issued != witness {
664 Self::push_authority_audit(
665 session,
666 AuthorityArtifact::Readiness(witness.clone()),
667 AuthorityAuditEvent::Rejected,
668 Some("witness payload mismatch".to_string()),
669 );
670 return Err(OwnershipError::InvalidWitness {
671 session_id: capability.session_id,
672 witness_id: witness.witness_id,
673 reason: "witness payload mismatch".to_string(),
674 });
675 }
676 if witness.session_id != capability.session_id
677 || witness.owner_id != capability.owner_id
678 || witness.generation != capability.generation
679 || witness.scope != capability.scope
680 {
681 Self::push_authority_audit(
682 session,
683 AuthorityArtifact::Readiness(witness.clone()),
684 AuthorityAuditEvent::Rejected,
685 Some("live ownership no longer matches witness".to_string()),
686 );
687 return Err(OwnershipError::InvalidWitness {
688 session_id: capability.session_id,
689 witness_id: witness.witness_id,
690 reason: "live ownership no longer matches witness".to_string(),
691 });
692 }
693 session.ownership.issued_readiness.remove(&witness.witness_id);
694 session
695 .ownership
696 .consumed_witnesses
697 .insert(witness.witness_id);
698 Self::push_authority_audit(
699 session,
700 AuthorityArtifact::Readiness(witness.clone()),
701 AuthorityAuditEvent::Consumed,
702 None,
703 );
704 Ok(())
705 }
706
707 pub fn mark_owner_died(
713 &mut self,
714 sid: SessionId,
715 owner_id: &str,
716 ) -> Result<CancellationWitness, OwnershipError> {
717 let session = self.session_mut_or_error(sid)?;
718 Self::ensure_mutable_ownership(session)?;
719 let Some(current) = session.ownership.current.clone() else {
720 return Err(OwnershipError::Unclaimed { session_id: sid });
721 };
722 if current.owner_id != owner_id {
723 return Err(OwnershipError::StaleCapability {
724 session_id: sid,
725 owner_id: owner_id.to_string(),
726 expected_generation: current.generation,
727 actual_generation: current.generation,
728 });
729 }
730 let generation = current.generation;
731 let reason = OwnershipTerminalReason::OwnerDied {
732 owner_id: owner_id.to_string(),
733 };
734 session.status = SessionStatus::Faulted {
735 reason: format!("ownership owner `{owner_id}` died"),
736 };
737 let witness = CancellationWitness {
738 witness_id: Self::next_witness_id(session),
739 session_id: sid,
740 owner_id: owner_id.to_string(),
741 generation,
742 reason: reason.clone(),
743 };
744 session.ownership.current = None;
745 session.ownership.pending_transfer = None;
746 session.ownership.terminal_reason = Some(reason);
747 Self::push_authority_audit(
748 session,
749 AuthorityArtifact::OwnershipCapability(current),
750 AuthorityAuditEvent::Invalidated,
751 Some("owner died".to_string()),
752 );
753 Self::push_authority_audit(
754 session,
755 AuthorityArtifact::Cancellation(witness.clone()),
756 AuthorityAuditEvent::Issued,
757 None,
758 );
759 Ok(witness)
760 }
761
762 pub fn cancel_abandoned_transfer(
768 &mut self,
769 receipt: &OwnershipReceipt,
770 ) -> Result<CancellationWitness, OwnershipError> {
771 let session = self.session_mut_or_error(receipt.session_id)?;
772 Self::ensure_mutable_ownership(session)?;
773 let Some(pending) = session.ownership.pending_transfer.as_ref() else {
774 return Err(OwnershipError::TransferNotPending {
775 session_id: receipt.session_id,
776 });
777 };
778 if pending.receipt != *receipt {
779 return Err(OwnershipError::ReceiptMismatch {
780 session_id: receipt.session_id,
781 claim_id: receipt.claim_id,
782 });
783 }
784 let reason = OwnershipTerminalReason::TransferAbandoned {
785 owner_id: receipt.from_owner_id.clone(),
786 claim_id: receipt.claim_id,
787 };
788 session.status = SessionStatus::Cancelled;
789 let witness = CancellationWitness {
790 witness_id: Self::next_witness_id(session),
791 session_id: receipt.session_id,
792 owner_id: receipt.from_owner_id.clone(),
793 generation: receipt.from_generation,
794 reason: reason.clone(),
795 };
796 if let Some(current) = session.ownership.current.clone() {
797 Self::push_authority_audit(
798 session,
799 AuthorityArtifact::OwnershipCapability(current),
800 AuthorityAuditEvent::Invalidated,
801 Some("transfer abandoned".to_string()),
802 );
803 }
804 session.ownership.current = None;
805 session.ownership.pending_transfer = None;
806 session.ownership.terminal_reason = Some(reason);
807 Self::push_authority_audit(
808 session,
809 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
810 AuthorityAuditEvent::RolledBack,
811 Some("transfer abandoned".to_string()),
812 );
813 Self::push_authority_audit(
814 session,
815 AuthorityArtifact::Cancellation(witness.clone()),
816 AuthorityAuditEvent::Issued,
817 None,
818 );
819 Ok(witness)
820 }
821
822 pub fn fault_failed_transfer_commit(
828 &mut self,
829 receipt: &OwnershipReceipt,
830 reason: impl Into<String>,
831 ) -> Result<(), OwnershipError> {
832 let session = self.session_mut_or_error(receipt.session_id)?;
833 Self::ensure_mutable_ownership(session)?;
834 let Some(pending) = session.ownership.pending_transfer.as_ref() else {
835 return Err(OwnershipError::TransferNotPending {
836 session_id: receipt.session_id,
837 });
838 };
839 if pending.receipt != *receipt {
840 return Err(OwnershipError::ReceiptMismatch {
841 session_id: receipt.session_id,
842 claim_id: receipt.claim_id,
843 });
844 }
845 let reason = reason.into();
846 let terminal = OwnershipTerminalReason::TransferCommitFailed {
847 owner_id: receipt.from_owner_id.clone(),
848 claim_id: receipt.claim_id,
849 reason: reason.clone(),
850 };
851 session.status = SessionStatus::Faulted {
852 reason: format!("ownership transfer commit failed: {reason}"),
853 };
854 if let Some(current) = session.ownership.current.clone() {
855 Self::push_authority_audit(
856 session,
857 AuthorityArtifact::OwnershipCapability(current),
858 AuthorityAuditEvent::Invalidated,
859 Some("transfer commit failed".to_string()),
860 );
861 }
862 session.ownership.current = None;
863 session.ownership.pending_transfer = None;
864 session.ownership.terminal_reason = Some(terminal);
865 Self::push_authority_audit(
866 session,
867 AuthorityArtifact::OwnershipReceipt(receipt.clone()),
868 AuthorityAuditEvent::RolledBack,
869 Some(reason),
870 );
871 Ok(())
872 }
873
874 pub fn iter(&self) -> impl Iterator<Item = &SessionState> {
876 self.sessions.values()
877 }
878
879 pub fn close(&mut self, sid: SessionId) -> Result<(), String> {
885 let session = self
886 .sessions
887 .get_mut(&sid)
888 .ok_or_else(|| format!("session {sid} not found"))?;
889
890 session.status = SessionStatus::Closed;
891 session.buffers.clear();
892 session.edge_traces.clear();
893 session.epoch = session.epoch.saturating_add(1);
894 Ok(())
895 }
896
897 #[must_use]
899 pub fn closed_session_ids(&self) -> Vec<SessionId> {
900 self.sessions
901 .iter()
902 .filter_map(|(sid, session)| {
903 matches!(
904 session.status,
905 SessionStatus::Closed
906 | SessionStatus::Cancelled
907 | SessionStatus::Faulted { .. }
908 )
909 .then_some(*sid)
910 })
911 .collect()
912 }
913
914 pub fn reap_sessions(&mut self, session_ids: &[SessionId]) -> Vec<ClosedSessionSummary> {
921 let mut reaped = Vec::new();
922 for sid in session_ids {
923 let Some(session) = self.sessions.get(sid) else {
924 continue;
925 };
926 if !matches!(
927 session.status,
928 SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. }
929 ) {
930 continue;
931 }
932
933 let session = self
934 .sessions
935 .remove(sid)
936 .expect("session existence checked before removal");
937 let summary = ClosedSessionSummary::from_session(&session);
938 self.archived_closed.push(summary.clone());
939 reaped.push(summary);
940 }
941 reaped
942 }
943
944 pub fn reap_closed(&mut self) -> Vec<ClosedSessionSummary> {
946 let sids = self.closed_session_ids();
947 self.reap_sessions(&sids)
948 }
949
950 #[must_use]
952 pub fn active_count(&self) -> usize {
953 self.sessions
954 .values()
955 .filter(|s| s.status == SessionStatus::Active)
956 .count()
957 }
958
959 #[must_use]
961 pub fn live_count(&self) -> usize {
962 self.sessions.len()
963 }
964
965 #[must_use]
967 pub fn session_ids(&self) -> Vec<SessionId> {
968 self.sessions.keys().copied().collect()
969 }
970
971 #[must_use]
973 pub fn archived_closed(&self) -> &[ClosedSessionSummary] {
974 &self.archived_closed
975 }
976
977 #[must_use]
979 pub fn memory_usage(&self) -> SessionStoreMemoryUsage {
980 let mut usage = SessionStoreMemoryUsage {
981 live_sessions: self.sessions.len(),
982 archived_closed_sessions: self.archived_closed.len(),
983 ..SessionStoreMemoryUsage::default()
984 };
985 usage.retained_bytes.archived_closed = self
986 .archived_closed
987 .iter()
988 .map(ClosedSessionSummary::retained_bytes_estimate)
989 .sum();
990
991 for session in self.sessions.values() {
992 if matches!(
993 session.status,
994 SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. }
995 ) {
996 usage.live_closed_sessions += 1;
997 }
998 usage.live_local_type_entries += session.local_types.len();
999 usage.live_buffer_count += session.buffers.len();
1000 usage.live_buffered_messages += session
1001 .buffers
1002 .values()
1003 .map(BoundedBuffer::len)
1004 .sum::<usize>();
1005 usage.live_edge_handler_count += session.edge_handlers.len();
1006 usage.live_auth_leaf_count += session.auth_leaves.values().map(Vec::len).sum::<usize>();
1007 usage.live_auth_tree_count += session.auth_trees.len();
1008 usage.live_auth_root_count += session.auth_roots.len();
1009 usage.retained_bytes.live_sessions += session.retained_session_core_bytes();
1010 usage.retained_bytes.local_types += session.retained_local_type_bytes();
1011 usage.retained_bytes.buffers += session.retained_buffer_bytes();
1012 usage.retained_bytes.traces += session.retained_trace_bytes();
1013 usage.retained_bytes.auth += session.retained_auth_bytes();
1014 usage.retained_bytes.handlers += session.retained_handler_bytes();
1015 }
1016 usage.retained_bytes.total = usage
1017 .retained_bytes
1018 .live_sessions
1019 .saturating_add(usage.retained_bytes.archived_closed)
1020 .saturating_add(usage.retained_bytes.local_types)
1021 .saturating_add(usage.retained_bytes.buffers)
1022 .saturating_add(usage.retained_bytes.traces)
1023 .saturating_add(usage.retained_bytes.auth)
1024 .saturating_add(usage.retained_bytes.handlers);
1025
1026 usage
1027 }
1028
1029 #[must_use]
1031 pub fn lookup_handler(&self, edge: &Edge) -> Option<&HandlerId> {
1032 self.sessions
1033 .get(&edge.sid)?
1034 .lookup_handler_for_roles(&edge.sender, &edge.receiver)
1035 }
1036
1037 #[must_use]
1039 pub fn default_handler_for_session(&self, sid: SessionId) -> Option<&HandlerId> {
1040 self.sessions.get(&sid)?.default_handler_binding()
1041 }
1042
1043 pub(crate) fn set_default_handler_for_session(&mut self, sid: SessionId, handler: HandlerId) {
1045 if let Some(session) = self.sessions.get_mut(&sid) {
1046 let handler_id = session.intern_handler_binding(&handler);
1047 session.default_handler = handler;
1048 session.default_handler_id = Some(handler_id);
1049 }
1050 }
1051
1052 pub(crate) fn update_handler(&mut self, edge: &Edge, handler: HandlerId) {
1054 if let Some(session) = self.sessions.get_mut(&edge.sid) {
1055 let handler_id = session.intern_handler_binding(&handler);
1056 if let Some(edge_key) = session.edge_key_for_roles(&edge.sender, &edge.receiver) {
1057 session.edge_handler_lookup.insert(edge_key, handler_id);
1058 }
1059 session.edge_handlers.insert(edge.clone(), handler);
1060 }
1061 }
1062
1063 #[must_use]
1065 pub fn lookup_trace(&self, edge: &Edge) -> Option<&[ValType]> {
1066 self.sessions
1067 .get(&edge.sid)?
1068 .edge_traces
1069 .get(edge)
1070 .map(Vec::as_slice)
1071 }
1072
1073 #[cfg_attr(not(test), allow(dead_code))]
1075 pub(crate) fn update_trace(&mut self, edge: &Edge, trace: Vec<ValType>) {
1076 if let Some(session) = self.sessions.get_mut(&edge.sid) {
1077 session.edge_traces.insert(edge.clone(), trace);
1078 }
1079 }
1080}
1081
1082#[must_use]
1088pub fn unfold_mu(lt: &LocalTypeR) -> LocalTypeR {
1090 match lt {
1091 LocalTypeR::Mu { body, .. } => unfold_mu(body),
1092 other => other.clone(),
1093 }
1094}
1095
1096#[must_use]
1101pub fn unfold_if_var(cont: &LocalTypeR, original: &LocalTypeR) -> LocalTypeR {
1102 match cont {
1103 LocalTypeR::Var(_) => unfold_mu(original),
1104 LocalTypeR::Mu { .. } => unfold_mu(cont),
1105 other => other.clone(),
1106 }
1107}
1108
1109#[must_use]
1115pub(crate) fn unfold_if_var_with_scope(
1116 cont: &LocalTypeR,
1117 original: &LocalTypeR,
1118) -> (LocalTypeR, Option<LocalTypeR>) {
1119 match cont {
1120 LocalTypeR::Var(_) => (unfold_mu(original), None),
1121 LocalTypeR::Mu { .. } => (unfold_mu(cont), Some(cont.clone())),
1122 other => (other.clone(), None),
1123 }
1124}