1use std::sync::{Arc, RwLock};
10
11use hashgraph_like_consensus::{error::ConsensusError, storage::ConsensusStorage};
12use prost::Message;
13use tracing::{error, info};
14
15use crate::{
16 app::{
17 ConversationState, LockExt, SessionRunner, UserError,
18 session::{
19 consensus_bridge::{
20 ProposalParams, cast_vote, submit_proposal, submit_self_leave_proposal,
21 },
22 runner::send_packet,
23 },
24 },
25 core::{
26 ConsensusPlugin, ConversationPluginsFactory, ProposalKind, SessionEvent, StewardListPlugin,
27 self_leave_proposal_id, target_identity_of,
28 },
29 mls_crypto::MlsService,
30 protos::de_mls::messages::v1::{
31 AppMessage, ConversationUpdateRequest, RemoveMember, VotePayload,
32 conversation_update_request,
33 },
34};
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum CreatorVote {
40 Yes,
45 Deferred,
50}
51
52struct NewProposal {
54 request: ConversationUpdateRequest,
55 expected_voters: u32,
56 kind: ProposalKind,
57 creator_vote: CreatorVote,
58}
59
60pub(crate) fn build_vote_banner_event(
65 conversation_name: &str,
66 proposal_id: u32,
67 payload: Vec<u8>,
68) -> AppMessage {
69 VotePayload {
70 conversation_id: conversation_name.to_string(),
71 proposal_id,
72 payload,
73 timestamp: std::time::SystemTime::now()
74 .duration_since(std::time::UNIX_EPOCH)
75 .map(|d| d.as_secs())
76 .unwrap_or(0),
77 }
78 .into()
79}
80
81impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
82 pub async fn initiate_proposal(
96 arc: &Arc<RwLock<Self>>,
97 request: ConversationUpdateRequest,
98 creator_vote: CreatorVote,
99 ) -> Result<(), UserError> {
100 let kind = ProposalKind::of(&request);
101 let expected_voters = arc.read_or_err("session")?.check_proposal_allowed(kind)?;
102 Self::register_new_proposal(
103 arc,
104 NewProposal {
105 request,
106 expected_voters,
107 kind,
108 creator_vote,
109 },
110 )
111 .await?;
112 Ok(())
113 }
114
115 pub async fn handle_incoming_update_request(
120 arc: &Arc<RwLock<Self>>,
121 request: ConversationUpdateRequest,
122 ) -> Result<(), UserError> {
123 let (pending_join, members_for_rotation, current_epoch) = {
124 let s = arc.read_or_err("session")?;
125 let pending = s.handle.current_state() == ConversationState::PendingJoin;
126 match (pending, s.handle.mls()) {
127 (true, _) | (false, None) => (pending, Vec::new(), 0u64),
128 (false, Some(mls)) => (false, mls.members()?, mls.current_epoch()?),
129 }
130 };
131 if pending_join {
132 return Ok(());
133 }
134
135 let (inserted, is_epoch_steward, state, buffer_total, should_propose, conversation_name) = {
136 let mut s = arc.write_or_err("session")?;
137
138 if target_identity_of(&request).is_none() {
140 return Ok(());
141 }
142
143 let inserted = s
144 .handle
145 .conversation
146 .buffer_pending_update(request.clone(), current_epoch);
147
148 let self_identity = Arc::clone(&s.self_identity);
151 let eligible = s
152 .handle
153 .conversation
154 .steward_eligibility(&members_for_rotation);
155 let is_es = s
156 .handle
157 .steward_list
158 .epoch_steward(current_epoch, &eligible)
159 .is_some_and(|es| es == &*self_identity);
160 let state = s.handle.current_state();
161 let total = s.handle.conversation.pending_update_count();
162 let should = is_es && state == ConversationState::Working;
163 let name = s.conversation_name.clone();
164 (inserted, is_es, state, total, should, name)
165 };
166
167 info!(
168 conversation = %conversation_name,
169 epoch = current_epoch,
170 inserted,
171 buffer_total,
172 is_epoch_steward,
173 state = %state,
174 propose = should_propose,
175 "update request buffered"
176 );
177
178 if should_propose {
179 if let Err(e) = Self::initiate_proposal(arc, request, CreatorVote::Deferred).await {
185 info!(conversation = %conversation_name, error = %e, "proposal deferred");
186 }
187 }
188 Ok(())
189 }
190
191 pub async fn process_user_vote(
195 arc: &Arc<RwLock<Self>>,
196 proposal_id: u32,
197 vote: bool,
198 ) -> Result<(), UserError> {
199 let (consensus, conversation_name) = {
200 let s = arc.read_or_err("session")?;
201 let state = s.handle.current_state();
202 if state == ConversationState::Freezing || state == ConversationState::Selection {
203 return Err(UserError::ConversationBlocked(state.to_string()));
204 }
205 (s.consensus.clone(), s.conversation_name.clone())
206 };
207
208 arc.write_or_err("session")?.cancel_auto_vote(proposal_id);
210
211 let app_message = cast_vote::<P>(&conversation_name, proposal_id, vote, &consensus).await?;
212 let packet = {
213 let mut s = arc.write_or_err("session")?;
214 let app_id = Arc::clone(&s.app_id);
215 s.handle
216 .expect_mls_mut()?
217 .build_message(&app_message, &app_id)?
218 };
219 let transport = Arc::clone(arc.read_or_err("session")?.transport());
220 send_packet(&transport, packet)?;
221 Ok(())
222 }
223
224 pub async fn tick_deadlines(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
229 let now = std::time::Instant::now();
230 let (auto_votes_due, timeouts_due) = {
231 let mut s = arc.write_or_err("session")?;
232 let auto_votes: Vec<(u32, bool)> = s
233 .pending_auto_votes
234 .iter()
235 .filter(|(_, e)| e.fire_at <= now)
236 .map(|(id, e)| (*id, e.vote))
237 .collect();
238 for (id, _) in &auto_votes {
239 s.pending_auto_votes.remove(id);
240 }
241 let timeouts: Vec<u32> = s
242 .pending_consensus_timeouts
243 .iter()
244 .filter(|(_, fire_at)| **fire_at <= now)
245 .map(|(id, _)| *id)
246 .collect();
247 for id in &timeouts {
248 s.pending_consensus_timeouts.remove(id);
249 }
250 (auto_votes, timeouts)
251 };
252
253 for (proposal_id, vote) in auto_votes_due {
254 if let Err(e) = Self::cast_auto_vote(arc, proposal_id, vote).await {
255 tracing::debug!(
256 proposal_id,
257 error = %e,
258 "auto-vote skipped (already voted or session resolved)"
259 );
260 }
261 }
262 for proposal_id in timeouts_due {
263 Self::resolve_on_timeout(arc, proposal_id).await;
264 }
265
266 Ok(())
267 }
268
269 pub(crate) async fn initiate_self_leave(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
279 let self_identity = Arc::clone(&arc.read_or_err("session")?.self_identity);
280
281 let (already_pending, conversation_name) = {
282 let s = arc.read_or_err("session")?;
283 (
284 s.handle.conversation.is_pending_self_leave(&self_identity),
285 s.conversation_name.clone(),
286 )
287 };
288 if already_pending {
289 info!(
290 conversation = %conversation_name,
291 "self-leave already in flight, ignoring duplicate"
292 );
293 return Ok(());
294 }
295
296 let request = ConversationUpdateRequest {
297 payload: Some(conversation_update_request::Payload::RemoveMember(
298 RemoveMember {
299 identity: self_identity.to_vec(),
300 },
301 )),
302 };
303 let proposal_id = self_leave_proposal_id(&self_identity);
304
305 let (consensus, proposal_expiration, consensus_timeout) = {
310 let mut s = arc.write_or_err("session")?;
311 s.handle
312 .conversation
313 .store_voting_proposal(proposal_id, request);
314 (
315 s.consensus.clone(),
316 s.handle.config.proposal_expiration,
317 s.handle.config.consensus_timeout,
318 )
319 };
320
321 let submitted = submit_self_leave_proposal::<P>(
322 &conversation_name,
323 &self_identity,
324 &consensus,
325 ProposalParams {
326 expected_voters: 1,
327 proposal_expiration,
328 consensus_timeout,
329 liveness_criteria_yes: true,
330 },
331 )
332 .await?;
333
334 let Some((_proposal_id, app_msg)) = submitted else {
337 return Ok(());
338 };
339
340 let packet = {
341 let mut s = arc.write_or_err("session")?;
342 let app_id = Arc::clone(&s.app_id);
343 s.handle
344 .expect_mls_mut()?
345 .build_message(&app_msg, &app_id)?
346 };
347 let transport = Arc::clone(arc.read_or_err("session")?.transport());
348 send_packet(&transport, packet)?;
349 Ok(())
350 }
351
352 fn check_proposal_allowed(&self, kind: ProposalKind) -> Result<u32, UserError> {
357 let state = self.handle.current_state();
358
359 match state {
360 ConversationState::Reelection => {
361 if !kind.is_emergency() && !kind.is_steward_election() {
362 return Err(UserError::ConversationBlocked(state.to_string()));
363 }
364 if self.handle.conversation.partial_freeze_blocks(kind) {
365 return Err(UserError::PartialFreeze);
366 }
367 }
368 ConversationState::Freezing | ConversationState::Selection => {
369 return Err(UserError::ConversationBlocked(state.to_string()));
370 }
371 _ => {
372 if self.handle.conversation.partial_freeze_blocks(kind) {
373 return Err(UserError::PartialFreeze);
374 }
375 }
376 }
377
378 let members = self.handle.expect_mls()?.members()?;
379 Ok(members.len() as u32)
380 }
381
382 async fn register_new_proposal(
393 arc: &Arc<RwLock<Self>>,
394 np: NewProposal,
395 ) -> Result<u32, UserError> {
396 let NewProposal {
397 request,
398 expected_voters,
399 kind,
400 creator_vote,
401 } = np;
402
403 let (
404 proposal_expiration,
405 consensus_timeout,
406 liveness_criteria_yes,
407 voting_delay,
408 consensus,
409 conversation_name,
410 self_identity,
411 ) = {
412 let s = arc.read_or_err("session")?;
413 (
414 s.handle.config.proposal_expiration,
415 s.handle.config.consensus_timeout,
416 s.handle.config.liveness_criteria_yes,
417 s.handle.config.voting_delay_for(kind),
418 s.consensus.clone(),
419 s.conversation_name.clone(),
420 Arc::clone(&s.self_identity),
421 )
422 };
423
424 let (proposal_id, unbundled) = submit_proposal::<P>(
425 &conversation_name,
426 &request,
427 &self_identity,
428 &consensus,
429 ProposalParams {
430 expected_voters,
431 proposal_expiration,
432 consensus_timeout,
433 liveness_criteria_yes,
434 },
435 )
436 .await?;
437
438 {
439 let mut s = arc.write_or_err("session")?;
440 s.handle
441 .conversation
442 .store_voting_proposal(proposal_id, request.clone());
443 if kind.is_emergency() {
444 s.handle.conversation.observe_emergency(proposal_id);
445 }
446 s.register_consensus_timeout(proposal_id, consensus_timeout);
451 }
452
453 match creator_vote {
454 CreatorVote::Yes => {
455 let scope = P::Scope::from(conversation_name.clone());
461 let proposal = consensus
462 .cast_vote_and_get_proposal(&scope, proposal_id, true)
463 .await?;
464 info!(
465 conversation = %conversation_name,
466 proposal_id,
467 actor = "owner",
468 "YES vote cast (bundled at submit)"
469 );
470 let outbound: AppMessage = proposal.into();
471 let packet = {
472 let mut s = arc.write_or_err("session")?;
473 let app_id = Arc::clone(&s.app_id);
474 s.handle
475 .expect_mls_mut()?
476 .build_message(&outbound, &app_id)?
477 };
478 let transport = Arc::clone(arc.read_or_err("session")?.transport());
479 send_packet(&transport, packet)?;
480 arc.read_or_err("session")?
482 .emit_event(SessionEvent::OwnProposalSubmitted {
483 proposal_id,
484 request,
485 });
486 }
487 CreatorVote::Deferred => {
488 let packet = {
492 let mut s = arc.write_or_err("session")?;
493 let app_id = Arc::clone(&s.app_id);
494 s.handle
495 .expect_mls_mut()?
496 .build_message(&unbundled, &app_id)?
497 };
498 let transport = Arc::clone(arc.read_or_err("session")?.transport());
499 send_packet(&transport, packet)?;
500 let banner = build_vote_banner_event(
501 &conversation_name,
502 proposal_id,
503 request.encode_to_vec(),
504 );
505 arc.read_or_err("session")?
506 .emit_event(SessionEvent::AppMessage(banner));
507 arc.write_or_err("session")?.register_auto_vote(
508 proposal_id,
509 voting_delay,
510 liveness_criteria_yes,
511 );
512 }
513 }
514
515 Ok(proposal_id)
516 }
517
518 async fn resolve_on_timeout(arc: &Arc<RwLock<Self>>, proposal_id: u32) {
529 let (consensus, conversation_name) = match arc.read_or_err("session") {
530 Ok(s) => (s.consensus.clone(), s.conversation_name.clone()),
531 Err(e) => {
532 error!(proposal_id, error = %e, "timeout resolution aborted: session lock poisoned");
533 return;
534 }
535 };
536 let scope = P::Scope::from(conversation_name.clone());
537 let still_active = consensus
538 .storage()
539 .get_active_proposals(&scope)
540 .await
541 .map(|active| active.iter().any(|p| p.proposal_id == proposal_id))
542 .unwrap_or(false);
543 if !still_active {
544 return;
545 }
546 match consensus
547 .handle_consensus_timeout(&scope, proposal_id)
548 .await
549 {
550 Ok(_) => {}
551 Err(ConsensusError::SessionNotFound) | Err(ConsensusError::SessionNotActive) => {
552 let resolved_locally = arc
553 .read_or_err("session")
554 .map(|s| {
555 s.handle
556 .conversation
557 .is_consensus_outcome_applied(proposal_id)
558 })
559 .unwrap_or(false);
560 if resolved_locally {
561 tracing::debug!(
562 conversation = %conversation_name,
563 proposal_id,
564 "timeout fired for already-resolved proposal: ignoring"
565 );
566 } else {
567 tracing::warn!(
568 conversation = %conversation_name,
569 proposal_id,
570 "timeout fired for unknown proposal id: no session and not in resolved cache"
571 );
572 }
573 }
574 Err(e) => {
575 info!(proposal_id, error = %e, "timeout resolution skipped");
576 }
577 }
578 }
579
580 async fn cast_auto_vote(
583 arc: &Arc<RwLock<Self>>,
584 proposal_id: u32,
585 vote: bool,
586 ) -> Result<(), UserError> {
587 let (consensus, conversation_name) = {
588 let s = arc.read_or_err("session")?;
589 (s.consensus.clone(), s.conversation_name.clone())
590 };
591 let app_message = cast_vote::<P>(&conversation_name, proposal_id, vote, &consensus).await?;
592 let packet = {
593 let mut s = arc.write_or_err("session")?;
594 let app_id = Arc::clone(&s.app_id);
595 s.handle
596 .expect_mls_mut()?
597 .build_message(&app_message, &app_id)?
598 };
599 let transport = Arc::clone(arc.read_or_err("session")?.transport());
600 send_packet(&transport, packet)?;
601 Ok(())
602 }
603}