/// Compact snapshot of a session: its identifier, status, epoch and the sizes
/// of its collections, as produced by `ClosedSessionSummary::from_session`.
///
/// Only counts are kept; the collections themselves are not copied.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClosedSessionSummary {
/// Session identifier, copied from the session's `sid`.
pub sid: SessionId,
/// Clone of the session's `status` at the time the summary was built.
pub status: SessionStatus,
/// Length of the session's `roles` collection.
pub role_count: usize,
/// Length of the session's `local_types` collection.
pub local_type_entries: usize,
/// Length of the session's `buffers` collection.
pub edge_count: usize,
/// Length of the session's `edge_handlers` collection.
pub edge_handler_count: usize,
/// Total number of leaves, summed over every vector in the session's `auth_leaves`.
pub auth_leaf_count: usize,
/// Length of the session's `auth_trees` collection.
pub auth_tree_count: usize,
/// Length of the session's `auth_roots` collection.
pub auth_root_count: usize,
/// Copy of the session's `epoch`.
pub epoch: usize,
}
impl ClosedSessionSummary {
    /// Builds a summary from `session`, keeping its id, status and epoch and
    /// replacing every collection with its element count.
    fn from_session(session: &SessionState) -> Self {
        // Leaves are stored as one vector per key; only the grand total is kept.
        let auth_leaf_count: usize = session
            .auth_leaves
            .values()
            .map(|leaves| leaves.len())
            .sum();

        Self {
            sid: session.sid,
            status: session.status.clone(),
            epoch: session.epoch,
            role_count: session.roles.len(),
            local_type_entries: session.local_types.len(),
            edge_count: session.buffers.len(),
            edge_handler_count: session.edge_handlers.len(),
            auth_leaf_count,
            auth_tree_count: session.auth_trees.len(),
            auth_root_count: session.auth_roots.len(),
        }
    }

    /// Estimates the bytes retained by this summary as the in-memory size of
    /// the struct plus the size reported by `serialized_bytes`, saturating at
    /// `usize::MAX` instead of overflowing.
    fn retained_bytes_estimate(&self) -> usize {
        let inline_size = std::mem::size_of::<Self>();
        let encoded_size = serialized_bytes(self);
        inline_size.saturating_add(encoded_size)
    }
}
/// Data precomputed by `SessionOpenPlan::new` from a role list and the roles'
/// initial local types.
#[derive(Debug, Clone)]
pub struct SessionOpenPlan {
/// Copy of the role list passed to `new`, in the same order.
pub(crate) roles: Vec<String>,
/// Role name to numeric id, as returned by `SessionState::build_role_ids`.
pub(crate) role_ids: BTreeMap<String, u16>,
/// One `(role, current, original)` triple per role that has an initial type;
/// `current` is `unfold_mu(original)` and `original` is the type as supplied.
pub(crate) initial_types: Vec<(String, LocalTypeR, LocalTypeR)>,
/// One `((from_id, to_id), from_role, to_role)` entry per distinct directed
/// edge found in the initial types, in ascending order of the id pair.
pub(crate) edge_blueprint: Vec<((u16, u16), String, String)>,
/// Roles whose unfolded initial type has a branch shape
/// (`SessionState::branch_shape` returned `Some`).
pub(crate) active_branch_roles: Vec<String>,
}
impl SessionOpenPlan {
    /// Inserts the directed edge `from -> to` into `edges`.
    ///
    /// Nothing is inserted when either role name is missing from `role_ids`
    /// or when both names resolve to the same id (a self-edge).
    fn record_edge(
        from: &str,
        to: &str,
        role_ids: &BTreeMap<String, u16>,
        edges: &mut BTreeSet<(u16, u16)>,
    ) {
        if let (Some(&from_id), Some(&to_id)) = (role_ids.get(from), role_ids.get(to)) {
            if from_id != to_id {
                edges.insert((from_id, to_id));
            }
        }
    }

