de_mls/app/user/state.rs
1//! [`User`] struct definition, constructor, accessors, and the
2//! consensus-context helpers shared across the User submodules
3//! (`lifecycle`, `inbound`, `registry`).
4
5use std::{
6 collections::HashMap,
7 sync::{Arc, Mutex, RwLock},
8};
9
10use crate::{
11 app::{SessionRunner, UserError, UserPlugins},
12 core::{
13 ConsensusPlugin, ConversationLifecycle, ConversationPluginsFactory, PluginConsensus,
14 ScoringConfig, StewardListConfig,
15 },
16 ds::SharedDeliveryService,
17 identity::Identity,
18 mls_crypto::{KeyPackageBytes, MlsError},
19};
20
21/// Single registry entry: one `Arc<RwLock<SessionRunner>>` per conversation.
22/// Cloned out of the registry under the outer read lock, then locked
23/// independently — writes on one conversation don't block reads on another.
24pub type SessionEntry<P, CP> = Arc<RwLock<SessionRunner<P, CP>>>;
25
26/// Per-user registry of conversation runners. Each entry's inner per-runner
27/// lock guards per-conversation reads/mutations so a write on conversation
28/// A doesn't block reads on conversation B.
29pub(crate) type ConversationRegistry<P, CP> = RwLock<HashMap<String, SessionEntry<P, CP>>>;
30
31pub struct User<P: ConsensusPlugin, CP: ConversationPluginsFactory> {
32 /// Local identity, set once at construction and read-only thereafter.
33 /// Accessed via the [`Identity`] trait wherever bytes or display
34 /// form are needed; per-session cached `Arc<[u8]>` + `Arc<str>` are
35 /// extracted from it at [`SessionRunner`] construction in
36 /// [`super::lifecycle`].
37 pub(crate) identity: Box<dyn Identity>,
38 /// Per-instance UUID embedded in every outbound packet. Inbound packets
39 /// carrying our `app_id` are self-echoes and silently dropped.
40 pub(crate) app_id: Vec<u8>,
41 /// Synchronous outbound transport. Cloned into each `SessionRunner` at
42 /// construction. Stored behind a `Mutex` because the trait takes
43 /// `&mut self`.
44 pub(crate) transport: SharedDeliveryService,
45 /// All User-level plugin state: the per-conversation factory, the
46 /// consensus context, the key-package provider, and the three default
47 /// configs cloned into newly-created sessions.
48 pub(crate) plugins: UserPlugins<P, CP>,
49 /// Per-conversation `SessionRunner`s.
50 pub(crate) conversations: ConversationRegistry<P, CP>,
51 /// User-level conversation lifecycle events: `Created(name)` /
52 /// `Removed(name)`. Integrators drain via
53 /// [`Self::drain_lifecycle_events`] once per polling cycle to learn
54 /// when new sessions appear and old ones disappear. Interior `Mutex`
55 /// so producer-side methods stay `&self`.
56 pub(crate) pending_lifecycle_events: Mutex<Vec<ConversationLifecycle>>,
57}
58
59// ── Public API ──────────────────────────────────────────────────────────
60
61impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
62 /// Display form of the local identity, derived from
63 /// [`Identity::identity_display`]. Stable for the lifetime of the
64 /// `User`; intended for logs and UI.
65 pub fn identity_string(&self) -> String {
66 self.identity.identity_display().to_string()
67 }
68
69 /// Generate a single-use key package for our identity. Conversation-free —
70 /// callable before `start_conversation`. Delegates to the configured
71 /// [`crate::core::ConversationPluginsFactory::generate_key_package`].
72 pub fn generate_key_package(&self) -> Result<KeyPackageBytes, MlsError> {
73 self.plugins.conversation_plugins.generate_key_package()
74 }
75
76 /// Drain every pending [`ConversationLifecycle`] event accumulated
77 /// since the last call. Returns events in insertion order. Callers
78 /// (gateway, integrator) invoke this once per polling cycle to discover
79 /// `Created` / `Removed` sessions and wire up per-session event
80 /// drains via [`SessionRunner::drain_events`].
81 pub fn drain_lifecycle_events(&self) -> Vec<ConversationLifecycle> {
82 match self.pending_lifecycle_events.lock() {
83 Ok(mut buf) => std::mem::take(&mut *buf),
84 Err(_) => Vec::new(),
85 }
86 }
87
88 /// Override the seed [`ScoringConfig`] used for newly-created per-conversation
89 /// scoring plug-ins. Existing conversations are untouched; their plug-ins
90 /// already own their live config (joiner-side overwritten by ConversationSync).
91 pub fn set_default_scoring_config(&mut self, config: ScoringConfig) {
92 self.plugins.default_scoring_config = config;
93 }
94
95 /// Override the seed [`StewardListConfig`] used for newly-created
96 /// per-conversation steward-list plug-ins. Same lifecycle as
97 /// [`Self::set_default_scoring_config`].
98 pub fn set_default_steward_list_config(&mut self, config: StewardListConfig) {
99 self.plugins.default_steward_list_config = config;
100 }
101}
102
103// ── User-internal helpers ───────────────────────────────────────────────
104
105impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
106 /// Borrow the local identity bytes via the [`Identity`] trait.
107 pub(crate) fn self_identity(&self) -> &[u8] {
108 self.identity.identity_bytes()
109 }
110
111 /// Append a [`ConversationLifecycle`] event to the pending-events buffer
112 /// for [`Self::drain_lifecycle_events`]. Silent on poison —
113 /// emit is fire-and-forget.
114 pub(crate) fn emit_lifecycle(&self, event: ConversationLifecycle) {
115 if let Ok(mut buf) = self.pending_lifecycle_events.lock() {
116 buf.push(event);
117 }
118 }
119
120 /// Build a fresh per-conversation `ConsensusService` via the User's
121 /// `ConsensusContext`.
122 pub(crate) fn build_consensus_service(&self) -> PluginConsensus<P> {
123 self.plugins.consensus.build_service()
124 }
125
126 /// Drop this conversation's consensus scope from the shared storage and
127 /// clear every auto-vote registered for it. Called on leave and
128 /// pending-join timeout.
129 pub(crate) async fn cleanup_consensus_scope(
130 &self,
131 conversation_name: &str,
132 ) -> Result<(), UserError> {
133 if let Some(entry_arc) = self.lookup_entry(conversation_name)? {
134 entry_arc
135 .write()
136 .map_err(|_| UserError::LockPoisoned("session"))?
137 .cancel_all_auto_votes();
138 }
139 let scope = P::Scope::from(conversation_name.to_string());
140 self.plugins.consensus.delete_scope(&scope).await?;
141 Ok(())
142 }
143
144 pub fn new_with_plugins(
145 identity: Box<dyn Identity>,
146 plugins: UserPlugins<P, CP>,
147 transport: SharedDeliveryService,
148 ) -> Self {
149 Self {
150 identity,
151 app_id: uuid::Uuid::new_v4().as_bytes().to_vec(),
152 transport,
153 plugins,
154 conversations: RwLock::new(HashMap::new()),
155 pending_lifecycle_events: Mutex::new(Vec::new()),
156 }
157 }
158}