1use std::sync::{Arc, RwLock};
13
14use tracing::{error, info};
15
16use crate::{
17 app::{CreatorVote, LockExt, SessionRunner, UserError, session::runner::send_packet},
18 core::{
19 ConsensusPlugin, ConversationPluginsFactory, ElectionDecision, PeerScoringPlugin,
20 StewardListPlugin, member_set, scoring_member_diff, target_identity_of,
21 },
22 mls_crypto::MlsService,
23 protos::de_mls::messages::v1::{
24 AppMessage, ConversationSync, ConversationUpdateRequest, PeerScore,
25 StewardElectionProposal, TimingConfig, ViolationEvidence, conversation_update_request,
26 },
27};
28
29impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
30 pub fn sync_scoring_members(&mut self, mls_members: &[Vec<u8>]) {
36 let scored: Vec<Vec<u8>> = self
37 .handle
38 .scoring
39 .all_members_with_scores()
40 .into_iter()
41 .map(|(id, _)| id)
42 .collect();
43 let diff = scoring_member_diff(&scored, mls_members);
44 for member_id in &diff.to_add {
45 let _events = self.handle.scoring.add_member(member_id);
50 }
51 for member_id in &diff.to_remove {
52 self.handle.scoring.remove_member(member_id);
53 }
54 }
55
56 pub async fn steward_list_housekeeping(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
61 arc.write_or_err("session")?.try_auto_fill_steward_list()?;
62 if let Err(e) = Self::try_initiate_steward_election(arc, false, None).await {
63 let conv_name = arc.read_or_err("session")?.conversation_name.clone();
64 info!(conversation = %conv_name, error = %e, "election initiation deferred");
65 }
66 Ok(())
67 }
68
69 pub fn regenerate_steward_list(&mut self) -> Result<(), UserError> {
73 let mls = self.handle.expect_mls()?;
74 let current_epoch = mls.current_epoch()?;
75 let members = mls.members()?;
76 let sn = self
77 .handle
78 .steward_list
79 .config()
80 .compute_list_size(members.len());
81 let _events = self
83 .handle
84 .steward_list
85 .install_list(current_epoch, &members, sn, 0)?;
86 Ok(())
87 }
88
89 pub fn prune_pending_updates_after_commit(&mut self) -> Result<(), UserError> {
93 let (current_epoch, members, max_age) = {
94 let Some(mls) = self.handle.mls() else {
95 return Ok(());
96 };
97 (
98 mls.current_epoch()?,
99 mls.members()?,
100 self.handle.config.pending_update_max_epochs,
101 )
102 };
103
104 let before = self.handle.conversation.pending_update_count();
105 self.handle
106 .conversation
107 .prune_pending_updates_for_members(&members);
108 let expired = self
109 .handle
110 .conversation
111 .expire_pending_updates(current_epoch, max_age);
112 let after = self.handle.conversation.pending_update_count();
113 if before != after {
114 info!(
115 conversation = %self.conversation_name,
116 before,
117 after,
118 expired = expired.len(),
119 "pruned pending updates after commit"
120 );
121 }
122 Ok(())
123 }
124
125 pub async fn process_buffered_updates(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
129 let (current_epoch, to_propose, conversation_name): (
130 u64,
131 Vec<ConversationUpdateRequest>,
132 String,
133 ) = {
134 let s = arc.read_or_err("session")?;
135
136 let (current_epoch, members) = match s.handle.mls() {
137 Some(mls) => (mls.current_epoch()?, mls.members()?),
138 None => (0, Vec::new()),
139 };
140 let self_identity: &[u8] = &s.self_identity;
141 let eligible = s.handle.conversation.steward_eligibility(&members);
142 let is_live = s
143 .handle
144 .steward_list
145 .epoch_steward(current_epoch, &eligible)
146 .is_some_and(|es| es == self_identity);
147 if !is_live {
148 return Ok(());
149 }
150
151 let approved = s.handle.conversation.approved_proposals();
155 let approved_targets: std::collections::HashSet<&[u8]> =
156 approved.values().filter_map(target_identity_of).collect();
157 let members_set = member_set(&members);
158
159 let to_propose: Vec<ConversationUpdateRequest> = s
160 .handle
161 .conversation
162 .pending_updates()
163 .iter()
164 .filter(|(id, _)| !approved_targets.contains(id.as_slice()))
165 .filter(|(id, p)| {
166 let is_member = members_set.contains(id.as_slice());
168 match p.request.payload.as_ref() {
169 Some(conversation_update_request::Payload::InviteMember(_)) => !is_member,
170 Some(conversation_update_request::Payload::RemoveMember(_)) => is_member,
171 _ => false,
172 }
173 })
174 .map(|(_, p)| p.request.clone())
175 .collect();
176 (current_epoch, to_propose, s.conversation_name.clone())
177 };
178
179 if to_propose.is_empty() {
180 return Ok(());
181 }
182
183 info!(
184 conversation = %conversation_name,
185 epoch = current_epoch,
186 count = to_propose.len(),
187 "promoting buffered updates to proposals"
188 );
189
190 for request in to_propose {
193 if let Err(e) = Self::initiate_proposal(arc, request, CreatorVote::Deferred).await {
194 info!(
195 conversation = %conversation_name,
196 error = %e,
197 "buffered proposal deferred"
198 );
199 }
200 }
201 Ok(())
202 }
203
204 pub(crate) fn build_conversation_sync_packet(
208 &mut self,
209 ) -> Result<Option<crate::ds::OutboundPacket>, UserError> {
210 let scores: Vec<PeerScore> = self
216 .handle
217 .scoring
218 .snapshot()
219 .diverged
220 .into_iter()
221 .map(|(id, score)| PeerScore {
222 member_id: id,
223 score,
224 })
225 .collect();
226
227 let list = match self.handle.steward_list.current_list() {
228 Some(l) => l,
229 None => return Ok(None),
230 };
231
232 let timing = TimingConfig::from(&self.handle.config);
233
234 let mls_members = self.handle.expect_mls()?.members()?;
238 let steward_members = {
239 let eligible = self.handle.conversation.steward_eligibility(&mls_members);
240 self.handle.steward_list.steward_members(&eligible)
241 };
242
243 let sync = ConversationSync {
248 steward_members,
249 election_epoch: list.election_epoch(),
250 sn_min: list.config().sn_min as u32,
251 sn_max: list.config().sn_max as u32,
252 allow_subset_candidates: self.handle.steward_list.config().allow_subset_candidates,
253 peer_scores: scores,
254 timing: Some(timing),
255 retry_round: list.retry_round(),
256 max_reelection_attempts: self.handle.steward_list.max_retries(),
257 liveness_criteria_yes: self.handle.config.liveness_criteria_yes,
258 threshold_peer_score: self.handle.scoring.threshold(),
259 pending_update_max_epochs: self.handle.config.pending_update_max_epochs,
260 };
261
262 let app_msg: AppMessage = sync.into();
263 let app_id = Arc::clone(&self.app_id);
264 Ok(Some(
265 self.handle
266 .expect_mls_mut()?
267 .build_message(&app_msg, &app_id)?,
268 ))
269 }
270
271 pub async fn send_conversation_sync(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
279 let (transport, packet, conversation_name) = {
280 let mut s = arc.write_or_err("session")?;
281 let transport = Arc::clone(s.transport());
282 let conversation_name = s.conversation_name.clone();
283 let Some(packet) = s.build_conversation_sync_packet()? else {
284 return Ok(());
285 };
286 (transport, packet, conversation_name)
287 };
288 send_packet(&transport, packet)?;
289 info!(conversation = %conversation_name, "conversation sync sent");
290 Ok(())
291 }
292
293 pub async fn check_and_initiate_score_removals(
297 arc: &Arc<RwLock<Self>>,
298 ) -> Result<(), UserError> {
299 let (epoch, to_remove, self_id_arc, conversation_name) = {
304 let mut s = arc.write_or_err("session")?;
305 let epoch = s.handle.expect_mls()?.current_epoch()?;
306 let self_id_arc = Arc::clone(&s.self_identity);
307 let is_steward = s.handle.steward_list.is_steward(&self_id_arc);
308 if !is_steward {
309 return Ok(());
310 }
311 let self_id: &[u8] = &self_id_arc;
323 let to_remove: Vec<(Vec<u8>, i64)> = s
324 .handle
325 .scoring
326 .members_below_threshold()
327 .into_iter()
328 .filter(|id| id.as_slice() != self_id)
329 .filter(|id| !s.handle.conversation.has_pending_removal(id))
330 .filter(|id| !s.handle.conversation.is_pending_removal(id))
331 .filter_map(|id| {
332 let score = s.handle.scoring.score_for(&id)?;
333 Some((id, score))
334 })
335 .collect();
336 for (id, _) in &to_remove {
337 s.handle.conversation.observe_pending_removal(id.clone());
338 }
339 (epoch, to_remove, self_id_arc, s.conversation_name.clone())
340 };
341
342 for (target_id, current_score) in to_remove {
344 let evidence =
345 ViolationEvidence::score_below_threshold(target_id.clone(), epoch, current_score)
346 .with_creator(self_id_arc.to_vec());
347 let request = evidence.into_update_request()?;
348
349 info!(
350 conversation = %conversation_name,
351 target = ?target_id,
352 score = current_score,
353 "initiating SCORE_BELOW_THRESHOLD removal"
354 );
355 if let Err(e) = Self::initiate_proposal(arc, request, CreatorVote::Yes).await {
359 arc.write_or_err("session")?
360 .handle
361 .conversation
362 .resolve_pending_removal(&target_id);
363 error!(
364 conversation = %conversation_name,
365 target = ?target_id,
366 error = %e,
367 "SCORE_BELOW_THRESHOLD vote failed to start"
368 );
369 }
370 }
371
372 Ok(())
373 }
374
375 pub(crate) async fn try_initiate_steward_election(
385 arc: &Arc<RwLock<Self>>,
386 recovery: bool,
387 extra_exclude: Option<&[u8]>,
388 ) -> Result<(), UserError> {
389 let (proposed_stewards, election_epoch, retry_round, conversation_name) = {
390 let s = arc.read_or_err("session")?;
391 let mls = s.handle.expect_mls()?;
392 let epoch = mls.current_epoch()?;
393 let mls_members = mls.members()?;
394 let self_identity: &[u8] = &s.self_identity;
395
396 if s.handle.conversation.has_election_in_flight() {
399 return Ok(());
400 }
401
402 let candidate_pool: Vec<Vec<u8>> = mls_members
405 .iter()
406 .filter(|m| {
407 if extra_exclude.is_some_and(|x| x == m.as_slice()) {
408 return false;
409 }
410 if recovery && s.handle.conversation.is_pending_removal(m) {
411 return false;
412 }
413 true
414 })
415 .cloned()
416 .collect();
417 let pool_set: std::collections::HashSet<&[u8]> =
418 candidate_pool.iter().map(Vec::as_slice).collect();
419 let eligible = |c: &[u8]| pool_set.contains(c);
420
421 match s.handle.steward_list.propose_election(
422 epoch,
423 &candidate_pool,
424 self_identity,
425 eligible,
426 recovery,
427 )? {
428 ElectionDecision::Skip(reason) => {
429 if matches!(reason, "no eligible candidates after filter") {
430 info!(
431 conversation = %s.conversation_name,
432 "skipping election: {reason}"
433 );
434 }
435 return Ok(());
436 }
437 ElectionDecision::Proposed {
438 proposed_stewards,
439 election_epoch,
440 retry_round,
441 } => (
442 proposed_stewards,
443 election_epoch,
444 retry_round,
445 s.conversation_name.clone(),
446 ),
447 }
448 };
449
450 let stewards_len = proposed_stewards.len();
451 let request = ConversationUpdateRequest {
452 payload: Some(conversation_update_request::Payload::StewardElection(
453 StewardElectionProposal {
454 proposed_stewards,
455 election_epoch,
456 retry_round,
457 },
458 )),
459 };
460
461 info!(
462 conversation = %conversation_name,
463 epoch = election_epoch,
464 retry_round,
465 stewards = stewards_len,
466 recovery,
467 "initiating steward election"
468 );
469
470 Self::initiate_proposal(arc, request, CreatorVote::Deferred).await?;
473
474 Ok(())
475 }
476
477 pub(crate) async fn try_initiate_deadlock_ecp(
481 arc: &Arc<RwLock<Self>>,
482 ) -> Result<(), UserError> {
483 let (is_authorized, self_id, epoch, conversation_name) = {
484 let s = arc.read_or_err("session")?;
485 let mls = s.handle.expect_mls()?;
486 let mls_members = mls.members()?;
487 let epoch = mls.current_epoch()?;
488 let self_id: &[u8] = &s.self_identity;
489 let mls_set: std::collections::HashSet<&[u8]> =
492 mls_members.iter().map(Vec::as_slice).collect();
493 let conversation_ref = &s.handle.conversation;
494 let authorized = s
495 .handle
496 .steward_list
497 .election_proposer(|c: &[u8]| {
498 mls_set.contains(c) && !conversation_ref.is_pending_removal(c)
499 })
500 .is_some_and(|proposer| proposer == self_id);
501 (
502 authorized,
503 Arc::clone(&s.self_identity),
504 epoch,
505 s.conversation_name.clone(),
506 )
507 };
508
509 if !is_authorized {
510 return Ok(());
511 }
512
513 let request = ViolationEvidence::deadlock(epoch)
514 .with_creator(self_id.to_vec())
515 .into_update_request()?;
516
517 info!(
518 conversation = %conversation_name,
519 epoch, "initiating Deadlock ECP"
520 );
521
522 Self::initiate_proposal(arc, request, CreatorVote::Yes).await?;
525 Ok(())
526 }
527
528 fn try_auto_fill_steward_list(&mut self) -> Result<(), UserError> {
535 let mls = self.handle.expect_mls()?;
536 let epoch = mls.current_epoch()?;
537 let members = mls.members()?;
538 let _events = self.handle.steward_list.maybe_auto_fill(epoch, &members)?;
539 Ok(())
540 }
541}