    /// Collects every directed edge that `role` uses anywhere in `local_type`.
    ///
    /// The whole type tree is walked: recursion bodies and every branch
    /// continuation are visited. A `Send` contributes `role -> partner`, a
    /// `Recv` contributes `partner -> role`. Roles unknown to `role_ids` and
    /// self-edges are ignored.
    fn collect_protocol_edges(
        role: &str,
        local_type: &LocalTypeR,
        role_ids: &BTreeMap<String, u16>,
        edges: &mut BTreeSet<(u16, u16)>,
    ) {
        match local_type {
            LocalTypeR::End | LocalTypeR::Var(_) => {}
            LocalTypeR::Mu { body, .. } => {
                Self::collect_protocol_edges(role, body, role_ids, edges);
            }
            LocalTypeR::Send { partner, branches } => {
                Self::record_edge(role, partner, role_ids, edges);
                for (_, _, continuation) in branches {
                    Self::collect_protocol_edges(role, continuation, role_ids, edges);
                }
            }
            LocalTypeR::Recv { partner, branches } => {
                Self::record_edge(partner, role, role_ids, edges);
                for (_, _, continuation) in branches {
                    Self::collect_protocol_edges(role, continuation, role_ids, edges);
                }
            }
        }
    }

    /// Precomputes everything needed to open a session for `roles`.
    ///
    /// For every role that has an entry in `initial_types` this records the
    /// unfolded type next to the original, notes the role as branch-active
    /// when the unfolded type has a branch shape, and gathers the directed
    /// edges the original type uses. Roles without an entry are skipped.
    ///
    /// # Panics
    ///
    /// Panics if an id produced by `SessionState::build_role_ids` is not a
    /// valid index into `roles`.
    #[must_use]
    pub fn new(roles: &[String], initial_types: &BTreeMap<String, LocalTypeR>) -> Self {
        let role_ids = SessionState::build_role_ids(roles);
        let mut planned_types = Vec::with_capacity(roles.len());
        let mut active_branch_roles = Vec::new();
        let mut protocol_edges = BTreeSet::new();

        for role in roles {
            if let Some(original) = initial_types.get(role) {
                // Edges are taken from the original (folded) type, which
                // already contains every send/recv of the protocol.
                Self::collect_protocol_edges(role, original, &role_ids, &mut protocol_edges);

                let current = unfold_mu(original);
                if SessionState::branch_shape(&current).is_some() {
                    active_branch_roles.push(role.clone());
                }
                planned_types.push((role.clone(), current, original.clone()));
            }
        }

        // The set iterates in ascending id order, so the blueprint is
        // deterministic for a given input.
        let edge_blueprint = protocol_edges
            .into_iter()
            .map(|(from_id, to_id)| {
                let from = roles
                    .get(usize::from(from_id))
                    .expect("sender role id must index the session-open role set")
                    .clone();
                let to = roles
                    .get(usize::from(to_id))
                    .expect("receiver role id must index the session-open role set")
                    .clone();
                ((from_id, to_id), from, to)
            })
            .collect();

        Self {
            roles: roles.to_vec(),
            role_ids,
            initial_types: planned_types,
            edge_blueprint,
            active_branch_roles,
        }
    }

    /// Returns the role names the plan was built for, in their original order.
    #[must_use]
    pub fn roles(&self) -> &[String] {
        &self.roles
    }

