use std::sync::{Arc, RwLock};
use tracing::info;
use crate::{
app::{ConversationState, LockExt, PhaseTimer, SessionRunner, User, UserError},
core::{
ConsensusPlugin, Conversation, ConversationConfig, ConversationLifecycle,
ConversationPluginsFactory, ConversationStateMachine, PeerScoringPlugin, SessionEvent,
StewardListPlugin,
},
};
impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
pub async fn start_conversation(
&mut self,
conversation_name: &str,
is_creation: bool,
) -> Result<(), UserError> {
self.create_conversation_with_config(
conversation_name,
is_creation,
self.plugins.default_conversation_config.clone(),
)
.await
}
pub async fn create_conversation_with_config(
&mut self,
conversation_name: &str,
is_creation: bool,
config: ConversationConfig,
) -> Result<(), UserError> {
if self
.conversations
.read()
.map_err(|_| UserError::LockPoisoned("conversation registry"))?
.contains_key(conversation_name)
{
return Err(UserError::ConversationAlreadyExists);
}
let self_identity_bytes = self.self_identity().to_vec();
let (conversation, mls_opt, state_machine, phase_timer) = if is_creation {
let mls = self
.plugins
.conversation_plugins
.create_mls(conversation_name.to_string())?;
let conversation = Conversation::new(conversation_name);
let state_machine = ConversationStateMachine::new_as_member();
(conversation, Some(mls), state_machine, PhaseTimer::new())
} else {
let conversation = Conversation::new(conversation_name);
let state_machine = ConversationStateMachine::new_as_pending_join();
let mut phase_timer = PhaseTimer::new();
phase_timer.start();
(conversation, None, state_machine, phase_timer)
};
let mut steward_list = self.plugins.conversation_plugins.make_steward_list(
conversation_name.as_bytes(),
self.plugins.default_steward_list_config.clone(),
);
steward_list.set_max_retries(config.max_reelection_attempts);
if is_creation {
let _events =
steward_list.install_list(0, std::slice::from_ref(&self_identity_bytes), 1, 0)?;
}
let mut scoring = self
.plugins
.conversation_plugins
.make_scoring(&self.plugins.default_scoring_config);
if is_creation {
let _events = scoring.add_member(&self_identity_bytes);
}
let initial_state = state_machine.current_state();
if initial_state == ConversationState::PendingJoin {
info!(
conversation = conversation_name,
timeout_s = config.commit_inactivity_duration.as_secs() * 3,
"pending join, awaiting welcome"
);
}
let consensus = self.build_consensus_service();
let session = Arc::new(RwLock::new(SessionRunner::new(
conversation_name.to_string(),
conversation,
mls_opt,
state_machine,
phase_timer,
config,
scoring,
steward_list,
consensus,
Arc::clone(&self.transport),
Arc::from(self.identity.identity_bytes()),
Arc::from(self.identity.identity_display()),
Arc::from(self.app_id.as_slice()),
)));
{
let mut conversations = self
.conversations
.write()
.map_err(|_| UserError::LockPoisoned("conversation registry"))?;
if conversations.contains_key(conversation_name) {
return Err(UserError::ConversationAlreadyExists);
}
conversations.insert(conversation_name.to_string(), Arc::clone(&session));
}
self.emit_lifecycle(ConversationLifecycle::Created(
conversation_name.to_string(),
));
session
.read_or_err("session")?
.emit_event(SessionEvent::PhaseChange(initial_state));
Ok(())
}
pub async fn leave_conversation(&mut self, conversation_name: &str) -> Result<(), UserError> {
info!(conversation = conversation_name, "leaving conversation");
let entry_arc = self
.lookup_entry(conversation_name)?
.ok_or(UserError::ConversationNotFound)?;
let is_pending_join = entry_arc.read_or_err("session")?.handle.current_state()
== ConversationState::PendingJoin;
if is_pending_join {
entry_arc
.read_or_err("session")?
.emit_event(SessionEvent::Leaving);
self.cleanup_consensus_scope(conversation_name).await?;
self.conversations
.write()
.map_err(|_| UserError::LockPoisoned("conversation registry"))?
.remove(conversation_name);
self.emit_lifecycle(ConversationLifecycle::Removed(
conversation_name.to_string(),
));
return Ok(());
}
SessionRunner::initiate_self_leave(&entry_arc).await
}
}