Skip to main content

de_mls/app/user/
lifecycle.rs

1//! Create and leave operations for a conversation.
2
3use std::sync::{Arc, RwLock};
4
5use tracing::info;
6
7use crate::{
8    app::{ConversationState, LockExt, PhaseTimer, SessionRunner, User, UserError},
9    core::{
10        ConsensusPlugin, Conversation, ConversationConfig, ConversationLifecycle,
11        ConversationPluginsFactory, ConversationStateMachine, PeerScoringPlugin, SessionEvent,
12        StewardListPlugin,
13    },
14};
15
16impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
17    pub async fn start_conversation(
18        &mut self,
19        conversation_name: &str,
20        is_creation: bool,
21    ) -> Result<(), UserError> {
22        self.create_conversation_with_config(
23            conversation_name,
24            is_creation,
25            self.plugins.default_conversation_config.clone(),
26        )
27        .await
28    }
29
30    /// Like [`Self::start_conversation`] but with a per-conversation config override.
31    pub async fn create_conversation_with_config(
32        &mut self,
33        conversation_name: &str,
34        is_creation: bool,
35        config: ConversationConfig,
36    ) -> Result<(), UserError> {
37        if self
38            .conversations
39            .read()
40            .map_err(|_| UserError::LockPoisoned("conversation registry"))?
41            .contains_key(conversation_name)
42        {
43            return Err(UserError::ConversationAlreadyExists);
44        }
45
46        let self_identity_bytes = self.self_identity().to_vec();
47        let (conversation, mls_opt, state_machine, phase_timer) = if is_creation {
48            let mls = self
49                .plugins
50                .conversation_plugins
51                .create_mls(conversation_name.to_string())?;
52            let conversation = Conversation::new(conversation_name);
53            let state_machine = ConversationStateMachine::new_as_member();
54            (conversation, Some(mls), state_machine, PhaseTimer::new())
55        } else {
56            let conversation = Conversation::new(conversation_name);
57            let state_machine = ConversationStateMachine::new_as_pending_join();
58            // Anchor the timer at "now" so `is_pending_join_expired` can
59            // detect the 3× commit-inactivity timeout.
60            let mut phase_timer = PhaseTimer::new();
61            phase_timer.start();
62            (conversation, None, state_machine, phase_timer)
63        };
64
65        let mut steward_list = self.plugins.conversation_plugins.make_steward_list(
66            conversation_name.as_bytes(),
67            self.plugins.default_steward_list_config.clone(),
68        );
69        steward_list.set_max_retries(config.max_reelection_attempts);
70        // Creator path: bootstrap the list with self as sole steward at
71        // epoch 0. Joiner path leaves the plug-in empty until `ConversationSync`.
72        if is_creation {
73            let _events =
74                steward_list.install_list(0, std::slice::from_ref(&self_identity_bytes), 1, 0)?;
75        }
76
77        let mut scoring = self
78            .plugins
79            .conversation_plugins
80            .make_scoring(&self.plugins.default_scoring_config);
81        // Joiners get tracked at `JoinedConversation` time, once members are known.
82        if is_creation {
83            // Creator is self at `default_score`; under standard config
84            // (`default > threshold`) no cross event fires, so we drop
85            // the return value here.
86            let _events = scoring.add_member(&self_identity_bytes);
87        }
88
89        let initial_state = state_machine.current_state();
90        if initial_state == ConversationState::PendingJoin {
91            info!(
92                conversation = conversation_name,
93                timeout_s = config.commit_inactivity_duration.as_secs() * 3,
94                "pending join, awaiting welcome"
95            );
96        }
97        let consensus = self.build_consensus_service();
98        let session = Arc::new(RwLock::new(SessionRunner::new(
99            conversation_name.to_string(),
100            conversation,
101            mls_opt,
102            state_machine,
103            phase_timer,
104            config,
105            scoring,
106            steward_list,
107            consensus,
108            Arc::clone(&self.transport),
109            Arc::from(self.identity.identity_bytes()),
110            Arc::from(self.identity.identity_display()),
111            Arc::from(self.app_id.as_slice()),
112        )));
113        {
114            let mut conversations = self
115                .conversations
116                .write()
117                .map_err(|_| UserError::LockPoisoned("conversation registry"))?;
118            if conversations.contains_key(conversation_name) {
119                return Err(UserError::ConversationAlreadyExists);
120            }
121            conversations.insert(conversation_name.to_string(), Arc::clone(&session));
122        }
123
124        // Record the lifecycle event first so integrators draining
125        // [`User::drain_lifecycle_events`] see `Created` before any
126        // per-session event emitted below.
127        self.emit_lifecycle(ConversationLifecycle::Created(
128            conversation_name.to_string(),
129        ));
130        session
131            .read_or_err("session")?
132            .emit_event(SessionEvent::PhaseChange(initial_state));
133
134        Ok(())
135    }
136
137    /// Leave the conversation. `PendingJoin` short-circuits to local
138    /// teardown (no MLS state yet). Otherwise delegates the protocol work
139    /// to the session-side `initiate_self_leave` — opens a self-leave
140    /// consensus session with the leaver's YES bundled at submit. We stay
141    /// active until the next steward commit merges the removal; on that
142    /// commit `ProcessResult::LeaveConversation` fires.
143    pub async fn leave_conversation(&mut self, conversation_name: &str) -> Result<(), UserError> {
144        info!(conversation = conversation_name, "leaving conversation");
145
146        let entry_arc = self
147            .lookup_entry(conversation_name)?
148            .ok_or(UserError::ConversationNotFound)?;
149
150        let is_pending_join = entry_arc.read_or_err("session")?.handle.current_state()
151            == ConversationState::PendingJoin;
152        if is_pending_join {
153            entry_arc
154                .read_or_err("session")?
155                .emit_event(SessionEvent::Leaving);
156            // Cancel auto-vote timers before removing the registry entry —
157            // see `finalize_self_leave` for the rationale.
158            self.cleanup_consensus_scope(conversation_name).await?;
159            self.conversations
160                .write()
161                .map_err(|_| UserError::LockPoisoned("conversation registry"))?
162                .remove(conversation_name);
163            self.emit_lifecycle(ConversationLifecycle::Removed(
164                conversation_name.to_string(),
165            ));
166            return Ok(());
167        }
168
169        SessionRunner::initiate_self_leave(&entry_arc).await
170    }
171}