/// Restores a `SessionState` from its `SessionStateSerde` mirror.
///
/// Fields read from the mirror are moved across unchanged. The remaining
/// fields start out empty (or `None`), and `rebuild_derived_indexes` is
/// invoked on the assembled value before it is returned.
impl<'de> Deserialize<'de> for SessionState {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let persisted = SessionStateSerde::deserialize(deserializer)?;
        let mut restored = Self {
            // Moved straight out of the deserialized mirror.
            sid: persisted.sid,
            roles: persisted.roles,
            local_types: persisted.local_types,
            buffers: persisted.buffers,
            auth_leaves: persisted.auth_leaves,
            auth_trees: persisted.auth_trees,
            auth_roots: persisted.auth_roots,
            edge_handlers: persisted.edge_handlers,
            default_handler: persisted.default_handler,
            edge_traces: persisted.edge_traces,
            status: persisted.status,
            epoch: persisted.epoch,
            ownership: persisted.ownership,
            // Not read from the mirror: initialised empty here.
            role_ids: BTreeMap::new(),
            edge_lookup: BTreeMap::new(),
            handler_ids: BTreeMap::new(),
            handlers_by_id: Vec::new(),
            edge_handler_lookup: BTreeMap::new(),
            default_handler_id: None,
            label_ids: BTreeMap::new(),
            labels_by_id: Vec::new(),
            branch_lookup: BTreeMap::new(),
        };
        restored.rebuild_derived_indexes();
        Ok(restored)
    }
}
/// Store of sessions keyed by `SessionId`, plus summaries of sessions that
/// have been reaped out of the live map.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SessionStore {
/// Sessions currently held by the store, keyed by session id.
sessions: BTreeMap<SessionId, SessionState>,
/// Summaries appended by `reap_sessions` when a session is removed.
/// Defaults to empty when the field is absent from serialized input.
#[serde(default)]
archived_closed: Vec<ClosedSessionSummary>,
/// Id used by the next `open` call; `open_with_sid_from_plan` keeps it
/// strictly above every id it has inserted.
next_id: SessionId,
}
impl SessionStore {
/// Returns a mutable reference to session `sid`.
///
/// # Errors
/// `OwnershipError::SessionNotFound` when the store holds no session with that id.
fn session_mut_or_error(
    &mut self,
    sid: SessionId,
) -> Result<&mut SessionState, OwnershipError> {
    match self.sessions.get_mut(&sid) {
        Some(session) => Ok(session),
        None => Err(OwnershipError::SessionNotFound { session_id: sid }),
    }
}
/// Builds an `OwnershipError::Terminal` for `session` when its ownership state
/// carries a terminal reason; returns `None` otherwise.
fn terminal_error(session: &SessionState) -> Option<OwnershipError> {
    let reason = session.ownership.terminal_reason.as_ref()?.clone();
    Some(OwnershipError::Terminal {
        session_id: session.sid,
        reason,
    })
}
/// Succeeds unless the session's ownership state has a terminal reason set.
///
/// # Errors
/// `OwnershipError::Terminal` (as produced by `terminal_error`) when a terminal
/// reason is present.
fn ensure_mutable_ownership(session: &SessionState) -> Result<(), OwnershipError> {
    match Self::terminal_error(session) {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
/// Checks that `capability` names the session's current owner and generation.
///
/// Only `owner_id` and `generation` are compared; the capability's scope is
/// not inspected here.
///
/// # Errors
/// - `Terminal` when the ownership state is terminal.
/// - `Unclaimed` when the session has no current owner.
/// - `StaleCapability` when owner id or generation differs from the current
///   capability (`expected_generation` is the presented one, `actual_generation`
///   the live one).
fn validate_current_owner(
    session: &SessionState,
    capability: &OwnershipCapability,
) -> Result<(), OwnershipError> {
    Self::ensure_mutable_ownership(session)?;
    let live = match session.ownership.current.as_ref() {
        Some(live) => live,
        None => {
            return Err(OwnershipError::Unclaimed {
                session_id: session.sid,
            })
        }
    };
    let same_owner = live.owner_id == capability.owner_id;
    let same_generation = live.generation == capability.generation;
    if same_owner && same_generation {
        return Ok(());
    }
    Err(OwnershipError::StaleCapability {
        session_id: session.sid,
        owner_id: capability.owner_id.clone(),
        expected_generation: capability.generation,
        actual_generation: live.generation,
    })
}
/// Validates `capability` as the current owner and additionally requires that
/// the live capability's scope allows session mutation.
///
/// # Errors
/// Any error from `validate_current_owner`, or `ScopeViolation` when
/// `allows_session_mutation()` is false for the live scope.
fn require_session_scope(
    session: &SessionState,
    capability: &OwnershipCapability,
) -> Result<(), OwnershipError> {
    Self::validate_current_owner(session, capability)?;
    match session.ownership.current.as_ref() {
        None => Err(OwnershipError::Unclaimed {
            session_id: session.sid,
        }),
        Some(live) if live.scope.allows_session_mutation() => Ok(()),
        Some(live) => Err(OwnershipError::ScopeViolation {
            session_id: session.sid,
            owner_id: live.owner_id.clone(),
            required: OwnershipScope::Session,
            actual: live.scope.clone(),
        }),
    }
}
/// Hands out the session's next witness id and advances the counter.
///
/// The counter is advanced with `saturating_add`, so it stops at the type's
/// maximum instead of wrapping.
fn next_witness_id(session: &mut SessionState) -> AuthorityWitnessId {
    let counter = &mut session.ownership.next_witness_id;
    let issued = *counter;
    *counter = issued.saturating_add(1);
    issued
}
/// Appends one record to the session's authority audit log.
///
/// The record's `tick` is always left as `None` by this helper.
fn push_authority_audit(
    session: &mut SessionState,
    artifact: AuthorityArtifact,
    event: AuthorityAuditEvent,
    reason: Option<String>,
) {
    let record = AuthorityAuditRecord {
        tick: None,
        artifact,
        event,
        reason,
    };
    session.ownership.audit_log.push(record);
}
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Opens a session under the caller-chosen id `sid`.
///
/// Builds a `SessionOpenPlan` from `roles` and `initial_types` and forwards to
/// [`Self::open_with_sid_from_plan`]. Returns `sid`.
#[allow(clippy::needless_pass_by_value)]
pub fn open_with_sid(
    &mut self,
    sid: SessionId,
    roles: Vec<String>,
    buffer_config: &BufferConfig,
    initial_types: &BTreeMap<String, LocalTypeR>,
) -> SessionId {
    self.open_with_sid_from_plan(
        sid,
        &SessionOpenPlan::new(&roles, initial_types),
        buffer_config,
    )
}
/// Opens a session under `sid` from a prepared plan and returns `sid`.
///
/// The new state is inserted into the session map under `sid`; a session
/// already stored under that id is replaced. Afterwards `next_id` is raised to
/// at least `sid + 1` (saturating) so later `open` calls do not reuse `sid`.
pub fn open_with_sid_from_plan(
    &mut self,
    sid: SessionId,
    plan: &SessionOpenPlan,
    buffer_config: &BufferConfig,
) -> SessionId {
    self.sessions
        .insert(sid, SessionState::from_open_plan(sid, plan, buffer_config));
    let floor = sid.saturating_add(1);
    if floor > self.next_id {
        self.next_id = floor;
    }
    sid
}
/// Opens a session under the store's next free id and returns that id.
///
/// Equivalent to calling [`Self::open_with_sid`] with the current `next_id`.
#[allow(clippy::needless_pass_by_value)]
pub fn open(
    &mut self,
    roles: Vec<String>,
    buffer_config: &BufferConfig,
    initial_types: &BTreeMap<String, LocalTypeR>,
) -> SessionId {
    let fresh = self.next_id;
    self.open_with_sid(fresh, roles, buffer_config, initial_types)
}
#[must_use]
pub fn next_session_id(&self) -> SessionId {
self.next_id
}
/// Returns the `current` local type recorded for endpoint `ep`.
///
/// `None` when the endpoint's session is not in the store or the session has
/// no entry for `ep`.
#[must_use]
pub fn lookup_type(&self, ep: &Endpoint) -> Option<&LocalTypeR> {
    let session = self.sessions.get(&ep.sid)?;
    let entry = session.local_types.get(ep)?;
    Some(&entry.current)
}
/// Replaces the `current` local type of endpoint `ep` with `new_type`.
///
/// Does nothing when the endpoint's session is missing. When the session
/// exists, `refresh_endpoint_branch_lookup(ep)` is called whether or not the
/// session had an entry for `ep`.
pub fn update_type(&mut self, ep: &Endpoint, new_type: LocalTypeR) {
    let Some(session) = self.sessions.get_mut(&ep.sid) else {
        return;
    };
    if let Some(entry) = session.local_types.get_mut(ep) {
        entry.current = new_type;
    }
    session.refresh_endpoint_branch_lookup(ep);
}
/// Replaces the `original` local type of endpoint `ep` with `new_original`.
///
/// Silently does nothing when the session or the endpoint entry is missing.
pub fn update_original(&mut self, ep: &Endpoint, new_original: LocalTypeR) {
    let slot = self
        .sessions
        .get_mut(&ep.sid)
        .and_then(|session| session.local_types.get_mut(ep));
    if let Some(entry) = slot {
        entry.original = new_original;
    }
}
/// Returns the `original` local type recorded for endpoint `ep`.
///
/// `None` when the endpoint's session is not in the store or the session has
/// no entry for `ep`.
#[must_use]
pub fn original_type(&self, ep: &Endpoint) -> Option<&LocalTypeR> {
    let session = self.sessions.get(&ep.sid)?;
    let entry = session.local_types.get(ep)?;
    Some(&entry.original)
}
/// Drops the local-type entry and the branch-lookup entry for endpoint `ep`.
///
/// Does nothing when the endpoint's session is not in the store.
pub fn remove_type(&mut self, ep: &Endpoint) {
    let Some(session) = self.sessions.get_mut(&ep.sid) else {
        return;
    };
    session.local_types.remove(ep);
    session.branch_lookup.remove(ep);
}
#[must_use]
pub fn get(&self, sid: SessionId) -> Option<&SessionState> {
self.sessions.get(&sid)
}
pub fn get_mut(&mut self, sid: SessionId) -> Option<&mut SessionState> {
self.sessions.get_mut(&sid)
}
/// Returns the capability currently recorded as owning session `sid`.
///
/// `None` when the session is missing or has no current owner.
#[must_use]
pub fn current_ownership(&self, sid: SessionId) -> Option<&OwnershipCapability> {
    let session = self.sessions.get(&sid)?;
    session.ownership.current.as_ref()
}
/// Checks, without mutating anything, that `capability` is the live owner of
/// the session it names.
///
/// # Errors
/// `SessionNotFound` when the session is absent; otherwise whatever
/// `validate_current_owner` reports.
pub fn validate_ownership_capability(
    &self,
    capability: &OwnershipCapability,
) -> Result<(), OwnershipError> {
    let sid = capability.session_id;
    match self.sessions.get(&sid) {
        Some(session) => Self::validate_current_owner(session, capability),
        None => Err(OwnershipError::SessionNotFound { session_id: sid }),
    }
}
/// Returns the authority audit records of session `sid` as a slice.
///
/// `None` when the session is not in the store.
#[must_use]
pub fn authority_audit_log(&self, sid: SessionId) -> Option<&[AuthorityAuditRecord]> {
    self.sessions
        .get(&sid)
        .map(|session| session.ownership.audit_log.as_slice())
}
/// Grants ownership of an unclaimed session and returns the new capability.
///
/// The capability is issued at generation `0` with the requested `scope`,
/// stored as the session's current owner, and logged as `Issued`.
///
/// # Errors
/// - `SessionNotFound` when `sid` is unknown.
/// - `Terminal` when the ownership state is terminal.
/// - `AlreadyClaimed` when the session already has an owner.
/// - `TransferPending` when a transfer receipt is outstanding.
pub fn claim_ownership(
    &mut self,
    sid: SessionId,
    owner_id: impl Into<FragmentOwnerId>,
    scope: OwnershipScope,
) -> Result<OwnershipCapability, OwnershipError> {
    let session = self.session_mut_or_error(sid)?;
    Self::ensure_mutable_ownership(session)?;

    let ownership = &session.ownership;
    if let Some(holder) = &ownership.current {
        return Err(OwnershipError::AlreadyClaimed {
            session_id: sid,
            current_owner_id: holder.owner_id.clone(),
        });
    }
    if let Some(transfer) = &ownership.pending_transfer {
        return Err(OwnershipError::TransferPending {
            session_id: sid,
            claim_id: transfer.receipt.claim_id,
        });
    }

    // NOTE(review): a fresh claim always starts at generation 0, including
    // after a release — confirm that is the intended generation policy.
    let issued = OwnershipCapability {
        session_id: sid,
        owner_id: owner_id.into(),
        generation: 0,
        scope,
    };
    session.ownership.current = Some(issued.clone());
    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipCapability(issued.clone()),
        AuthorityAuditEvent::Issued,
        None,
    );
    Ok(issued)
}
/// Gives up ownership held through `capability`, leaving the session unclaimed.
///
/// Logs the presented capability as `Invalidated` with reason
/// "ownership released".
///
/// # Errors
/// `SessionNotFound`, any error from `validate_current_owner`, or
/// `TransferPending` when a transfer receipt is outstanding.
pub fn release_ownership(
    &mut self,
    capability: &OwnershipCapability,
) -> Result<(), OwnershipError> {
    let sid = capability.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::validate_current_owner(session, capability)?;

    let outstanding = session
        .ownership
        .pending_transfer
        .as_ref()
        .map(|transfer| transfer.receipt.claim_id);
    if let Some(claim_id) = outstanding {
        return Err(OwnershipError::TransferPending {
            session_id: sid,
            claim_id,
        });
    }

    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipCapability(capability.clone()),
        AuthorityAuditEvent::Invalidated,
        Some("ownership released".to_string()),
    );
    session.ownership.current = None;
    Ok(())
}
/// Starts a two-phase ownership transfer and returns the receipt describing it.
///
/// Allocates the next claim id (saturating), records the receipt as the
/// session's pending transfer, and logs it as `Issued`. The receipt targets
/// generation `capability.generation + 1` (saturating). The current owner is
/// left in place by this call.
///
/// # Errors
/// `SessionNotFound`, any error from `validate_current_owner`, or
/// `TransferPending` when another receipt is already outstanding.
pub fn begin_ownership_transfer(
    &mut self,
    capability: &OwnershipCapability,
    new_owner_id: impl Into<FragmentOwnerId>,
    new_scope: OwnershipScope,
) -> Result<OwnershipReceipt, OwnershipError> {
    let sid = capability.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::validate_current_owner(session, capability)?;

    let outstanding = session
        .ownership
        .pending_transfer
        .as_ref()
        .map(|transfer| transfer.receipt.claim_id);
    if let Some(claim_id) = outstanding {
        return Err(OwnershipError::TransferPending {
            session_id: sid,
            claim_id,
        });
    }

    let claim_id = session.ownership.next_claim_id;
    session.ownership.next_claim_id = claim_id.saturating_add(1);

    let receipt = OwnershipReceipt {
        session_id: sid,
        claim_id,
        from_owner_id: capability.owner_id.clone(),
        from_generation: capability.generation,
        to_owner_id: new_owner_id.into(),
        to_generation: capability.generation.saturating_add(1),
        scope: new_scope,
    };
    let pending = PendingOwnershipTransfer {
        receipt: receipt.clone(),
    };
    session.ownership.pending_transfer = Some(pending);
    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipReceipt(receipt.clone()),
        AuthorityAuditEvent::Issued,
        None,
    );
    Ok(receipt)
}
/// Completes a pending transfer: the receipt's target becomes the current owner.
///
/// On success the pending transfer is cleared and three audit records are
/// appended in order: old capability `Invalidated` ("ownership transferred"),
/// receipt `Committed`, new capability `Issued`.
///
/// # Errors
/// Checked in this order:
/// - `SessionNotFound`, `Terminal`.
/// - `Unclaimed` when the session has no current owner.
/// - `TransferNotPending` when nothing is pending (logged as `Rejected`).
/// - `ReceiptMismatch` when `receipt` differs from the pending one (logged as
///   `Rejected`).
/// - `StaleCapability` when the current owner/generation no longer matches the
///   receipt's source (this rejection is not written to the audit log).
pub fn commit_ownership_transfer(
    &mut self,
    receipt: &OwnershipReceipt,
) -> Result<OwnershipCapability, OwnershipError> {
    let sid = receipt.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::ensure_mutable_ownership(session)?;
    let Some(outgoing) = session.ownership.current.as_ref() else {
        return Err(OwnershipError::Unclaimed { session_id: sid });
    };

    // Decide first whether the receipt is rejected, then audit and bail in
    // one place so both rejection paths log identically.
    let rejection = match session.ownership.pending_transfer.as_ref() {
        None => Some((
            "receipt is no longer pending",
            OwnershipError::TransferNotPending { session_id: sid },
        )),
        Some(pending) if pending.receipt != *receipt => Some((
            "receipt payload mismatch",
            OwnershipError::ReceiptMismatch {
                session_id: sid,
                claim_id: receipt.claim_id,
            },
        )),
        Some(_) => None,
    };
    if let Some((why, error)) = rejection {
        Self::push_authority_audit(
            session,
            AuthorityArtifact::OwnershipReceipt(receipt.clone()),
            AuthorityAuditEvent::Rejected,
            Some(why.to_string()),
        );
        return Err(error);
    }

    let source_matches = outgoing.owner_id == receipt.from_owner_id
        && outgoing.generation == receipt.from_generation;
    if !source_matches {
        return Err(OwnershipError::StaleCapability {
            session_id: sid,
            owner_id: receipt.from_owner_id.clone(),
            expected_generation: receipt.from_generation,
            actual_generation: outgoing.generation,
        });
    }

    let superseded = outgoing.clone();
    let granted = OwnershipCapability {
        session_id: sid,
        owner_id: receipt.to_owner_id.clone(),
        generation: receipt.to_generation,
        scope: receipt.scope.clone(),
    };
    session.ownership.current = Some(granted.clone());
    session.ownership.pending_transfer = None;

    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipCapability(superseded),
        AuthorityAuditEvent::Invalidated,
        Some("ownership transferred".to_string()),
    );
    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipReceipt(receipt.clone()),
        AuthorityAuditEvent::Committed,
        None,
    );
    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipCapability(granted.clone()),
        AuthorityAuditEvent::Issued,
        None,
    );
    Ok(granted)
}
/// Abandons a pending transfer without changing the current owner.
///
/// On success the pending transfer is cleared and the receipt is logged as
/// `RolledBack`.
///
/// # Errors
/// - `SessionNotFound`, `Terminal`.
/// - `TransferNotPending` when nothing is pending (logged as `Rejected`).
/// - `ReceiptMismatch` when `receipt` differs from the pending one (logged as
///   `Rejected`).
pub fn rollback_ownership_transfer(
    &mut self,
    receipt: &OwnershipReceipt,
) -> Result<(), OwnershipError> {
    let sid = receipt.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::ensure_mutable_ownership(session)?;

    let rejection = match session.ownership.pending_transfer.as_ref() {
        None => Some((
            "receipt is no longer pending",
            OwnershipError::TransferNotPending { session_id: sid },
        )),
        Some(pending) if pending.receipt != *receipt => Some((
            "receipt payload mismatch",
            OwnershipError::ReceiptMismatch {
                session_id: sid,
                claim_id: receipt.claim_id,
            },
        )),
        Some(_) => None,
    };
    if let Some((why, error)) = rejection {
        Self::push_authority_audit(
            session,
            AuthorityArtifact::OwnershipReceipt(receipt.clone()),
            AuthorityAuditEvent::Rejected,
            Some(why.to_string()),
        );
        return Err(error);
    }

    session.ownership.pending_transfer = None;
    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipReceipt(receipt.clone()),
        AuthorityAuditEvent::RolledBack,
        None,
    );
    Ok(())
}
/// Re-issues the owner's capability with `new_scope` at the next generation.
///
/// The presented capability is logged as `Invalidated` ("ownership scope
/// attenuated"), the replacement becomes the current owner and is logged as
/// `Issued`. Owner id is unchanged; generation is `capability.generation + 1`
/// (saturating).
///
/// # Errors
/// `SessionNotFound`, any error from `validate_current_owner`, or
/// `TransferPending` when a transfer receipt is outstanding.
pub fn attenuate_ownership_scope(
    &mut self,
    capability: &OwnershipCapability,
    new_scope: OwnershipScope,
) -> Result<OwnershipCapability, OwnershipError> {
    let sid = capability.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::validate_current_owner(session, capability)?;

    let outstanding = session
        .ownership
        .pending_transfer
        .as_ref()
        .map(|transfer| transfer.receipt.claim_id);
    if let Some(claim_id) = outstanding {
        return Err(OwnershipError::TransferPending {
            session_id: sid,
            claim_id,
        });
    }

    // NOTE(review): nothing here checks that `new_scope` is narrower than the
    // live scope — confirm callers are expected to enforce attenuation.
    let replacement = OwnershipCapability {
        session_id: sid,
        owner_id: capability.owner_id.clone(),
        generation: capability.generation.saturating_add(1),
        scope: new_scope,
    };
    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipCapability(capability.clone()),
        AuthorityAuditEvent::Invalidated,
        Some("ownership scope attenuated".to_string()),
    );
    session.ownership.current = Some(replacement.clone());
    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipCapability(replacement.clone()),
        AuthorityAuditEvent::Issued,
        None,
    );
    Ok(replacement)
}
/// Applies a host-side mutation to a session on behalf of its owner.
///
/// The capability must pass `require_session_scope`. Supported mutations:
/// - `SetDefaultHandler`: interns the handler and stores it as the default.
/// - `UpdateEdgeHandler`: interns the handler, updates the edge→handler lookup
///   when `edge_key_for_roles` yields a key, and stores the handler for `edge`.
/// - `UpdateTrace`: stores `trace` for `edge`.
///
/// # Errors
/// `SessionNotFound` or any error from `require_session_scope`.
pub fn apply_owned_session_mutation(
    &mut self,
    capability: &OwnershipCapability,
    mutation: SessionHostMutation,
) -> Result<(), OwnershipError> {
    let session = self.session_mut_or_error(capability.session_id)?;
    Self::require_session_scope(session, capability)?;

    // NOTE(review): edges carried by the mutation are applied to the
    // capability's session as-is; `edge.sid` is not compared with it here.
    match mutation {
        SessionHostMutation::SetDefaultHandler { handler } => {
            let interned = session.intern_handler_binding(&handler);
            session.default_handler_id = Some(interned);
            session.default_handler = handler;
        }
        SessionHostMutation::UpdateEdgeHandler { edge, handler } => {
            let interned = session.intern_handler_binding(&handler);
            let edge_key = session.edge_key_for_roles(&edge.sender, &edge.receiver);
            if let Some(edge_key) = edge_key {
                session.edge_handler_lookup.insert(edge_key, interned);
            }
            session.edge_handlers.insert(edge, handler);
        }
        SessionHostMutation::UpdateTrace { edge, trace } => {
            session.edge_traces.insert(edge, trace);
        }
    }
    Ok(())
}
/// Issues a readiness witness bound to the presented capability.
///
/// The witness copies the capability's session id, owner id, generation and
/// scope, takes the session's next witness id, is stored among the issued
/// readiness witnesses, and is logged as `Issued`.
///
/// # Errors
/// `SessionNotFound` or any error from `require_session_scope`.
pub fn issue_readiness_witness(
    &mut self,
    capability: &OwnershipCapability,
    predicate_ref: impl Into<String>,
) -> Result<ReadinessWitness, OwnershipError> {
    let sid = capability.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::require_session_scope(session, capability)?;

    let witness_id = Self::next_witness_id(session);
    let witness = ReadinessWitness {
        witness_id,
        session_id: sid,
        owner_id: capability.owner_id.clone(),
        generation: capability.generation,
        scope: capability.scope.clone(),
        predicate_ref: predicate_ref.into(),
    };
    session
        .ownership
        .issued_readiness
        .insert(witness_id, witness.clone());
    Self::push_authority_audit(
        session,
        AuthorityArtifact::Readiness(witness.clone()),
        AuthorityAuditEvent::Issued,
        None,
    );
    Ok(witness)
}
/// Consumes a previously issued readiness witness exactly once.
///
/// On success the witness moves from the issued set to the consumed set of the
/// capability's session and is logged as `Consumed`. Every rejection below the
/// capability check is logged as `Rejected` with the same reason text that is
/// returned in the error.
///
/// # Errors
/// Checked in this order:
/// - `SessionNotFound` or any error from `require_session_scope`.
/// - `WitnessConsumed` when the witness id is already in the consumed set.
/// - `InvalidWitness` ("witness was never issued") when the id is unknown.
/// - `InvalidWitness` ("witness payload mismatch") when the presented witness
///   differs from the stored one.
/// - `InvalidWitness` ("live ownership no longer matches witness") when the
///   witness's session, owner, generation or scope differs from `capability`.
///
/// All errors report `capability.session_id`, i.e. the session whose state was
/// actually inspected.
pub fn consume_readiness_witness(
    &mut self,
    capability: &OwnershipCapability,
    witness: &ReadinessWitness,
) -> Result<(), OwnershipError> {
    let sid = capability.session_id;
    let witness_id = witness.witness_id;
    let session = self.session_mut_or_error(sid)?;
    Self::require_session_scope(session, capability)?;

    if session.ownership.consumed_witnesses.contains(&witness_id) {
        Self::push_authority_audit(
            session,
            AuthorityArtifact::Readiness(witness.clone()),
            AuthorityAuditEvent::Rejected,
            Some("witness already consumed".to_string()),
        );
        // `witness.session_id` has not been validated yet at this point, so
        // report the session whose consumed set was consulted instead of the
        // caller-supplied value.
        return Err(OwnershipError::WitnessConsumed {
            session_id: sid,
            witness_id,
        });
    }

    // Classify the witness first; audit and reject in one place below.
    let invalid = match session.ownership.issued_readiness.get(&witness_id) {
        None => Some("witness was never issued"),
        Some(issued) if issued != witness => Some("witness payload mismatch"),
        Some(_)
            if witness.session_id != sid
                || witness.owner_id != capability.owner_id
                || witness.generation != capability.generation
                || witness.scope != capability.scope =>
        {
            Some("live ownership no longer matches witness")
        }
        Some(_) => None,
    };
    if let Some(why) = invalid {
        Self::push_authority_audit(
            session,
            AuthorityArtifact::Readiness(witness.clone()),
            AuthorityAuditEvent::Rejected,
            Some(why.to_string()),
        );
        return Err(OwnershipError::InvalidWitness {
            session_id: sid,
            witness_id,
            reason: why.to_string(),
        });
    }

    session.ownership.issued_readiness.remove(&witness_id);
    session.ownership.consumed_witnesses.insert(witness_id);
    Self::push_authority_audit(
        session,
        AuthorityArtifact::Readiness(witness.clone()),
        AuthorityAuditEvent::Consumed,
        None,
    );
    Ok(())
}
/// Records that the current owner of `sid` died and makes ownership terminal.
///
/// Sets the session status to `Faulted`, clears the current owner and any
/// pending transfer, stores `OwnerDied` as the terminal reason, and returns a
/// cancellation witness. Audit log: old capability `Invalidated`
/// ("owner died"), then the cancellation witness `Issued`.
///
/// # Errors
/// - `SessionNotFound`, `Terminal`.
/// - `Unclaimed` when the session has no current owner.
/// - `StaleCapability` when `owner_id` is not the current owner.
pub fn mark_owner_died(
    &mut self,
    sid: SessionId,
    owner_id: &str,
) -> Result<CancellationWitness, OwnershipError> {
    let session = self.session_mut_or_error(sid)?;
    Self::ensure_mutable_ownership(session)?;
    let Some(holder) = session.ownership.current.as_ref() else {
        return Err(OwnershipError::Unclaimed { session_id: sid });
    };
    if holder.owner_id != owner_id {
        // NOTE(review): no generation is supplied by the caller, so both
        // generation fields carry the live generation.
        return Err(OwnershipError::StaleCapability {
            session_id: sid,
            owner_id: owner_id.to_string(),
            expected_generation: holder.generation,
            actual_generation: holder.generation,
        });
    }
    let holder = holder.clone();

    let terminal = OwnershipTerminalReason::OwnerDied {
        owner_id: owner_id.to_string(),
    };
    session.status = SessionStatus::Faulted {
        reason: format!("ownership owner `{owner_id}` died"),
    };
    let witness_id = Self::next_witness_id(session);
    let witness = CancellationWitness {
        witness_id,
        session_id: sid,
        owner_id: owner_id.to_string(),
        generation: holder.generation,
        reason: terminal.clone(),
    };

    session.ownership.current = None;
    session.ownership.pending_transfer = None;
    session.ownership.terminal_reason = Some(terminal);

    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipCapability(holder),
        AuthorityAuditEvent::Invalidated,
        Some("owner died".to_string()),
    );
    Self::push_authority_audit(
        session,
        AuthorityArtifact::Cancellation(witness.clone()),
        AuthorityAuditEvent::Issued,
        None,
    );
    Ok(witness)
}
/// Cancels the session because its pending transfer was abandoned.
///
/// Sets the status to `Cancelled`, clears the current owner and the pending
/// transfer, stores `TransferAbandoned` as the terminal reason, and returns a
/// cancellation witness naming the transfer's source owner. Audit log: the
/// current capability (if any) `Invalidated`, the receipt `RolledBack`, the
/// cancellation witness `Issued`.
///
/// # Errors
/// `SessionNotFound`, `Terminal`, `TransferNotPending`, or `ReceiptMismatch`.
/// These rejections are not written to the audit log.
pub fn cancel_abandoned_transfer(
    &mut self,
    receipt: &OwnershipReceipt,
) -> Result<CancellationWitness, OwnershipError> {
    let sid = receipt.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::ensure_mutable_ownership(session)?;
    match session.ownership.pending_transfer.as_ref() {
        None => return Err(OwnershipError::TransferNotPending { session_id: sid }),
        Some(pending) if pending.receipt != *receipt => {
            return Err(OwnershipError::ReceiptMismatch {
                session_id: sid,
                claim_id: receipt.claim_id,
            });
        }
        Some(_) => {}
    }

    let terminal = OwnershipTerminalReason::TransferAbandoned {
        owner_id: receipt.from_owner_id.clone(),
        claim_id: receipt.claim_id,
    };
    session.status = SessionStatus::Cancelled;
    let witness_id = Self::next_witness_id(session);
    let witness = CancellationWitness {
        witness_id,
        session_id: sid,
        owner_id: receipt.from_owner_id.clone(),
        generation: receipt.from_generation,
        reason: terminal.clone(),
    };

    // Taking the owner out both clears the slot and yields the value to audit.
    if let Some(holder) = session.ownership.current.take() {
        Self::push_authority_audit(
            session,
            AuthorityArtifact::OwnershipCapability(holder),
            AuthorityAuditEvent::Invalidated,
            Some("transfer abandoned".to_string()),
        );
    }
    session.ownership.pending_transfer = None;
    session.ownership.terminal_reason = Some(terminal);

    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipReceipt(receipt.clone()),
        AuthorityAuditEvent::RolledBack,
        Some("transfer abandoned".to_string()),
    );
    Self::push_authority_audit(
        session,
        AuthorityArtifact::Cancellation(witness.clone()),
        AuthorityAuditEvent::Issued,
        None,
    );
    Ok(witness)
}
/// Faults the session because committing its pending transfer failed.
///
/// Sets the status to `Faulted`, clears the current owner and the pending
/// transfer, and stores `TransferCommitFailed` as the terminal reason. Audit
/// log: the current capability (if any) `Invalidated`
/// ("transfer commit failed"), then the receipt `RolledBack` with `reason`.
///
/// # Errors
/// `SessionNotFound`, `Terminal`, `TransferNotPending`, or `ReceiptMismatch`.
/// These rejections are not written to the audit log.
pub fn fault_failed_transfer_commit(
    &mut self,
    receipt: &OwnershipReceipt,
    reason: impl Into<String>,
) -> Result<(), OwnershipError> {
    let sid = receipt.session_id;
    let session = self.session_mut_or_error(sid)?;
    Self::ensure_mutable_ownership(session)?;
    match session.ownership.pending_transfer.as_ref() {
        None => return Err(OwnershipError::TransferNotPending { session_id: sid }),
        Some(pending) if pending.receipt != *receipt => {
            return Err(OwnershipError::ReceiptMismatch {
                session_id: sid,
                claim_id: receipt.claim_id,
            });
        }
        Some(_) => {}
    }

    let detail: String = reason.into();
    let terminal = OwnershipTerminalReason::TransferCommitFailed {
        owner_id: receipt.from_owner_id.clone(),
        claim_id: receipt.claim_id,
        reason: detail.clone(),
    };
    session.status = SessionStatus::Faulted {
        reason: format!("ownership transfer commit failed: {detail}"),
    };

    // Taking the owner out both clears the slot and yields the value to audit.
    if let Some(holder) = session.ownership.current.take() {
        Self::push_authority_audit(
            session,
            AuthorityArtifact::OwnershipCapability(holder),
            AuthorityAuditEvent::Invalidated,
            Some("transfer commit failed".to_string()),
        );
    }
    session.ownership.pending_transfer = None;
    session.ownership.terminal_reason = Some(terminal);

    Self::push_authority_audit(
        session,
        AuthorityArtifact::OwnershipReceipt(receipt.clone()),
        AuthorityAuditEvent::RolledBack,
        Some(detail),
    );
    Ok(())
}
pub fn iter(&self) -> impl Iterator<Item = &SessionState> {
self.sessions.values()
}
/// Marks session `sid` as `Closed`.
///
/// Also clears its buffers and edge traces and bumps its epoch by one
/// (saturating). The session stays in the store until it is reaped.
///
/// # Errors
/// Returns `"session {sid} not found"` when the store does not hold `sid`.
pub fn close(&mut self, sid: SessionId) -> Result<(), String> {
    let Some(session) = self.sessions.get_mut(&sid) else {
        return Err(format!("session {sid} not found"));
    };
    session.buffers.clear();
    session.edge_traces.clear();
    session.epoch = session.epoch.saturating_add(1);
    session.status = SessionStatus::Closed;
    Ok(())
}
/// Lists the ids of stored sessions whose status is `Closed`, `Cancelled`, or
/// `Faulted`, in session-map order.
#[must_use]
pub fn closed_session_ids(&self) -> Vec<SessionId> {
    let mut finished = Vec::new();
    for (sid, session) in &self.sessions {
        let is_finished = matches!(
            session.status,
            SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. }
        );
        if is_finished {
            finished.push(*sid);
        }
    }
    finished
}
/// Removes the listed sessions that are `Closed`, `Cancelled`, or `Faulted`.
///
/// Ids that are unknown or whose session has any other status are skipped.
/// For each removed session a `ClosedSessionSummary` is appended to the
/// store's archive and also returned, in the order the ids were given.
pub fn reap_sessions(&mut self, session_ids: &[SessionId]) -> Vec<ClosedSessionSummary> {
    let mut summaries = Vec::new();
    for sid in session_ids {
        let reapable = matches!(
            self.sessions.get(sid).map(|session| &session.status),
            Some(SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. })
        );
        if !reapable {
            continue;
        }
        let removed = self
            .sessions
            .remove(sid)
            .expect("session existence checked before removal");
        let summary = ClosedSessionSummary::from_session(&removed);
        self.archived_closed.push(summary.clone());
        summaries.push(summary);
    }
    summaries
}
/// Reaps every session reported by [`Self::closed_session_ids`] and returns
/// the resulting summaries.
pub fn reap_closed(&mut self) -> Vec<ClosedSessionSummary> {
    let finished = self.closed_session_ids();
    self.reap_sessions(finished.as_slice())
}
/// Counts stored sessions whose status equals `SessionStatus::Active`.
#[must_use]
pub fn active_count(&self) -> usize {
    let mut active = 0;
    for session in self.sessions.values() {
        if session.status == SessionStatus::Active {
            active += 1;
        }
    }
    active
}
#[must_use]
pub fn live_count(&self) -> usize {
self.sessions.len()
}
/// Ids of all sessions currently held in the store, in session-map order.
#[must_use]
pub fn session_ids(&self) -> Vec<SessionId> {
    let mut ids = Vec::with_capacity(self.sessions.len());
    ids.extend(self.sessions.keys().copied());
    ids
}
/// Summaries of sessions that have been reaped from the store, oldest first.
#[must_use]
pub fn archived_closed(&self) -> &[ClosedSessionSummary] {
    self.archived_closed.as_slice()
}
/// Produces a snapshot of entry counts and retained-byte estimates.
///
/// Counts and per-category byte estimates are summed over every stored
/// session; archived summaries contribute their own estimate. The `total`
/// byte figure is the saturating sum of all byte categories.
#[must_use]
pub fn memory_usage(&self) -> SessionStoreMemoryUsage {
    let mut usage = SessionStoreMemoryUsage {
        live_sessions: self.sessions.len(),
        archived_closed_sessions: self.archived_closed.len(),
        ..SessionStoreMemoryUsage::default()
    };
    usage.retained_bytes.archived_closed = self
        .archived_closed
        .iter()
        .map(ClosedSessionSummary::retained_bytes_estimate)
        .sum();

    for session in self.sessions.values() {
        // Sessions that are finished but not yet reaped.
        let finished = matches!(
            session.status,
            SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. }
        );
        if finished {
            usage.live_closed_sessions += 1;
        }

        // Entry counts.
        let buffered_messages: usize = session.buffers.values().map(BoundedBuffer::len).sum();
        let auth_leaves: usize = session.auth_leaves.values().map(Vec::len).sum();
        usage.live_local_type_entries += session.local_types.len();
        usage.live_buffer_count += session.buffers.len();
        usage.live_buffered_messages += buffered_messages;
        usage.live_edge_handler_count += session.edge_handlers.len();
        usage.live_auth_leaf_count += auth_leaves;
        usage.live_auth_tree_count += session.auth_trees.len();
        usage.live_auth_root_count += session.auth_roots.len();

        // Retained-byte estimates, per category.
        let bytes = &mut usage.retained_bytes;
        bytes.live_sessions += session.retained_session_core_bytes();
        bytes.local_types += session.retained_local_type_bytes();
        bytes.buffers += session.retained_buffer_bytes();
        bytes.traces += session.retained_trace_bytes();
        bytes.auth += session.retained_auth_bytes();
        bytes.handlers += session.retained_handler_bytes();
    }

    let bytes = &mut usage.retained_bytes;
    bytes.total = bytes
        .live_sessions
        .saturating_add(bytes.archived_closed)
        .saturating_add(bytes.local_types)
        .saturating_add(bytes.buffers)
        .saturating_add(bytes.traces)
        .saturating_add(bytes.auth)
        .saturating_add(bytes.handlers);
    usage
}
/// Resolves the handler for `edge` via `lookup_handler_for_roles` on the
/// session named by `edge.sid`.
///
/// `None` when that session is not in the store or the lookup finds nothing.
#[must_use]
pub fn lookup_handler(&self, edge: &Edge) -> Option<&HandlerId> {
    let session = self.sessions.get(&edge.sid)?;
    session.lookup_handler_for_roles(&edge.sender, &edge.receiver)
}
/// Returns the result of `default_handler_binding` for session `sid`.
///
/// `None` when the session is not in the store or it reports no binding.
#[must_use]
pub fn default_handler_for_session(&self, sid: SessionId) -> Option<&HandlerId> {
    let session = self.sessions.get(&sid)?;
    session.default_handler_binding()
}
/// Sets the default handler of session `sid`, interning it first.
///
/// Does nothing when the session is not in the store. No ownership checks are
/// applied on this path.
pub(crate) fn set_default_handler_for_session(&mut self, sid: SessionId, handler: HandlerId) {
    let Some(session) = self.sessions.get_mut(&sid) else {
        return;
    };
    let interned = session.intern_handler_binding(&handler);
    session.default_handler_id = Some(interned);
    session.default_handler = handler;
}
/// Binds `handler` to `edge` in the session named by `edge.sid`.
///
/// The handler is interned; the edge→handler lookup is updated only when
/// `edge_key_for_roles` yields a key, while the per-edge handler map is always
/// updated. Does nothing when the session is not in the store. No ownership
/// checks are applied on this path.
pub(crate) fn update_handler(&mut self, edge: &Edge, handler: HandlerId) {
    let Some(session) = self.sessions.get_mut(&edge.sid) else {
        return;
    };
    let interned = session.intern_handler_binding(&handler);
    let edge_key = session.edge_key_for_roles(&edge.sender, &edge.receiver);
    if let Some(edge_key) = edge_key {
        session.edge_handler_lookup.insert(edge_key, interned);
    }
    session.edge_handlers.insert(edge.clone(), handler);
}
/// Returns the trace recorded for `edge` as a slice.
///
/// `None` when the session named by `edge.sid` is not in the store or has no
/// trace for `edge`.
#[must_use]
pub fn lookup_trace(&self, edge: &Edge) -> Option<&[ValType]> {
    let session = self.sessions.get(&edge.sid)?;
    let trace = session.edge_traces.get(edge)?;
    Some(trace.as_slice())
}
/// Stores `trace` for `edge` in the session named by `edge.sid`, replacing any
/// trace already recorded for that edge.
///
/// Does nothing when the session is not in the store.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn update_trace(&mut self, edge: &Edge, trace: Vec<ValType>) {
    let Some(session) = self.sessions.get_mut(&edge.sid) else {
        return;
    };
    session.edge_traces.insert(edge.clone(), trace);
}
}
/// Strips leading `Mu` binders from `lt`.
///
/// Descends through `Mu` bodies until a non-`Mu` node is reached and returns a
/// clone of that node. No variable substitution is performed here.
#[must_use]
pub fn unfold_mu(lt: &LocalTypeR) -> LocalTypeR {
    if let LocalTypeR::Mu { body, .. } = lt {
        return unfold_mu(body);
    }
    lt.clone()
}
/// Resolves a continuation to the type that should be stepped next.
///
/// - `Var`: the `Mu`-stripped form of `original`.
/// - `Mu`: the `Mu`-stripped form of `cont`.
/// - anything else: a clone of `cont`.
#[must_use]
pub fn unfold_if_var(cont: &LocalTypeR, original: &LocalTypeR) -> LocalTypeR {
    // `unfold_mu` clones non-`Mu` input unchanged, so it covers both the `Mu`
    // case and the "anything else" case for `cont`.
    if matches!(cont, LocalTypeR::Var(_)) {
        unfold_mu(original)
    } else {
        unfold_mu(cont)
    }
}
/// Like [`unfold_if_var`], additionally reporting a newly entered `Mu` scope.
///
/// The first tuple element is the resolved type. The second is
/// `Some(cont.clone())` only when `cont` itself is a `Mu`; it is `None` for
/// `Var` and for every other continuation.
#[must_use]
pub(crate) fn unfold_if_var_with_scope(
    cont: &LocalTypeR,
    original: &LocalTypeR,
) -> (LocalTypeR, Option<LocalTypeR>) {
    let entered_scope = match cont {
        LocalTypeR::Mu { .. } => Some(cont.clone()),
        _ => None,
    };
    // `unfold_mu` clones non-`Mu` input unchanged, so it also handles the
    // plain-continuation case.
    let resolved = if matches!(cont, LocalTypeR::Var(_)) {
        unfold_mu(original)
    } else {
        unfold_mu(cont)
    };
    (resolved, entered_scope)
}