Skip to main content

de_mls/app/user/
state.rs

//! [`User`] struct definition, constructor, accessors, and the
//! consensus-context helpers shared across the User submodules
//! (`lifecycle`, `inbound`, `registry`).
4
5use std::{
6    collections::HashMap,
7    sync::{Arc, Mutex, RwLock},
8};
9
10use crate::{
11    app::{SessionRunner, UserError, UserPlugins},
12    core::{
13        ConsensusPlugin, ConversationLifecycle, ConversationPluginsFactory, PluginConsensus,
14        ScoringConfig, StewardListConfig,
15    },
16    ds::SharedDeliveryService,
17    identity::Identity,
18    mls_crypto::{KeyPackageBytes, MlsError},
19};
20
21/// Single registry entry: one `Arc<RwLock<SessionRunner>>` per conversation.
22/// Cloned out of the registry under the outer read lock, then locked
23/// independently — writes on one conversation don't block reads on another.
24pub type SessionEntry<P, CP> = Arc<RwLock<SessionRunner<P, CP>>>;
25
26/// Per-user registry of conversation runners. Each entry's inner per-runner
27/// lock guards per-conversation reads/mutations so a write on conversation
28/// A doesn't block reads on conversation B.
29pub(crate) type ConversationRegistry<P, CP> = RwLock<HashMap<String, SessionEntry<P, CP>>>;
30
31pub struct User<P: ConsensusPlugin, CP: ConversationPluginsFactory> {
32    /// Local identity, set once at construction and read-only thereafter.
33    /// Accessed via the [`Identity`] trait wherever bytes or display
34    /// form are needed; per-session cached `Arc<[u8]>` + `Arc<str>` are
35    /// extracted from it at [`SessionRunner`] construction in
36    /// [`super::lifecycle`].
37    pub(crate) identity: Box<dyn Identity>,
38    /// Per-instance UUID embedded in every outbound packet. Inbound packets
39    /// carrying our `app_id` are self-echoes and silently dropped.
40    pub(crate) app_id: Vec<u8>,
41    /// Synchronous outbound transport. Cloned into each `SessionRunner` at
42    /// construction. Stored behind a `Mutex` because the trait takes
43    /// `&mut self`.
44    pub(crate) transport: SharedDeliveryService,
45    /// All User-level plugin state: the per-conversation factory, the
46    /// consensus context, the key-package provider, and the three default
47    /// configs cloned into newly-created sessions.
48    pub(crate) plugins: UserPlugins<P, CP>,
49    /// Per-conversation `SessionRunner`s.
50    pub(crate) conversations: ConversationRegistry<P, CP>,
51    /// User-level conversation lifecycle events: `Created(name)` /
52    /// `Removed(name)`. Integrators drain via
53    /// [`Self::drain_lifecycle_events`] once per polling cycle to learn
54    /// when new sessions appear and old ones disappear. Interior `Mutex`
55    /// so producer-side methods stay `&self`.
56    pub(crate) pending_lifecycle_events: Mutex<Vec<ConversationLifecycle>>,
57}
58
59// ── Public API ──────────────────────────────────────────────────────────
60
61impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
62    /// Display form of the local identity, derived from
63    /// [`Identity::identity_display`]. Stable for the lifetime of the
64    /// `User`; intended for logs and UI.
65    pub fn identity_string(&self) -> String {
66        self.identity.identity_display().to_string()
67    }
68
69    /// Generate a single-use key package for our identity. Conversation-free —
70    /// callable before `start_conversation`. Delegates to the configured
71    /// [`crate::core::ConversationPluginsFactory::generate_key_package`].
72    pub fn generate_key_package(&self) -> Result<KeyPackageBytes, MlsError> {
73        self.plugins.conversation_plugins.generate_key_package()
74    }
75
76    /// Drain every pending [`ConversationLifecycle`] event accumulated
77    /// since the last call. Returns events in insertion order. Callers
78    /// (gateway, integrator) invoke this once per polling cycle to discover
79    /// `Created` / `Removed` sessions and wire up per-session event
80    /// drains via [`SessionRunner::drain_events`].
81    pub fn drain_lifecycle_events(&self) -> Vec<ConversationLifecycle> {
82        match self.pending_lifecycle_events.lock() {
83            Ok(mut buf) => std::mem::take(&mut *buf),
84            Err(_) => Vec::new(),
85        }
86    }
87
88    /// Override the seed [`ScoringConfig`] used for newly-created per-conversation
89    /// scoring plug-ins. Existing conversations are untouched; their plug-ins
90    /// already own their live config (joiner-side overwritten by ConversationSync).
91    pub fn set_default_scoring_config(&mut self, config: ScoringConfig) {
92        self.plugins.default_scoring_config = config;
93    }
94
95    /// Override the seed [`StewardListConfig`] used for newly-created
96    /// per-conversation steward-list plug-ins. Same lifecycle as
97    /// [`Self::set_default_scoring_config`].
98    pub fn set_default_steward_list_config(&mut self, config: StewardListConfig) {
99        self.plugins.default_steward_list_config = config;
100    }
101}
102
103// ── User-internal helpers ───────────────────────────────────────────────
104
105impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
106    /// Borrow the local identity bytes via the [`Identity`] trait.
107    pub(crate) fn self_identity(&self) -> &[u8] {
108        self.identity.identity_bytes()
109    }
110
111    /// Append a [`ConversationLifecycle`] event to the pending-events buffer
112    /// for [`Self::drain_lifecycle_events`]. Silent on poison —
113    /// emit is fire-and-forget.
114    pub(crate) fn emit_lifecycle(&self, event: ConversationLifecycle) {
115        if let Ok(mut buf) = self.pending_lifecycle_events.lock() {
116            buf.push(event);
117        }
118    }
119
120    /// Build a fresh per-conversation `ConsensusService` via the User's
121    /// `ConsensusContext`.
122    pub(crate) fn build_consensus_service(&self) -> PluginConsensus<P> {
123        self.plugins.consensus.build_service()
124    }
125
126    /// Drop this conversation's consensus scope from the shared storage and
127    /// clear every auto-vote registered for it. Called on leave and
128    /// pending-join timeout.
129    pub(crate) async fn cleanup_consensus_scope(
130        &self,
131        conversation_name: &str,
132    ) -> Result<(), UserError> {
133        if let Some(entry_arc) = self.lookup_entry(conversation_name)? {
134            entry_arc
135                .write()
136                .map_err(|_| UserError::LockPoisoned("session"))?
137                .cancel_all_auto_votes();
138        }
139        let scope = P::Scope::from(conversation_name.to_string());
140        self.plugins.consensus.delete_scope(&scope).await?;
141        Ok(())
142    }
143
144    pub fn new_with_plugins(
145        identity: Box<dyn Identity>,
146        plugins: UserPlugins<P, CP>,
147        transport: SharedDeliveryService,
148    ) -> Self {
149        Self {
150            identity,
151            app_id: uuid::Uuid::new_v4().as_bytes().to_vec(),
152            transport,
153            plugins,
154            conversations: RwLock::new(HashMap::new()),
155            pending_lifecycle_events: Mutex::new(Vec::new()),
156        }
157    }
158}