1#[derive(Debug, Clone, Serialize)]
6pub struct SessionState {
7 pub sid: SessionId,
9 pub roles: Vec<String>,
11 #[serde(skip)]
13 role_ids: BTreeMap<String, u16>,
14 pub local_types: BTreeMap<Endpoint, TypeEntry>,
18 pub buffers: BTreeMap<Edge, SignedBuffer<Signature>>,
20 #[serde(skip)]
22 edge_lookup: BTreeMap<(u16, u16), Edge>,
23 #[serde(skip)]
25 handler_ids: BTreeMap<HandlerId, HandlerNumericId>,
26 #[serde(skip)]
28 handlers_by_id: Vec<HandlerId>,
29 #[serde(skip)]
31 edge_handler_lookup: BTreeMap<(u16, u16), HandlerNumericId>,
32 #[serde(skip)]
34 default_handler_id: Option<HandlerNumericId>,
35 #[serde(skip)]
37 label_ids: BTreeMap<String, LabelNumericId>,
38 #[serde(skip)]
40 labels_by_id: Vec<String>,
41 #[serde(skip)]
43 branch_lookup: BTreeMap<Endpoint, BTreeMap<LabelNumericId, CachedBranch>>,
44 pub auth_leaves: BTreeMap<Edge, Vec<Hash>>,
46 #[serde(default)]
48 pub auth_trees: BTreeMap<Edge, AuthTree>,
49 pub auth_roots: BTreeMap<Edge, Hash>,
51 pub edge_handlers: BTreeMap<Edge, HandlerId>,
53 #[serde(default = "default_handler_id")]
55 pub default_handler: HandlerId,
56 pub edge_traces: BTreeMap<Edge, Vec<ValType>>,
58 pub status: SessionStatus,
60 pub epoch: usize,
62 ownership: SessionOwnershipState,
64}
65
66impl SessionState {
67 pub(crate) fn from_open_plan(
68 sid: SessionId,
69 plan: &SessionOpenPlan,
70 buffer_config: &BufferConfig,
71 ) -> Self {
72 let mut local_type_entries = Vec::with_capacity(plan.initial_types.len());
73 for (role, current, original) in &plan.initial_types {
74 local_type_entries.push((
75 Endpoint {
76 sid,
77 role: role.clone(),
78 },
79 TypeEntry {
80 current: current.clone(),
81 original: original.clone(),
82 },
83 ));
84 }
85 let local_types = local_type_entries.into_iter().collect();
86
87 let mut edge_entries = Vec::with_capacity(plan.edge_blueprint().len());
88 let mut buffer_entries = Vec::with_capacity(plan.edge_blueprint().len());
89 for (key, from, to) in plan.edge_blueprint() {
90 let edge = Edge::new(sid, from.clone(), to.clone());
91 edge_entries.push((*key, edge.clone()));
92 buffer_entries.push((edge, BoundedBuffer::new(buffer_config)));
93 }
94 let edge_lookup = edge_entries.into_iter().collect();
95 let buffers = buffer_entries.into_iter().collect();
96
97 let default_handler = default_handler_id();
98 let (handler_ids, handlers_by_id, edge_handler_lookup, default_handler_id) =
99 Self::build_handler_indexes(&plan.role_ids, &default_handler, &BTreeMap::new());
100
101 let mut state = Self {
102 sid,
103 roles: plan.roles.clone(),
104 role_ids: plan.role_ids.clone(),
105 local_types,
106 buffers,
107 edge_lookup,
108 handler_ids,
109 handlers_by_id,
110 edge_handler_lookup,
111 default_handler_id,
112 label_ids: BTreeMap::new(),
113 labels_by_id: Vec::new(),
114 branch_lookup: BTreeMap::new(),
115 auth_leaves: BTreeMap::new(),
116 auth_trees: BTreeMap::new(),
117 auth_roots: BTreeMap::new(),
118 edge_handlers: BTreeMap::new(),
119 default_handler,
120 edge_traces: BTreeMap::new(),
121 status: SessionStatus::Active,
122 epoch: 0,
123 ownership: SessionOwnershipState::default(),
124 };
125 for role in &plan.active_branch_roles {
126 state.refresh_endpoint_branch_lookup(&Endpoint {
127 sid,
128 role: role.clone(),
129 });
130 }
131 state
132 }
133
134 fn retained_session_core_bytes(&self) -> usize {
135 std::mem::size_of::<Self>()
136 .saturating_add(serialized_bytes(&self.sid))
137 .saturating_add(serialized_bytes(&self.roles))
138 .saturating_add(serialized_bytes(&self.role_ids))
139 .saturating_add(serialized_bytes(&self.edge_lookup))
140 .saturating_add(serialized_bytes(&self.handler_ids))
141 .saturating_add(serialized_bytes(&self.handlers_by_id))
142 .saturating_add(serialized_bytes(&self.edge_handler_lookup))
143 .saturating_add(serialized_bytes(&self.default_handler_id))
144 .saturating_add(serialized_bytes(&self.label_ids))
145 .saturating_add(serialized_bytes(&self.labels_by_id))
146 .saturating_add(serialized_bytes(&self.branch_lookup))
147 .saturating_add(serialized_bytes(&self.status))
148 .saturating_add(serialized_bytes(&self.epoch))
149 .saturating_add(serialized_bytes(&self.ownership))
150 }
151
152 fn retained_local_type_bytes(&self) -> usize {
153 serialized_bytes(&self.local_types)
154 }
155
156 fn retained_buffer_bytes(&self) -> usize {
157 serialized_bytes(&self.buffers)
158 }
159
160 fn retained_trace_bytes(&self) -> usize {
161 serialized_bytes(&self.edge_traces)
162 }
163
164 fn retained_auth_bytes(&self) -> usize {
165 serialized_bytes(&self.auth_leaves)
166 .saturating_add(serialized_bytes(&self.auth_trees))
167 .saturating_add(serialized_bytes(&self.auth_roots))
168 }
169
170 fn retained_handler_bytes(&self) -> usize {
171 serialized_bytes(&self.edge_handlers)
172 .saturating_add(serialized_bytes(&self.default_handler))
173 }
174
175 fn rebuild_derived_indexes(&mut self) {
176 self.role_ids = Self::build_role_ids(&self.roles);
177 self.edge_lookup = Self::build_edge_lookup_from_buffers(&self.role_ids, &self.buffers);
178 self.refresh_handler_indexes();
179 self.label_ids = BTreeMap::new();
180 self.labels_by_id = Vec::new();
181 self.branch_lookup = BTreeMap::new();
182 let endpoints: Vec<Endpoint> = self.local_types.keys().cloned().collect();
183 for endpoint in endpoints {
184 self.refresh_endpoint_branch_lookup(&endpoint);
185 }
186 }
187
188 pub(crate) fn refresh_handler_indexes(&mut self) {
189 let (handler_ids, handlers_by_id, edge_handler_lookup, default_handler_id) =
190 Self::build_handler_indexes(&self.role_ids, &self.default_handler, &self.edge_handlers);
191 self.handler_ids = handler_ids;
192 self.handlers_by_id = handlers_by_id;
193 self.edge_handler_lookup = edge_handler_lookup;
194 self.default_handler_id = default_handler_id;
195 }
196
197 pub(crate) fn build_role_ids(roles: &[String]) -> BTreeMap<String, u16> {
198 roles
199 .iter()
200 .enumerate()
201 .map(|(idx, role)| {
202 (
203 role.clone(),
204 u16::try_from(idx).expect("role count should fit in u16"),
205 )
206 })
207 .collect()
208 }
209
210 pub(crate) fn build_edge_lookup_from_buffers(
211 role_ids: &BTreeMap<String, u16>,
212 buffers: &BTreeMap<Edge, SignedBuffer<Signature>>,
213 ) -> BTreeMap<EdgeKey, Edge> {
214 let mut lookup = BTreeMap::new();
215 for edge in buffers.keys() {
216 let Some(from_id) = role_ids.get(&edge.sender) else {
217 continue;
218 };
219 let Some(to_id) = role_ids.get(&edge.receiver) else {
220 continue;
221 };
222 lookup.insert((*from_id, *to_id), edge.clone());
223 }
224 lookup
225 }
226
227 pub(crate) fn build_handler_indexes(
228 role_ids: &BTreeMap<String, u16>,
229 default_handler: &str,
230 edge_handlers: &BTreeMap<Edge, HandlerId>,
231 ) -> HandlerIndexBuild {
232 let mut handler_ids = BTreeMap::new();
233 let mut handlers_by_id = Vec::new();
234 let intern_handler = |handler: &str,
235 handler_ids: &mut BTreeMap<HandlerId, HandlerNumericId>,
236 handlers_by_id: &mut Vec<HandlerId>|
237 -> HandlerNumericId {
238 if let Some(id) = handler_ids.get(handler) {
239 return *id;
240 }
241 let id = u16::try_from(handlers_by_id.len()).expect("handler count should fit in u16");
242 let owned = handler.to_string();
243 handler_ids.insert(owned.clone(), id);
244 handlers_by_id.push(owned);
245 id
246 };
247
248 let default_handler_id = (!default_handler.is_empty())
249 .then(|| intern_handler(default_handler, &mut handler_ids, &mut handlers_by_id));
250
251 let mut edge_handler_lookup = BTreeMap::new();
252 for (edge, handler) in edge_handlers {
253 let Some(from_id) = role_ids.get(&edge.sender) else {
254 continue;
255 };
256 let Some(to_id) = role_ids.get(&edge.receiver) else {
257 continue;
258 };
259 let handler_id = intern_handler(handler, &mut handler_ids, &mut handlers_by_id);
260 edge_handler_lookup.insert((*from_id, *to_id), handler_id);
261 }
262
263 (
264 handler_ids,
265 handlers_by_id,
266 edge_handler_lookup,
267 default_handler_id,
268 )
269 }
270
271 fn edge_for_roles(&self, from: &str, to: &str) -> Option<&Edge> {
272 let from_id = self.role_ids.get(from)?;
273 let to_id = self.role_ids.get(to)?;
274 self.edge_lookup.get(&(*from_id, *to_id))
275 }
276
277 fn edge_key_for_roles(&self, from: &str, to: &str) -> Option<(u16, u16)> {
278 let from_id = self.role_ids.get(from)?;
279 let to_id = self.role_ids.get(to)?;
280 Some((*from_id, *to_id))
281 }
282
283 fn intern_label(&mut self, label: &str) -> LabelNumericId {
284 if let Some(id) = self.label_ids.get(label) {
285 return *id;
286 }
287 let id = u16::try_from(self.labels_by_id.len()).expect("label count should fit in u16");
288 let owned = label.to_string();
289 self.label_ids.insert(owned.clone(), id);
290 self.labels_by_id.push(owned);
291 id
292 }
293
294 fn intern_handler_binding(&mut self, handler: &str) -> HandlerNumericId {
295 if let Some(id) = self.handler_ids.get(handler) {
296 return *id;
297 }
298 let id = u16::try_from(self.handlers_by_id.len()).expect("handler count should fit in u16");
299 let owned = handler.to_string();
300 self.handler_ids.insert(owned.clone(), id);
301 self.handlers_by_id.push(owned);
302 id
303 }
304
305 fn handler_by_id(&self, handler_id: HandlerNumericId) -> Option<&HandlerId> {
306 self.handlers_by_id.get(usize::from(handler_id))
307 }
308
309 fn branch_shape(local_type: &LocalTypeR) -> Option<(BranchDirection, &str, LocalBranches<'_>)> {
310 match local_type {
311 LocalTypeR::Send { partner, branches } => {
312 Some((BranchDirection::Send, partner.as_str(), branches.as_slice()))
313 }
314 LocalTypeR::Recv { partner, branches } => {
315 Some((BranchDirection::Recv, partner.as_str(), branches.as_slice()))
316 }
317 _ => None,
318 }
319 }
320
321 pub(crate) fn refresh_endpoint_branch_lookup(&mut self, ep: &Endpoint) {
322 self.branch_lookup.remove(ep);
323 let Some(entry) = self.local_types.get(ep) else {
324 return;
325 };
326 let Some((direction, partner, branches)) = Self::branch_shape(&entry.current) else {
327 return;
328 };
329 let partner = partner.to_string();
330 let branches: Vec<(String, Option<ValType>, LocalTypeR)> = branches
331 .iter()
332 .map(|(label, expected_type, continuation)| {
333 (
334 label.name.clone(),
335 expected_type.clone(),
336 continuation.clone(),
337 )
338 })
339 .collect();
340
341 let mut endpoint_lookup = BTreeMap::new();
342 for (label, expected_type, continuation) in branches {
343 let label_id = self.intern_label(&label);
344 endpoint_lookup.insert(
345 label_id,
346 CachedBranch {
347 direction,
348 partner: partner.clone(),
349 expected_type,
350 continuation,
351 },
352 );
353 }
354 if !endpoint_lookup.is_empty() {
355 self.branch_lookup.insert(ep.clone(), endpoint_lookup);
356 }
357 }
358
359 #[must_use]
361 pub(crate) fn lookup_branch_resolution(
362 &self,
363 ep: &Endpoint,
364 label: &str,
365 ) -> Option<&CachedBranch> {
366 let label_id = self.label_ids.get(label)?;
367 self.branch_lookup.get(ep)?.get(label_id)
368 }
369
370 fn update_auth_tree(&mut self, edge: &Edge, signed: &SignedValue<Signature>) {
371 let bytes = crate::serialization::binary_encode(signed).unwrap_or_default();
372 let leaf = DefaultVerificationModel::hash(HashTag::MerkleLeaf, &bytes);
373 self.auth_leaves.entry(edge.clone()).or_default().push(leaf);
374 let tree = self
375 .auth_trees
376 .entry(edge.clone())
377 .or_insert_with(|| AuthTree::new(Vec::new()));
378 tree.append_leaf(leaf);
379 self.auth_roots.insert(edge.clone(), tree.root());
380 }
381
382 pub fn send_signed(
388 &mut self,
389 from: &str,
390 to: &str,
391 signed: &SignedValue<Signature>,
392 ) -> Result<crate::buffer::EnqueueResult, String> {
393 let edge = self
394 .edge_for_roles(from, to)
395 .cloned()
396 .ok_or_else(|| format!("no buffer for edge {from} → {to}"))?;
397 let buf = self
398 .buffers
399 .get_mut(&edge)
400 .ok_or_else(|| format!("no buffer for edge {from} → {to}"))?;
401 let result = buf.enqueue(signed.clone());
402 if matches!(result, crate::buffer::EnqueueResult::Ok) {
403 self.update_auth_tree(&edge, signed);
404 }
405 Ok(result)
406 }
407
408 pub fn send(
416 &mut self,
417 from: &str,
418 to: &str,
419 val: Value,
420 ) -> Result<crate::buffer::EnqueueResult, String> {
421 let signer = signing_key_for_endpoint(&Endpoint {
422 sid: self.sid,
423 role: from.to_string(),
424 });
425 let signature = sign_value(&val, &signer);
426 self.send_signed(
427 from,
428 to,
429 &SignedValue {
430 payload: val,
431 signature,
432 sequence_no: 0,
433 },
434 )
435 }
436
437 pub fn send_with_sequence(
443 &mut self,
444 from: &str,
445 to: &str,
446 val: Value,
447 sequence_no: u64,
448 ) -> Result<crate::buffer::EnqueueResult, String> {
449 let signer = signing_key_for_endpoint(&Endpoint {
450 sid: self.sid,
451 role: from.to_string(),
452 });
453 let signature = sign_value(&val, &signer);
454 self.send_signed(
455 from,
456 to,
457 &SignedValue {
458 payload: val,
459 signature,
460 sequence_no,
461 },
462 )
463 }
464
465 pub fn recv_signed(&mut self, from: &str, to: &str) -> Option<SignedValue<Signature>> {
467 let edge = self.edge_for_roles(from, to)?.clone();
468 self.buffers.get_mut(&edge).and_then(|buf| buf.dequeue())
469 }
470
471 pub fn recv_verified_signed(
477 &mut self,
478 from: &str,
479 to: &str,
480 ) -> Result<Option<SignedValue<Signature>>, String> {
481 let sender = Endpoint {
482 sid: self.sid,
483 role: from.to_string(),
484 };
485 let verifying = verifying_key_for_endpoint(&sender);
486 let signed = self.recv_signed(from, to);
487 let Some(signed) = signed else {
488 return Ok(None);
489 };
490 if !verify_signed_value(&signed.payload, &signed.signature, &verifying) {
491 return Err(format!(
492 "signature verification failed on edge {from} -> {to}"
493 ));
494 }
495 Ok(Some(signed))
496 }
497
498 pub fn recv_verified(&mut self, from: &str, to: &str) -> Result<Option<Value>, String> {
504 Ok(self
505 .recv_verified_signed(from, to)?
506 .map(|signed| signed.payload))
507 }
508
509 pub fn recv(&mut self, from: &str, to: &str) -> Option<Value> {
511 self.recv_verified(from, to).ok().flatten()
512 }
513
514 #[must_use]
516 pub fn has_message(&self, from: &str, to: &str) -> bool {
517 let Some(edge) = self.edge_for_roles(from, to) else {
518 return false;
519 };
520 self.buffers.get(edge).is_some_and(|buf| !buf.is_empty())
521 }
522
523 #[must_use]
525 pub fn lookup_handler_for_roles(&self, from: &str, to: &str) -> Option<&HandlerId> {
526 if self.edge_handlers.is_empty() {
527 return None;
528 }
529 let edge_key = self.edge_key_for_roles(from, to)?;
530 let handler_id = self.edge_handler_lookup.get(&edge_key)?;
531 self.handler_by_id(*handler_id)
532 }
533
534 #[must_use]
536 pub fn default_handler_binding(&self) -> Option<&HandlerId> {
537 if self.default_handler.is_empty() {
538 return None;
539 }
540 let handler_id = self.default_handler_id?;
541 self.handler_by_id(handler_id)
542 }
543
544 #[must_use]
546 pub fn has_bound_handler(&self) -> bool {
547 !self.default_handler.is_empty() || !self.edge_handlers.is_empty()
548 }
549
550 #[must_use]
552 #[cfg_attr(not(test), allow(dead_code))]
553 pub(crate) fn ownership(&self) -> &SessionOwnershipState {
554 &self.ownership
555 }
556
557 #[cfg(feature = "multi-thread")]
559 pub(crate) fn ownership_mut(&mut self) -> &mut SessionOwnershipState {
560 &mut self.ownership
561 }
562}
563
564#[derive(Debug, Deserialize)]
565struct SessionStateSerde {
566 sid: SessionId,
567 roles: Vec<String>,
568 local_types: BTreeMap<Endpoint, TypeEntry>,
569 buffers: BTreeMap<Edge, SignedBuffer<Signature>>,
570 auth_leaves: BTreeMap<Edge, Vec<Hash>>,
571 #[serde(default)]
572 auth_trees: BTreeMap<Edge, AuthTree>,
573 auth_roots: BTreeMap<Edge, Hash>,
574 edge_handlers: BTreeMap<Edge, HandlerId>,
575 #[serde(default = "default_handler_id")]
576 default_handler: HandlerId,
577 edge_traces: BTreeMap<Edge, Vec<ValType>>,
578 status: SessionStatus,
579 epoch: usize,
580 #[serde(default)]
581 ownership: SessionOwnershipState,
582}