use std::sync::{Arc, RwLock};
use hashgraph_like_consensus::{storage::ConsensusStorage, types::ConsensusEvent};
use prost::Message;
use tracing::{error, info};
use crate::{
app::{ConversationState, LockExt, SessionRunner, UserError},
core::{
ConsensusApplyResult, ConsensusPlugin, ConversationPluginsFactory, PeerScoringPlugin,
ProposalKind, ScoreOp, SessionEvent, StewardListPlugin, apply_consensus_result,
emergency_score_ops, target_identity_of,
},
protos::de_mls::messages::v1::{
ConversationUpdateRequest, StewardElectionProposal, conversation_update_request,
},
};
impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
pub async fn apply_consensus_outcome(
arc: &Arc<RwLock<Self>>,
event: ConsensusEvent,
) -> Result<(), UserError> {
let (proposal_id, approved) = match &event {
ConsensusEvent::ConsensusReached {
proposal_id,
result,
..
} => (*proposal_id, *result),
ConsensusEvent::ConsensusFailed { proposal_id, .. } => (*proposal_id, false),
};
arc.write_or_err("session")?.cancel_auto_vote(proposal_id);
let already_applied = arc
.read_or_err("session")?
.handle
.conversation
.is_consensus_outcome_applied(proposal_id);
if already_applied {
let conv_name = arc.read_or_err("session")?.conversation_name.clone();
tracing::debug!(
conversation = %conv_name,
proposal_id,
"duplicate consensus outcome dropped"
);
return Ok(());
}
let (consensus, conversation_name) = {
let s = arc.read_or_err("session")?;
(s.consensus.clone(), s.conversation_name.clone())
};
let scope = P::Scope::from(conversation_name.clone());
let proposal = consensus
.storage()
.get_proposal(&scope, proposal_id)
.await?;
let payload = proposal.payload;
let consensus_apply = {
let mut s = arc.write_or_err("session")?;
info!(
conversation = %s.conversation_name,
proposal_id, approved, "consensus reached"
);
s.handle
.conversation
.mark_consensus_outcome_applied(proposal_id);
apply_consensus_result(&mut s.handle.conversation, proposal_id, approved, &payload)?
};
match consensus_apply {
ConsensusApplyResult::NoAction => {}
ConsensusApplyResult::ElectionAccepted(election) => {
return Self::handle_election_accepted(arc, election).await;
}
ConsensusApplyResult::RecoveryModeOpened => {
arc.write_or_err("session")?.handle.enter_recovery_mode();
Self::force_freezing_and_emit(arc)?;
}
ConsensusApplyResult::UrgentRemoval { target } => {
Self::force_freezing_and_emit(arc)?;
Self::refresh_stewards_after_removal(arc, &target).await?;
}
ConsensusApplyResult::QueuedRemoval { target } => {
Self::refresh_stewards_after_removal(arc, &target).await?;
}
}
if !approved && let Ok(req) = ConversationUpdateRequest::decode(payload.as_slice()) {
if ProposalKind::of(&req).is_steward_election() {
Self::handle_election_rejected(arc).await?;
} else if let Some(target) = target_identity_of(&req) {
let target = target.to_vec();
arc.write_or_err("session")?
.handle
.conversation
.remove_pending_update(&target);
}
}
arc.write_or_err("session")?
.unregister_consensus_timeout(proposal_id);
let score_ops = emergency_score_ops(&payload, approved);
if !score_ops.is_empty() {
Self::handle_emergency_scored(arc, proposal_id, &payload, &score_ops).await?;
}
Ok(())
}
fn force_freezing_and_emit(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
let event = arc.write_or_err("session")?.force_freezing();
if let Some(event) = event {
arc.read_or_err("session")?
.emit_event(SessionEvent::PhaseChange(event));
}
Ok(())
}
async fn refresh_stewards_after_removal(
arc: &Arc<RwLock<Self>>,
target: &[u8],
) -> Result<(), UserError> {
let target_was_steward = arc
.read_or_err("session")?
.handle
.steward_list
.is_steward(target);
if !target_was_steward {
return Ok(());
}
if let Err(e) = Self::try_initiate_steward_election(arc, true, Some(target)).await {
let conv_name = arc.read_or_err("session")?.conversation_name.clone();
info!(
conversation = %conv_name,
error = %e,
"post-removal steward-list refresh deferred"
);
}
Ok(())
}
async fn handle_election_accepted(
arc: &Arc<RwLock<Self>>,
election: StewardElectionProposal,
) -> Result<(), UserError> {
let is_valid = {
let s = arc.read_or_err("session")?;
s.handle.expect_mls()?;
s.handle.steward_list.validate_proposed(
&election.proposed_stewards,
election.election_epoch,
&election.proposed_stewards,
election.retry_round,
)?
};
if !is_valid {
let conv_name = arc.read_or_err("session")?.conversation_name.clone();
info!(
conversation = %conv_name,
"steward election rejected: invalid list"
);
return Ok(());
}
let resumed_from_reelection = {
let mut s = arc.write_or_err("session")?;
let _events = s.handle.steward_list.install_list(
election.election_epoch,
&election.proposed_stewards,
election.proposed_stewards.len(),
election.retry_round,
)?;
s.handle.exit_recovery_mode();
if s.handle.current_state() == ConversationState::Reelection {
Some(s.start_working())
} else {
None
}
};
if let Some(event) = resumed_from_reelection {
arc.read_or_err("session")?
.emit_event(SessionEvent::PhaseChange(event));
}
{
let s = arc.read_or_err("session")?;
info!(
conversation = %s.conversation_name,
epoch = election.election_epoch,
stewards = election.proposed_stewards.len(),
retry_round = election.retry_round,
"steward election applied"
);
}
Self::process_buffered_updates(arc).await
}
async fn handle_election_rejected(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
let (round, max) = {
let mut s = arc.write_or_err("session")?;
let _events = s.handle.steward_list.bump_retry();
(
s.handle.steward_list.retry_round(),
s.handle.steward_list.max_retries(),
)
};
let conversation_name = arc.read_or_err("session")?.conversation_name.clone();
if round > max {
info!(
conversation = %conversation_name,
round, max, "election retries exhausted; escalating to Layer 3"
);
if let Err(e) = Self::try_initiate_deadlock_ecp(arc).await {
error!(conversation = %conversation_name, error = %e, "Deadlock ECP filing failed");
arc.read_or_err("session")?.emit_event(SessionEvent::Error {
operation: "Reelection stuck".to_string(),
message: e.to_string(),
});
}
return Ok(());
}
info!(
conversation = %conversation_name,
round, max, "steward election rejected, retrying"
);
if let Err(e) = Self::try_initiate_steward_election(arc, true, None).await {
info!(conversation = %conversation_name, error = %e, "election retry deferred");
}
Ok(())
}
async fn handle_emergency_scored(
arc: &Arc<RwLock<Self>>,
proposal_id: u32,
payload: &[u8],
score_ops: &[ScoreOp],
) -> Result<(), UserError> {
{
let mut s = arc.write_or_err("session")?;
let _events = s.handle.scoring.apply_ops(score_ops);
if let Ok(req) = ConversationUpdateRequest::decode(payload)
&& let Some(conversation_update_request::Payload::EmergencyCriteria(ec)) =
&req.payload
&& let Some(ev) = &ec.evidence
{
s.handle
.conversation
.resolve_pending_removal(&ev.target_member_id);
}
}
let resumed_event = {
let mut s = arc.write_or_err("session")?;
s.handle.conversation.resolve_emergency(proposal_id);
if s.handle.current_state() == ConversationState::Reelection {
Some(s.start_working())
} else {
None
}
};
if let Some(event) = resumed_event {
arc.read_or_err("session")?
.emit_event(SessionEvent::PhaseChange(event));
}
if let Err(e) = Self::check_and_initiate_score_removals(arc).await {
let conv_name = arc.read_or_err("session")?.conversation_name.clone();
error!(conversation = %conv_name, error = %e, "score-removal check failed");
}
Ok(())
}
}