    /// Returns the planned edges as `((from_id, to_id), from_role, to_role)`
    /// entries, in ascending order of the id pair.
    #[must_use]
    pub fn edge_blueprint(&self) -> &[((u16, u16), String, String)] {
        &self.edge_blueprint
    }
}
/// Counters describing what a session store currently retains, plus a byte
/// breakdown in `retained_bytes`. `Default` yields all zeros.
///
/// NOTE(review): the code that fills these counters is not in this part of
/// the file; the field names are the only description available here, so
/// confirm exact meanings against the producer.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionStoreMemoryUsage {
pub live_sessions: usize,
pub live_closed_sessions: usize,
pub archived_closed_sessions: usize,
pub live_local_type_entries: usize,
pub live_buffer_count: usize,
pub live_buffered_messages: usize,
pub live_edge_handler_count: usize,
pub live_auth_leaf_count: usize,
pub live_auth_tree_count: usize,
pub live_auth_root_count: usize,
/// Byte-size figures, split by category.
pub retained_bytes: SessionStoreRetainedBytes,
}
/// Byte-size figures split by category. `Default` yields all zeros.
///
/// NOTE(review): how each figure is computed is not visible in this part of
/// the file — confirm against the producer.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionStoreRetainedBytes {
pub live_sessions: usize,
pub archived_closed: usize,
pub local_types: usize,
pub buffers: usize,
pub traces: usize,
pub auth: usize,
pub handlers: usize,
// presumably the sum of the categories above — TODO confirm, including
// whether any categories overlap.
pub total: usize,
}
/// Identifier of a session (the `sid` / `session_id` fields in this file).
pub type SessionId = usize;
/// Identifier of an owner (the `owner_id` fields in this file).
pub type FragmentOwnerId = String;
/// Type of the `generation` fields carried by ownership artifacts.
pub type OwnershipEpoch = u64;
/// Identifier of an ownership claim (the `claim_id` fields in this file).
pub type OwnershipClaimId = u64;
/// Identifier of an authority witness (the `witness_id` fields in this file).
pub type AuthorityWitnessId = u64;
/// String identifier of a handler.
pub type HandlerId = String;
/// Compact numeric id for a handler; used as the value type in `HandlerIndexBuild`.
type HandlerNumericId = u16;
/// Compact numeric id for a label.
type LabelNumericId = u16;
/// Pair of `u16` ids used as a map key.
// presumably (sender role id, receiver role id) — TODO confirm against users.
type EdgeKey = (u16, u16);
/// Borrowed slice of `(label, optional value type, continuation)` branch triples.
type LocalBranches<'a> = &'a [(Label, Option<ValType>, LocalTypeR)];
/// Tuple bundling four handler-index structures: handler id to numeric id, a
/// list of handler ids, edge key to numeric handler id, and an optional
/// numeric handler id.
// NOTE(review): the builder is not in this part of the file; presumably the
// list maps numeric id back to handler id and the last element is the
// default handler — confirm.
type HandlerIndexBuild = (
BTreeMap<HandlerId, HandlerNumericId>,
Vec<HandlerId>,
BTreeMap<EdgeKey, HandlerNumericId>,
Option<HandlerNumericId>,
);
/// Direction of a branch; stored in `CachedBranch::direction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub(crate) enum BranchDirection {
/// The branch is a send.
Send,
/// The branch is a receive.
Recv,
}
/// A single branch with its direction, partner, optional payload type and
/// continuation.
///
/// NOTE(review): where these are built and cached is not visible in this
/// part of the file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CachedBranch {
/// Whether the branch sends or receives.
pub(crate) direction: BranchDirection,
/// Name of the partner named by the branch.
pub(crate) partner: String,
/// Value type attached to the branch, if any.
pub(crate) expected_type: Option<ValType>,
/// Local type that follows the branch.
pub(crate) continuation: LocalTypeR,
}
pub(crate) const DEFAULT_HANDLER_ID: &str = "default_handler";
fn default_handler_id() -> HandlerId {
DEFAULT_HANDLER_ID.to_string()
}
/// Returns the size that `crate::serialization::binary_size` reports for
/// `value`.
fn serialized_bytes<T>(value: &T) -> usize
where
    T: Serialize,
{
    crate::serialization::binary_size(value)
}
/// A directed edge inside a session, identified by the session id and the
/// sender and receiver names.
///
/// Derives `Ord` and `Hash`, so it can be used as a key in ordered and hashed
/// collections.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Edge {
/// Session the edge belongs to.
pub sid: SessionId,
/// Name of the sending side.
pub sender: String,
/// Name of the receiving side.
pub receiver: String,
}
impl Edge {
#[must_use]
pub fn new(sid: SessionId, sender: impl Into<String>, receiver: impl Into<String>) -> Self {
Self {
sid,
sender: sender.into(),
receiver: receiver.into(),
}
}
}
/// Deserialization target used by `decode_edge_json`: an edge whose `sid` may
/// be absent.
#[derive(Debug, Deserialize)]
struct EdgeJson {
/// Session id, if the JSON carried one.
sid: Option<SessionId>,
/// Name of the sending side.
sender: String,
/// Name of the receiving side.
receiver: String,
}
pub fn decode_edge_json(
value: &JsonValue,
session_hint: Option<SessionId>,
) -> Result<Edge, String> {
let raw: EdgeJson =
serde_json::from_value(value.clone()).map_err(|e| format!("invalid edge json: {e}"))?;
let sid = raw
.sid
.or(session_hint)
.ok_or_else(|| "missing sid in edge json".to_string())?;
Ok(Edge::new(sid, raw.sender, raw.receiver))
}
/// Status of a session.
///
/// NOTE(review): the transitions between these states are not visible in
/// this part of the file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionStatus {
Active,
Draining,
Closed,
Cancelled,
/// The session faulted; `reason` carries a textual description.
Faulted {
reason: String,
},
}
/// What an ownership artifact covers: the whole session or a named set of
/// fragments.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OwnershipScope {
/// Covers the session as a whole; the only scope for which
/// `allows_session_mutation` returns `true`.
Session,
/// Covers only the listed fragments, identified by string.
Fragments(BTreeSet<String>),
}
impl OwnershipScope {
    /// Reports whether this scope permits mutating the session as a whole.
    ///
    /// Only [`OwnershipScope::Session`] does; a fragment scope never does,
    /// whatever fragments it lists.
    #[must_use]
    pub fn allows_session_mutation(&self) -> bool {
        match self {
            Self::Session => true,
            Self::Fragments(_) => false,
        }
    }
}
/// Names an owner of a session together with a generation and a scope.
///
/// NOTE(review): how capabilities are issued and validated is not visible in
/// this part of the file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OwnershipCapability {
/// Session the capability refers to.
pub session_id: SessionId,
/// Owner the capability names.
pub owner_id: FragmentOwnerId,
/// Generation value carried by the capability.
pub generation: OwnershipEpoch,
/// What the capability covers.
pub scope: OwnershipScope,
}
/// Describes an ownership transfer: a claim id plus the owner and generation
/// on each side of the transfer, and the scope concerned.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OwnershipReceipt {
/// Session the transfer refers to.
pub session_id: SessionId,
/// Identifier of the claim this receipt belongs to.
pub claim_id: OwnershipClaimId,
/// Owner on the "from" side of the transfer.
pub from_owner_id: FragmentOwnerId,
/// Generation on the "from" side of the transfer.
pub from_generation: OwnershipEpoch,
/// Owner on the "to" side of the transfer.
pub to_owner_id: FragmentOwnerId,
/// Generation on the "to" side of the transfer.
pub to_generation: OwnershipEpoch,
/// What the transfer covers.
pub scope: OwnershipScope,
}
/// Witness tied to a session, owner, generation and scope, carrying a
/// reference to a predicate.
///
/// Issued witnesses are tracked by id in
/// `SessionOwnershipState::issued_readiness`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReadinessWitness {
/// Identifier of this witness.
pub witness_id: AuthorityWitnessId,
/// Session the witness refers to.
pub session_id: SessionId,
/// Owner the witness names.
pub owner_id: FragmentOwnerId,
/// Generation value carried by the witness.
pub generation: OwnershipEpoch,
/// What the witness covers.
pub scope: OwnershipScope,
/// Textual reference to a predicate.
// NOTE(review): the format of this reference is not visible here.
pub predicate_ref: String,
}
/// Witness tied to a session, owner and generation, carrying the terminal
/// reason for a cancellation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CancellationWitness {
/// Identifier of this witness.
pub witness_id: AuthorityWitnessId,
/// Session the witness refers to.
pub session_id: SessionId,
/// Owner the witness names.
pub owner_id: FragmentOwnerId,
/// Generation value carried by the witness.
pub generation: OwnershipEpoch,
/// Why ownership terminated.
pub reason: OwnershipTerminalReason,
}
/// Witness describing a timeout at a named site, bounded by two tick values.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimeoutWitness {
/// Identifier of this witness.
pub witness_id: AuthorityWitnessId,
/// Textual name of the site the timeout applies to.
pub site: String,
/// Tick at which the witness was issued.
pub issued_at_tick: u64,
/// Tick bounding the timeout.
// NOTE(review): whether this bound is inclusive is not visible here.
pub until_tick: u64,
}
/// Any one of the authority artifacts defined in this file, wrapped so that
/// an `AuthorityAuditRecord` can refer to it uniformly.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuthorityArtifact {
/// Wraps an `OwnershipCapability`.
OwnershipCapability(OwnershipCapability),
/// Wraps an `OwnershipReceipt`.
OwnershipReceipt(OwnershipReceipt),
/// Wraps a `ReadinessWitness`.
Readiness(ReadinessWitness),
/// Wraps a `CancellationWitness`.
Cancellation(CancellationWitness),
/// Wraps a `TimeoutWitness`.
Timeout(TimeoutWitness),
}
/// Kind of event recorded for an authority artifact in an
/// `AuthorityAuditRecord`.
///
/// Every variant converts to the same-named variant of
/// `crate::capabilities::ProtocolCriticalCapabilityLifecycleState` through
/// the `From` impl in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuthorityAuditEvent {
Issued,
Consumed,
Invalidated,
Committed,
RolledBack,
Rejected,
Expired,
}
impl From<AuthorityAuditEvent> for crate::capabilities::ProtocolCriticalCapabilityLifecycleState {
fn from(event: AuthorityAuditEvent) -> Self {
match event {
AuthorityAuditEvent::Issued => Self::Issued,
AuthorityAuditEvent::Consumed => Self::Consumed,
AuthorityAuditEvent::Invalidated => Self::Invalidated,
AuthorityAuditEvent::Committed => Self::Committed,
AuthorityAuditEvent::RolledBack => Self::RolledBack,
AuthorityAuditEvent::Rejected => Self::Rejected,
AuthorityAuditEvent::Expired => Self::Expired,
}
}
}
/// One audit entry: which artifact was involved, what happened to it, and
/// optionally when and why.
///
/// Entries are stored in `SessionOwnershipState::audit_log`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthorityAuditRecord {
/// Tick associated with the event, when one is available.
pub tick: Option<u64>,
/// The artifact the event concerns.
pub artifact: AuthorityArtifact,
/// What happened to the artifact.
pub event: AuthorityAuditEvent,
/// Optional textual explanation.
pub reason: Option<String>,
}
/// Reason ownership reached a terminal condition.
///
/// Carried by `CancellationWitness::reason`, `OwnershipError::Terminal` and
/// `SessionOwnershipState::terminal_reason`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OwnershipTerminalReason {
/// Names the owner that died.
OwnerDied {
owner_id: FragmentOwnerId,
},
/// Names the owner and the claim of a transfer that was abandoned.
TransferAbandoned {
owner_id: FragmentOwnerId,
claim_id: OwnershipClaimId,
},
/// Names the owner and the claim of a transfer whose commit failed, with
/// a textual reason.
TransferCommitFailed {
owner_id: FragmentOwnerId,
claim_id: OwnershipClaimId,
reason: String,
},
}
/// Errors reported by ownership operations. Every variant names the session
/// involved.
///
/// NOTE(review): the operations that raise these errors are not in this part
/// of the file; the per-variant notes describe only the payload each carries.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OwnershipError {
/// Carries the id of the session that was not found.
SessionNotFound {
session_id: SessionId,
},
/// Carries the id of the current owner.
AlreadyClaimed {
session_id: SessionId,
current_owner_id: FragmentOwnerId,
},
/// Carries only the session id.
Unclaimed {
session_id: SessionId,
},
/// Carries the owner plus the expected and the actual generation.
StaleCapability {
session_id: SessionId,
owner_id: FragmentOwnerId,
expected_generation: OwnershipEpoch,
actual_generation: OwnershipEpoch,
},
/// Carries the owner plus the required and the actual scope.
ScopeViolation {
session_id: SessionId,
owner_id: FragmentOwnerId,
required: OwnershipScope,
actual: OwnershipScope,
},
/// Carries the claim id of the pending transfer.
TransferPending {
session_id: SessionId,
claim_id: OwnershipClaimId,
},
/// Carries only the session id.
TransferNotPending {
session_id: SessionId,
},
/// Carries the claim id of the receipt concerned.
ReceiptMismatch {
session_id: SessionId,
claim_id: OwnershipClaimId,
},
/// Carries the witness id and a textual reason.
InvalidWitness {
session_id: SessionId,
witness_id: AuthorityWitnessId,
reason: String,
},
/// Carries the id of the witness concerned.
WitnessConsumed {
session_id: SessionId,
witness_id: AuthorityWitnessId,
},
/// Carries the terminal reason.
Terminal {
session_id: SessionId,
reason: OwnershipTerminalReason,
},
}
/// A mutation request against a session, expressed as data.
///
/// NOTE(review): the code that applies these mutations is not in this part
/// of the file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionHostMutation {
/// Carries the handler id to use as the default handler.
SetDefaultHandler {
handler: HandlerId,
},
/// Carries an edge and the handler id to associate with it.
UpdateEdgeHandler {
edge: Edge,
handler: HandlerId,
},
/// Carries an edge and a trace of value types for it.
UpdateTrace {
edge: Edge,
trace: Vec<ValType>,
},
}
/// A pending ownership transfer, represented by its receipt.
///
/// Stored in `SessionOwnershipState::pending_transfer`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct PendingOwnershipTransfer {
/// Receipt describing the transfer.
pub(crate) receipt: OwnershipReceipt,
}
/// Ownership bookkeeping for one session.
///
/// The `Default` impl in this file starts with no owner, no pending transfer,
/// no terminal reason, both id counters at 1 and every collection empty.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SessionOwnershipState {
/// Current capability, if any.
pub(crate) current: Option<OwnershipCapability>,
/// Pending transfer, if any.
pub(crate) pending_transfer: Option<PendingOwnershipTransfer>,
/// Terminal reason, if one has been recorded.
pub(crate) terminal_reason: Option<OwnershipTerminalReason>,
/// Claim-id counter; starts at 1.
// presumably the id handed to the next claim — TODO confirm against users.
pub(crate) next_claim_id: OwnershipClaimId,
/// Witness-id counter; starts at 1.
// presumably the id handed to the next witness — TODO confirm against users.
pub(crate) next_witness_id: AuthorityWitnessId,
/// Readiness witnesses keyed by witness id.
pub(crate) issued_readiness: BTreeMap<AuthorityWitnessId, ReadinessWitness>,
/// Set of witness ids recorded as consumed.
pub(crate) consumed_witnesses: BTreeSet<AuthorityWitnessId>,
/// Audit records for this session's authority artifacts.
pub(crate) audit_log: Vec<AuthorityAuditRecord>,
}
impl Default for SessionOwnershipState {
    /// Returns the state of a session nobody has claimed yet: no owner, no
    /// pending transfer, no terminal reason, empty bookkeeping collections,
    /// and both id counters starting at 1.
    fn default() -> Self {
        Self {
            // Id counters start at 1 rather than 0.
            next_claim_id: 1,
            next_witness_id: 1,
            // Nothing is owned, in flight, or terminated.
            current: None,
            pending_transfer: None,
            terminal_reason: None,
            // No witnesses issued or consumed, nothing audited.
            issued_readiness: BTreeMap::default(),
            consumed_witnesses: BTreeSet::default(),
            audit_log: Vec::default(),
        }
    }
}
/// A pair of local types: a current one and an original one.
// presumably mirrors the `(current, original)` pair that `SessionOpenPlan::new`
// records per role, where `current` is the unfolded form — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeEntry {
/// The current local type.
pub current: LocalTypeR,
/// The original local type.
pub original: LocalTypeR,
}