1use std::sync::{Arc, RwLock};
11
12use hashgraph_like_consensus::{storage::ConsensusStorage, types::ConsensusEvent};
13use prost::Message;
14use tracing::{error, info};
15
16use crate::{
17 app::{ConversationState, LockExt, SessionRunner, UserError},
18 core::{
19 ConsensusApplyResult, ConsensusPlugin, ConversationPluginsFactory, PeerScoringPlugin,
20 ProposalKind, ScoreOp, SessionEvent, StewardListPlugin, apply_consensus_result,
21 emergency_score_ops, target_identity_of,
22 },
23 protos::de_mls::messages::v1::{
24 ConversationUpdateRequest, StewardElectionProposal, conversation_update_request,
25 },
26};
27
28impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
29 pub async fn apply_consensus_outcome(
34 arc: &Arc<RwLock<Self>>,
35 event: ConsensusEvent,
36 ) -> Result<(), UserError> {
37 let (proposal_id, approved) = match &event {
38 ConsensusEvent::ConsensusReached {
39 proposal_id,
40 result,
41 ..
42 } => (*proposal_id, *result),
43 ConsensusEvent::ConsensusFailed { proposal_id, .. } => (*proposal_id, false),
44 };
45
46 arc.write_or_err("session")?.cancel_auto_vote(proposal_id);
48
49 let already_applied = arc
52 .read_or_err("session")?
53 .handle
54 .conversation
55 .is_consensus_outcome_applied(proposal_id);
56 if already_applied {
57 let conv_name = arc.read_or_err("session")?.conversation_name.clone();
58 tracing::debug!(
59 conversation = %conv_name,
60 proposal_id,
61 "duplicate consensus outcome dropped"
62 );
63 return Ok(());
64 }
65
66 let (consensus, conversation_name) = {
68 let s = arc.read_or_err("session")?;
69 (s.consensus.clone(), s.conversation_name.clone())
70 };
71 let scope = P::Scope::from(conversation_name.clone());
72 let proposal = consensus
73 .storage()
74 .get_proposal(&scope, proposal_id)
75 .await?;
76 let payload = proposal.payload;
77
78 let consensus_apply = {
81 let mut s = arc.write_or_err("session")?;
82 info!(
83 conversation = %s.conversation_name,
84 proposal_id, approved, "consensus reached"
85 );
86 s.handle
87 .conversation
88 .mark_consensus_outcome_applied(proposal_id);
89 apply_consensus_result(&mut s.handle.conversation, proposal_id, approved, &payload)?
90 };
91
92 match consensus_apply {
93 ConsensusApplyResult::NoAction => {}
94 ConsensusApplyResult::ElectionAccepted(election) => {
95 return Self::handle_election_accepted(arc, election).await;
96 }
97 ConsensusApplyResult::RecoveryModeOpened => {
98 arc.write_or_err("session")?.handle.enter_recovery_mode();
99 Self::force_freezing_and_emit(arc)?;
100 }
101 ConsensusApplyResult::UrgentRemoval { target } => {
102 Self::force_freezing_and_emit(arc)?;
103 Self::refresh_stewards_after_removal(arc, &target).await?;
104 }
105 ConsensusApplyResult::QueuedRemoval { target } => {
106 Self::refresh_stewards_after_removal(arc, &target).await?;
107 }
108 }
109
110 if !approved && let Ok(req) = ConversationUpdateRequest::decode(payload.as_slice()) {
111 if ProposalKind::of(&req).is_steward_election() {
112 Self::handle_election_rejected(arc).await?;
113 } else if let Some(target) = target_identity_of(&req) {
114 let target = target.to_vec();
115 arc.write_or_err("session")?
116 .handle
117 .conversation
118 .remove_pending_update(&target);
119 }
120 }
121
122 arc.write_or_err("session")?
125 .unregister_consensus_timeout(proposal_id);
126
127 let score_ops = emergency_score_ops(&payload, approved);
128 if !score_ops.is_empty() {
129 Self::handle_emergency_scored(arc, proposal_id, &payload, &score_ops).await?;
130 }
131
132 Ok(())
133 }
134
135 fn force_freezing_and_emit(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
139 let event = arc.write_or_err("session")?.force_freezing();
140 if let Some(event) = event {
141 arc.read_or_err("session")?
142 .emit_event(SessionEvent::PhaseChange(event));
143 }
144 Ok(())
145 }
146
147 async fn refresh_stewards_after_removal(
150 arc: &Arc<RwLock<Self>>,
151 target: &[u8],
152 ) -> Result<(), UserError> {
153 let target_was_steward = arc
154 .read_or_err("session")?
155 .handle
156 .steward_list
157 .is_steward(target);
158 if !target_was_steward {
159 return Ok(());
160 }
161 if let Err(e) = Self::try_initiate_steward_election(arc, true, Some(target)).await {
162 let conv_name = arc.read_or_err("session")?.conversation_name.clone();
163 info!(
164 conversation = %conv_name,
165 error = %e,
166 "post-removal steward-list refresh deferred"
167 );
168 }
169 Ok(())
170 }
171
172 async fn handle_election_accepted(
176 arc: &Arc<RwLock<Self>>,
177 election: StewardElectionProposal,
178 ) -> Result<(), UserError> {
179 let is_valid = {
180 let s = arc.read_or_err("session")?;
181 s.handle.expect_mls()?;
182 s.handle.steward_list.validate_proposed(
186 &election.proposed_stewards,
187 election.election_epoch,
188 &election.proposed_stewards,
189 election.retry_round,
190 )?
191 };
192 if !is_valid {
193 let conv_name = arc.read_or_err("session")?.conversation_name.clone();
194 info!(
195 conversation = %conv_name,
196 "steward election rejected: invalid list"
197 );
198 return Ok(());
199 }
200
201 let resumed_from_reelection = {
202 let mut s = arc.write_or_err("session")?;
203 let _events = s.handle.steward_list.install_list(
204 election.election_epoch,
205 &election.proposed_stewards,
206 election.proposed_stewards.len(),
207 election.retry_round,
208 )?;
209 s.handle.exit_recovery_mode();
213 if s.handle.current_state() == ConversationState::Reelection {
214 Some(s.start_working())
215 } else {
216 None
217 }
218 };
219 if let Some(event) = resumed_from_reelection {
220 arc.read_or_err("session")?
221 .emit_event(SessionEvent::PhaseChange(event));
222 }
223 {
224 let s = arc.read_or_err("session")?;
225 info!(
226 conversation = %s.conversation_name,
227 epoch = election.election_epoch,
228 stewards = election.proposed_stewards.len(),
229 retry_round = election.retry_round,
230 "steward election applied"
231 );
232 }
233
234 Self::process_buffered_updates(arc).await
235 }
236
237 async fn handle_election_rejected(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
240 let (round, max) = {
241 let mut s = arc.write_or_err("session")?;
242 let _events = s.handle.steward_list.bump_retry();
243 (
244 s.handle.steward_list.retry_round(),
245 s.handle.steward_list.max_retries(),
246 )
247 };
248 let conversation_name = arc.read_or_err("session")?.conversation_name.clone();
249 if round > max {
250 info!(
251 conversation = %conversation_name,
252 round, max, "election retries exhausted; escalating to Layer 3"
253 );
254 if let Err(e) = Self::try_initiate_deadlock_ecp(arc).await {
255 error!(conversation = %conversation_name, error = %e, "Deadlock ECP filing failed");
256 arc.read_or_err("session")?.emit_event(SessionEvent::Error {
257 operation: "Reelection stuck".to_string(),
258 message: e.to_string(),
259 });
260 }
261 return Ok(());
262 }
263 info!(
264 conversation = %conversation_name,
265 round, max, "steward election rejected, retrying"
266 );
267 if let Err(e) = Self::try_initiate_steward_election(arc, true, None).await {
268 info!(conversation = %conversation_name, error = %e, "election retry deferred");
269 }
270 Ok(())
271 }
272
273 async fn handle_emergency_scored(
278 arc: &Arc<RwLock<Self>>,
279 proposal_id: u32,
280 payload: &[u8],
281 score_ops: &[ScoreOp],
282 ) -> Result<(), UserError> {
283 {
284 let mut s = arc.write_or_err("session")?;
285 let _events = s.handle.scoring.apply_ops(score_ops);
290 if let Ok(req) = ConversationUpdateRequest::decode(payload)
291 && let Some(conversation_update_request::Payload::EmergencyCriteria(ec)) =
292 &req.payload
293 && let Some(ev) = &ec.evidence
294 {
295 s.handle
296 .conversation
297 .resolve_pending_removal(&ev.target_member_id);
298 }
299 }
300
301 let resumed_event = {
302 let mut s = arc.write_or_err("session")?;
303 s.handle.conversation.resolve_emergency(proposal_id);
304 if s.handle.current_state() == ConversationState::Reelection {
305 Some(s.start_working())
306 } else {
307 None
308 }
309 };
310 if let Some(event) = resumed_event {
311 arc.read_or_err("session")?
312 .emit_event(SessionEvent::PhaseChange(event));
313 }
314
315 if let Err(e) = Self::check_and_initiate_score_removals(arc).await {
316 let conv_name = arc.read_or_err("session")?.conversation_name.clone();
317 error!(conversation = %conv_name, error = %e, "score-removal check failed");
318 }
319 Ok(())
320 }
321}