1use std::sync::{Arc, RwLock};
16
17use hashgraph_like_consensus::protos::consensus::v1::Proposal;
18use prost::Message;
19use tracing::{error, info};
20
21use crate::{
22 app::{
23 ConversationState, LockExt, SessionRunner, UserError,
24 session::{
25 consensus::build_vote_banner_event,
26 consensus_bridge::{forward_incoming_proposal, forward_incoming_vote},
27 runner::send_packet,
28 },
29 },
30 core::{
31 ConsensusPlugin, ConversationPluginsFactory, PeerScoringPlugin, ProcessResult,
32 ProposalKind, ScoreSnapshot, SessionEvent, StewardList, StewardListConfig,
33 StewardListPlugin, member_set,
34 },
35 mls_crypto::MlsService,
36 protos::de_mls::messages::v1::{
37 AppMessage, ConversationMessage, ConversationSync, ConversationUpdateRequest, TimingConfig,
38 conversation_update_request,
39 },
40};
41
/// What `SessionRunner::dispatch_inbound_result` reports back after handling
/// one inbound `ProcessResult`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DispatchOutcome {
    /// The inbound result was handled; nothing further is signalled.
    Done,
    /// Returned only for `ProcessResult::LeaveConversation`, after the local
    /// leave preparation has run.
    // NOTE(review): presumably the caller tears the session down on this
    // value; that code is not visible here, so confirm against the caller.
    LeaveRequested,
}
55
56impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
57 pub(crate) async fn dispatch_inbound_result(
63 arc: &Arc<RwLock<Self>>,
64 result: ProcessResult,
65 ) -> Result<DispatchOutcome, UserError> {
66 match result {
67 ProcessResult::AppMessage(msg) => {
68 arc.read_or_err("session")?
69 .emit_event(SessionEvent::AppMessage(*msg));
70 Ok(DispatchOutcome::Done)
71 }
72 ProcessResult::Proposal(proposal) => {
73 Self::on_incoming_proposal(arc, *proposal).await?;
74 Ok(DispatchOutcome::Done)
75 }
76 ProcessResult::Vote(vote) => {
77 let proposal_id = vote.proposal_id;
78 let (consensus, conversation_name, outcome_applied) = {
79 let s = arc.read_or_err("session")?;
80 (
81 s.consensus.clone(),
82 s.conversation_name.clone(),
83 s.handle
84 .conversation
85 .is_consensus_outcome_applied(proposal_id),
86 )
87 };
88 forward_incoming_vote::<P>(&conversation_name, *vote, &consensus, outcome_applied)
89 .await?;
90 Ok(DispatchOutcome::Done)
91 }
92 ProcessResult::MembershipChangeReceived(request) => {
93 Self::handle_incoming_update_request(arc, *request).await?;
94 Ok(DispatchOutcome::Done)
95 }
96 ProcessResult::JoinedConversation(_name) => {
97 Self::on_joined_conversation(arc).await?;
101 Ok(DispatchOutcome::Done)
102 }
103 ProcessResult::ConversationUpdated => {
104 Self::on_conversation_updated(arc).await?;
105 Ok(DispatchOutcome::Done)
106 }
107 ProcessResult::LeaveConversation => {
108 Self::prepare_self_leave(arc)?;
109 Ok(DispatchOutcome::LeaveRequested)
110 }
111 ProcessResult::CommitCandidateReceived { steward } => {
112 Self::on_commit_candidate_received(arc, &steward).await?;
113 Ok(DispatchOutcome::Done)
114 }
115 ProcessResult::ConversationSyncReceived(sync) => {
116 Self::on_conversation_sync(arc, *sync)?;
117 Ok(DispatchOutcome::Done)
118 }
119 ProcessResult::Noop(reason) => {
120 let conv_name = arc.read_or_err("session")?.conversation_name.clone();
121 tracing::debug!(
122 conversation = %conv_name,
123 ?reason,
124 "inbound dispatched as noop"
125 );
126 Ok(DispatchOutcome::Done)
127 }
128 }
129 }
130
131 async fn on_incoming_proposal(
143 arc: &Arc<RwLock<Self>>,
144 proposal: Proposal,
145 ) -> Result<(), UserError> {
146 let decoded = ConversationUpdateRequest::decode(proposal.payload.as_slice()).ok();
147 if let Some(req) = decoded.as_ref() {
148 let mut s = arc.write_or_err("session")?;
149 let current_epoch = match s.handle.mls() {
150 Some(mls) => mls.current_epoch()?,
151 None => 0,
152 };
153 match &req.payload {
154 Some(conversation_update_request::Payload::EmergencyCriteria(_)) => {
155 s.handle
156 .conversation
157 .observe_emergency(proposal.proposal_id);
158 }
159 Some(conversation_update_request::Payload::InviteMember(_))
160 | Some(conversation_update_request::Payload::RemoveMember(_)) => {
161 s.handle
162 .conversation
163 .buffer_pending_update(req.clone(), current_epoch);
164 }
165 _ => {}
166 }
167 }
168 let proposal_id = proposal.proposal_id;
169 let expected_voters = proposal.expected_voters_count;
170 let payload = proposal.payload.clone();
171 let kind = decoded
172 .as_ref()
173 .map(ProposalKind::of)
174 .unwrap_or(ProposalKind::Commit);
175 let (consensus, conversation_name) = {
176 let s = arc.read_or_err("session")?;
177 (s.consensus.clone(), s.conversation_name.clone())
178 };
179 forward_incoming_proposal::<P>(&conversation_name, proposal, &consensus).await?;
180 if expected_voters > 1 {
184 let banner = build_vote_banner_event(&conversation_name, proposal_id, payload);
185 arc.read_or_err("session")?
186 .emit_event(SessionEvent::AppMessage(banner));
187 let (delay, vote) = {
188 let s = arc.read_or_err("session")?;
189 (
190 s.handle.config.voting_delay_for(kind),
191 s.handle.config.liveness_criteria_yes,
192 )
193 };
194 arc.write_or_err("session")?
195 .register_auto_vote(proposal_id, delay, vote);
196 }
197 Ok(())
198 }
199
200 async fn on_joined_conversation(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
204 arc.write_or_err("session")?
205 .prune_pending_updates_after_commit()?;
206
207 let (packet, mls_members, conversation_name) = {
208 let mut s = arc.write_or_err("session")?;
209 let msg: AppMessage = ConversationMessage {
210 message: format!("User {} joined the conversation", s.identity_display)
211 .into_bytes(),
212 sender: "SYSTEM".to_string(),
213 conversation_name: s.conversation_name.clone(),
214 }
215 .into();
216 let app_id = Arc::clone(&s.app_id);
217 let conversation_name = s.conversation_name.clone();
218 let mls = s.handle.expect_mls_mut()?;
219 let members = mls.members().unwrap_or_default();
220 let packet = mls.build_message(&msg, &app_id)?;
221 (packet, members, conversation_name)
222 };
223 let transport = Arc::clone(arc.read_or_err("session")?.transport());
224 send_packet(&transport, packet)?;
225 arc.read_or_err("session")?.emit_event(SessionEvent::Joined);
226 arc.write_or_err("session")?
227 .sync_scoring_members(&mls_members);
228
229 let event = arc.write_or_err("session")?.start_working();
230 arc.read_or_err("session")?
231 .emit_event(SessionEvent::PhaseChange(event));
232 info!(conversation = %conversation_name, "joined conversation");
233 Ok(())
234 }
235
236 async fn on_conversation_updated(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
241 let mls_members = {
242 let s = arc.read_or_err("session")?;
243 match s.handle.mls() {
244 Some(mls) => mls.members().unwrap_or_default(),
245 None => Vec::new(),
246 }
247 };
248 arc.write_or_err("session")?
249 .sync_scoring_members(&mls_members);
250 arc.write_or_err("session")?
251 .prune_pending_updates_after_commit()?;
252
253 let working_event = {
257 let mut s = arc.write_or_err("session")?;
258 s.handle.steward_list.reset_retry();
259 let state = s.handle.current_state();
260 if matches!(
261 state,
262 ConversationState::Working
263 | ConversationState::Freezing
264 | ConversationState::Selection
265 | ConversationState::Reelection
266 ) {
267 Some(s.start_working())
268 } else {
269 None
270 }
271 };
272
273 Self::steward_list_housekeeping(arc).await?;
274 Self::process_buffered_updates(arc).await?;
275 Self::maybe_close_recovery_window(arc).await;
276
277 if let Some(event) = working_event {
278 arc.read_or_err("session")?
279 .emit_event(SessionEvent::PhaseChange(event));
280 }
281 Ok(())
282 }
283
284 async fn maybe_close_recovery_window(arc: &Arc<RwLock<Self>>) {
287 let in_recovery_mode = match arc.read_or_err("session") {
288 Ok(s) => s.handle.is_in_recovery_mode(),
289 Err(e) => {
290 tracing::warn!(error = %e, "recovery window check skipped: session lock poisoned");
291 return;
292 }
293 };
294 if !in_recovery_mode {
295 return;
296 }
297 if let Err(e) = Self::try_initiate_steward_election(arc, true, None).await {
298 let conv_name = arc
299 .read_or_err("session")
300 .map(|s| s.conversation_name.clone())
301 .unwrap_or_else(|_| "<poisoned>".to_string());
302 info!(
303 conversation = %conv_name,
304 error = %e,
305 "post-recovery election deferred"
306 );
307 }
308 }
309
310 fn prepare_self_leave(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
315 arc.read_or_err("session")?
316 .emit_event(SessionEvent::Leaving);
317 let taken_mls = arc.write_or_err("session")?.handle.take_mls();
322 if let Some(mut mls) = taken_mls {
323 mls.delete()?;
324 }
325 Ok(())
326 }
327
328 async fn on_commit_candidate_received(
331 arc: &Arc<RwLock<Self>>,
332 steward: &[u8],
333 ) -> Result<(), UserError> {
334 {
335 let conv_name = arc.read_or_err("session")?.conversation_name.clone();
336 tracing::debug!(
337 conversation = %conv_name,
338 steward = ?steward,
339 "candidate received from peer steward"
340 );
341 }
342 let (event, outbound) = {
343 let mut s = arc.write_or_err("session")?;
344 if s.handle.current_state() != ConversationState::Working {
345 return Ok(());
346 }
347
348 let event = s.start_freezing();
349 let epoch = s.handle.expect_mls()?.current_epoch()?;
350 s.handle.conversation.ensure_freeze_round(epoch);
351
352 let self_identity = Arc::clone(&s.self_identity);
353 let app_id = Arc::clone(&s.app_id);
354 let outbound = if s.handle.steward_list.is_steward(&self_identity) {
355 match s.handle.create_commit_candidate(&self_identity, &app_id) {
356 Ok(packets) => packets,
357 Err(e) => {
358 error!(
359 conversation = %s.conversation_name,
360 error = %e,
361 "own commit candidate build failed"
362 );
363 None
364 }
365 }
366 } else {
367 None
368 };
369 (event, outbound)
370 };
371
372 arc.read_or_err("session")?
373 .emit_event(SessionEvent::PhaseChange(event));
374 if let Some(message) = outbound {
375 let transport = Arc::clone(arc.read_or_err("session")?.transport());
376 send_packet(&transport, message)?;
377 }
378 Ok(())
379 }
380
381 fn on_conversation_sync(
386 arc: &Arc<RwLock<Self>>,
387 sync: ConversationSync,
388 ) -> Result<(), UserError> {
389 let (members, current_epoch, local_default_peer_score, conversation_name) = {
390 let s = arc.read_or_err("session")?;
391 if s.handle.steward_list.current_list().is_some() {
392 return Ok(());
393 }
394 let mls = s.handle.expect_mls()?;
395 (
396 mls.members()?,
397 mls.current_epoch()?,
398 s.handle.scoring.default_score(),
399 s.conversation_name.clone(),
400 )
401 };
402 if !validate_conversation_sync(
403 &conversation_name,
404 &sync,
405 current_epoch,
406 &members,
407 local_default_peer_score,
408 )? {
409 return Ok(());
410 }
411
412 let sn = sync.steward_members.len();
413 arc.write_or_err("session")?
414 .apply_conversation_sync_to_entry(&sync)?;
415
416 info!(
417 conversation = %conversation_name,
418 election_epoch = sync.election_epoch,
419 stewards = sn,
420 scores = sync.peer_scores.len(),
421 timing = sync.timing.is_some(),
422 "conversation sync applied"
423 );
424 Ok(())
425 }
426
427 fn apply_conversation_sync_to_entry(
428 &mut self,
429 sync: &ConversationSync,
430 ) -> Result<(), UserError> {
431 let mut protocol_config =
432 StewardListConfig::new(sync.sn_min as usize, sync.sn_max as usize)?;
433 protocol_config.allow_subset_candidates = sync.allow_subset_candidates;
434
435 let sn = sync.steward_members.len();
436 self.handle.steward_list.set_config(protocol_config);
437 let _events = self.handle.steward_list.install_list(
438 sync.election_epoch,
439 &sync.steward_members,
440 sn,
441 sync.retry_round,
442 )?;
443 self.handle
444 .steward_list
445 .set_max_retries(sync.max_reelection_attempts);
446 self.handle.scoring.set_threshold(sync.threshold_peer_score);
447 let snapshot = ScoreSnapshot {
448 diverged: sync
449 .peer_scores
450 .iter()
451 .map(|ps| (ps.member_id.clone(), ps.score))
452 .collect(),
453 };
454 let _events = self.handle.scoring.apply_snapshot(&snapshot);
460 self.handle.config.liveness_criteria_yes = sync.liveness_criteria_yes;
461 self.handle.config.pending_update_max_epochs = sync.pending_update_max_epochs;
462 if let Some(timing) = &sync.timing {
463 self.handle.config.apply_timing(timing);
464 }
465 Ok(())
466 }
467}
468
469fn validate_conversation_sync(
481 conversation_name: &str,
482 sync: &ConversationSync,
483 current_epoch: u64,
484 members: &[Vec<u8>],
485 local_default_peer_score: i64,
486) -> Result<bool, UserError> {
487 if sync.election_epoch > current_epoch {
488 info!(
489 conversation = conversation_name,
490 election_epoch = sync.election_epoch,
491 current_epoch,
492 "conversation sync rejected: election_epoch > current_epoch"
493 );
494 return Ok(false);
495 }
496
497 let members_set = member_set(members);
498 let any_present = sync
499 .steward_members
500 .iter()
501 .any(|s| members_set.contains(s.as_slice()));
502 let ordering_valid = StewardList::validate(
503 &sync.steward_members,
504 sync.election_epoch,
505 conversation_name.as_bytes(),
506 &sync.steward_members,
507 &StewardListConfig::new(sync.sn_min as usize, sync.sn_max as usize)?,
508 sync.retry_round,
509 )?;
510 if !(any_present && ordering_valid) {
511 info!(
512 conversation = conversation_name,
513 any_present,
514 ordering = ordering_valid,
515 "conversation sync rejected: invalid"
516 );
517 return Ok(false);
518 }
519
520 if let Some(timing) = &sync.timing
521 && let Some(zero_field) = first_zero_timing_field(timing)
522 {
523 info!(
524 conversation = conversation_name,
525 field = zero_field,
526 "conversation sync rejected: zero-valued timing field"
527 );
528 return Ok(false);
529 }
530
531 if local_default_peer_score <= sync.threshold_peer_score {
532 info!(
533 conversation = conversation_name,
534 local_default_peer_score,
535 threshold_peer_score = sync.threshold_peer_score,
536 "conversation sync rejected: default_peer_score at or below threshold would mark new members removable on add"
537 );
538 return Ok(false);
539 }
540 Ok(true)
541}
542
543fn first_zero_timing_field(timing: &TimingConfig) -> Option<&'static str> {
548 if timing.commit_inactivity_duration_ms == 0 {
549 Some("commit_inactivity_duration_ms")
550 } else if timing.freeze_duration_ms == 0 {
551 Some("freeze_duration_ms")
552 } else if timing.proposal_expiration_ms == 0 {
553 Some("proposal_expiration_ms")
554 } else if timing.consensus_timeout_ms == 0 {
555 Some("consensus_timeout_ms")
556 } else if timing.recovery_inactivity_duration_ms == 0 {
557 Some("recovery_inactivity_duration_ms")
558 } else {
559 None
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protos::de_mls::messages::v1::TimingConfig;

    /// Timing configuration in which every field is non-zero.
    fn nonzero_timing() -> TimingConfig {
        TimingConfig {
            commit_inactivity_duration_ms: 60_000,
            freeze_duration_ms: 30_000,
            proposal_expiration_ms: 3_600_000,
            consensus_timeout_ms: 30_000,
            recovery_inactivity_duration_ms: 5_000,
        }
    }

    /// Sync message with `alice` as the only steward, non-zero timing and the
    /// given peer-score threshold.
    fn valid_sync_with(threshold: i64) -> ConversationSync {
        ConversationSync {
            steward_members: vec![b"alice".to_vec()],
            election_epoch: 0,
            sn_min: 1,
            sn_max: 5,
            allow_subset_candidates: false,
            peer_scores: vec![],
            timing: Some(nonzero_timing()),
            retry_round: 0,
            max_reelection_attempts: 1,
            liveness_criteria_yes: true,
            threshold_peer_score: threshold,
            pending_update_max_epochs: 3,
        }
    }

    /// Validates `valid_sync_with(threshold)` for conversation `"g"` at epoch
    /// 0 with `alice` as the only member and the given local default score.
    fn is_accepted(threshold: i64, local_default: i64) -> bool {
        let roster = [b"alice".to_vec()];
        let sync = valid_sync_with(threshold);
        validate_conversation_sync("g", &sync, 0, &roster, local_default).unwrap()
    }

    #[test]
    fn nonzero_timing_passes() {
        assert_eq!(first_zero_timing_field(&nonzero_timing()), None);
    }

    #[test]
    fn validate_accepts_default_above_threshold() {
        assert!(is_accepted(0, 100));
    }

    #[test]
    fn validate_rejects_default_equal_to_threshold() {
        assert!(!is_accepted(50, 50));
    }

    #[test]
    fn validate_rejects_default_below_threshold() {
        assert!(!is_accepted(100, 50));
    }

    #[test]
    fn each_zero_field_is_detected() {
        // Each case zeroes exactly one field of an otherwise non-zero config.
        let cases: [(&str, fn(&mut TimingConfig)); 5] = [
            (
                "commit_inactivity_duration_ms",
                |t: &mut TimingConfig| t.commit_inactivity_duration_ms = 0,
            ),
            (
                "freeze_duration_ms",
                |t: &mut TimingConfig| t.freeze_duration_ms = 0,
            ),
            (
                "proposal_expiration_ms",
                |t: &mut TimingConfig| t.proposal_expiration_ms = 0,
            ),
            (
                "consensus_timeout_ms",
                |t: &mut TimingConfig| t.consensus_timeout_ms = 0,
            ),
            (
                "recovery_inactivity_duration_ms",
                |t: &mut TimingConfig| t.recovery_inactivity_duration_ms = 0,
            ),
        ];
        for (name, zero_out) in cases {
            let mut timing = nonzero_timing();
            zero_out(&mut timing);
            assert_eq!(
                first_zero_timing_field(&timing),
                Some(name),
                "expected field {name} to be detected as zero"
            );
        }
    }
}