de_mls/app/user/
lifecycle.rs1use std::sync::{Arc, RwLock};
4
5use tracing::info;
6
7use crate::{
8 app::{ConversationState, LockExt, PhaseTimer, SessionRunner, User, UserError},
9 core::{
10 ConsensusPlugin, Conversation, ConversationConfig, ConversationLifecycle,
11 ConversationPluginsFactory, ConversationStateMachine, PeerScoringPlugin, SessionEvent,
12 StewardListPlugin,
13 },
14};
15
16impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
17 pub async fn start_conversation(
18 &mut self,
19 conversation_name: &str,
20 is_creation: bool,
21 ) -> Result<(), UserError> {
22 self.create_conversation_with_config(
23 conversation_name,
24 is_creation,
25 self.plugins.default_conversation_config.clone(),
26 )
27 .await
28 }
29
30 pub async fn create_conversation_with_config(
32 &mut self,
33 conversation_name: &str,
34 is_creation: bool,
35 config: ConversationConfig,
36 ) -> Result<(), UserError> {
37 if self
38 .conversations
39 .read()
40 .map_err(|_| UserError::LockPoisoned("conversation registry"))?
41 .contains_key(conversation_name)
42 {
43 return Err(UserError::ConversationAlreadyExists);
44 }
45
46 let self_identity_bytes = self.self_identity().to_vec();
47 let (conversation, mls_opt, state_machine, phase_timer) = if is_creation {
48 let mls = self
49 .plugins
50 .conversation_plugins
51 .create_mls(conversation_name.to_string())?;
52 let conversation = Conversation::new(conversation_name);
53 let state_machine = ConversationStateMachine::new_as_member();
54 (conversation, Some(mls), state_machine, PhaseTimer::new())
55 } else {
56 let conversation = Conversation::new(conversation_name);
57 let state_machine = ConversationStateMachine::new_as_pending_join();
58 let mut phase_timer = PhaseTimer::new();
61 phase_timer.start();
62 (conversation, None, state_machine, phase_timer)
63 };
64
65 let mut steward_list = self.plugins.conversation_plugins.make_steward_list(
66 conversation_name.as_bytes(),
67 self.plugins.default_steward_list_config.clone(),
68 );
69 steward_list.set_max_retries(config.max_reelection_attempts);
70 if is_creation {
73 let _events =
74 steward_list.install_list(0, std::slice::from_ref(&self_identity_bytes), 1, 0)?;
75 }
76
77 let mut scoring = self
78 .plugins
79 .conversation_plugins
80 .make_scoring(&self.plugins.default_scoring_config);
81 if is_creation {
83 let _events = scoring.add_member(&self_identity_bytes);
87 }
88
89 let initial_state = state_machine.current_state();
90 if initial_state == ConversationState::PendingJoin {
91 info!(
92 conversation = conversation_name,
93 timeout_s = config.commit_inactivity_duration.as_secs() * 3,
94 "pending join, awaiting welcome"
95 );
96 }
97 let consensus = self.build_consensus_service();
98 let session = Arc::new(RwLock::new(SessionRunner::new(
99 conversation_name.to_string(),
100 conversation,
101 mls_opt,
102 state_machine,
103 phase_timer,
104 config,
105 scoring,
106 steward_list,
107 consensus,
108 Arc::clone(&self.transport),
109 Arc::from(self.identity.identity_bytes()),
110 Arc::from(self.identity.identity_display()),
111 Arc::from(self.app_id.as_slice()),
112 )));
113 {
114 let mut conversations = self
115 .conversations
116 .write()
117 .map_err(|_| UserError::LockPoisoned("conversation registry"))?;
118 if conversations.contains_key(conversation_name) {
119 return Err(UserError::ConversationAlreadyExists);
120 }
121 conversations.insert(conversation_name.to_string(), Arc::clone(&session));
122 }
123
124 self.emit_lifecycle(ConversationLifecycle::Created(
128 conversation_name.to_string(),
129 ));
130 session
131 .read_or_err("session")?
132 .emit_event(SessionEvent::PhaseChange(initial_state));
133
134 Ok(())
135 }
136
137 pub async fn leave_conversation(&mut self, conversation_name: &str) -> Result<(), UserError> {
144 info!(conversation = conversation_name, "leaving conversation");
145
146 let entry_arc = self
147 .lookup_entry(conversation_name)?
148 .ok_or(UserError::ConversationNotFound)?;
149
150 let is_pending_join = entry_arc.read_or_err("session")?.handle.current_state()
151 == ConversationState::PendingJoin;
152 if is_pending_join {
153 entry_arc
154 .read_or_err("session")?
155 .emit_event(SessionEvent::Leaving);
156 self.cleanup_consensus_scope(conversation_name).await?;
159 self.conversations
160 .write()
161 .map_err(|_| UserError::LockPoisoned("conversation registry"))?
162 .remove(conversation_name);
163 self.emit_lifecycle(ConversationLifecycle::Removed(
164 conversation_name.to_string(),
165 ));
166 return Ok(());
167 }
168
169 SessionRunner::initiate_self_leave(&entry_arc).await
170 }
171}